Re: DataFrame Column Alias problem

2015-05-22 Thread SLiZn Liu
However, this returns a single column, c, without showing the original col1.

On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha sriharsha@gmail.com
wrote:

 df.groupBy($"col1").agg(count($"col1").as("c")).show

 On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com wrote:

 Hi Spark Users Group,

 I'm doing a groupBy operation on my DataFrame *df*, as follows, to get the
 count for each value of col1:

  df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this.
 col1   COUNT(col1#347)
 aaa    2
 bbb    4
 ccc    4
 ...
 and more...

 I'd like to sort by the resulting count, with
 .sort("COUNT(col1#347)"), but the name of the count column
 obviously cannot be known in advance. Intuitively, one might consider
 acquiring the column name by column index, in the fashion of R's data frames,
 but Spark doesn't support that. I have Googled *spark agg alias* and so forth,
 and checked DataFrame.as in the Spark API; neither helped. Am I the
 only one who has ever gotten stuck on this, or is there anything I have missed?

 REGARDS,
 Todd Leo





Re: Issues with constants in Spark HiveQL queries

2015-05-22 Thread Skanda
Hi All,

I'm facing the same problem with Spark 1.3.0 from Cloudera CDH 5.4.x. Any
luck solving the issue?

Exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Unsupported language features in query: select * from
everest_marts_test.hive_ql_test where daily_partition=20150101
TOK_QUERY 1, 0,18, 14
  TOK_FROM 1, 4,8, 14
    TOK_TABREF 1, 6,8, 14
      TOK_TABNAME 1, 6,8, 14
        everest_marts_test 1, 6,6, 14
        hive_ql_test 1, 8,8, 33
  TOK_INSERT 0, -1,18, 0
    TOK_DESTINATION 0, -1,-1, 0
      TOK_DIR 0, -1,-1, 0
        TOK_TMP_FILE 0, -1,-1, 0
    TOK_SELECT 0, 0,2, 0
      TOK_SELEXPR 0, 2,2, 0
        TOK_ALLCOLREF 0, 2,2, 0
    TOK_WHERE 1, 10,18, 68
      TOK_FUNCTION 1, 12,18, 68
        in 1, 14,14, 68
        TOK_TABLE_OR_COL 1, 12,12, 52
          daily_partition 1, 12,12, 52
        20150101 1, 16,18, 72

scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
20150101 :
20150101 1, 16,18, 72
 +

org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
  ;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:234)
at org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at

Re: LDA prediction on new document

2015-05-22 Thread Ken Geis
Dani, this appears to be addressed in SPARK-5567, scheduled for Spark 1.5.0.


Ken

On May 21, 2015, at 11:12 PM, user-digest-h...@spark.apache.org wrote:

 From: Dani Qiu zongmin@gmail.com
 Subject: LDA prediction on new document
 Date: May 21, 2015 at 8:48:40 PM PDT
 To: user@spark.apache.org
 
 
 Hi guys, I'm pretty new to LDA. I notice that Spark 1.3.0 MLlib provides an
 EM-based LDA implementation. It returns both topics and topic distributions.
 
 My question is: how can I use these parameters to predict on a new document?
 
 I also notice there is an Online LDA implementation in the Spark master branch.
 It only returns topics. How can I use this to do prediction on a new document
 (and on a trained document)?
 
 
 thanks


Re: DataFrame Column Alias problem

2015-05-22 Thread SLiZn Liu
Despite the odd usage, it does the trick, thanks Reynold!

On Fri, May 22, 2015 at 2:47 PM Reynold Xin r...@databricks.com wrote:

 In 1.4 it actually shows col1 by default.

 In 1.3, you can add col1 to the output, i.e.

 df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show()



Re: Spark Memory management

2015-05-22 Thread Akhil Das
You can see the logic for evicting data from memory in the ensureFreeSpace
call:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L416
dropFromMemory:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1011
is the call responsible for writing the data to disk (if applicable).
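
To make the idea a bit more concrete, here is a rough, Spark-free sketch of the kind of bookkeeping an "ensure free space, then drop" step involves. It is not the actual MemoryStore or BlockManager code: TinyMemoryStore and its methods are made-up names, block sizes are assumed to be known numbers tracked by the store itself (rather than read from java.lang.Runtime), and eviction here is simply oldest-inserted-first.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a memory store; not Spark's classes.
class TinyMemoryStore(maxBytes: Long) {
  // LinkedHashMap iterates in insertion order, used here as a crude age order.
  private val entries = mutable.LinkedHashMap.empty[String, Long]
  private var used = 0L

  /** Pick the oldest blocks to evict until `needed` bytes fit; None if impossible. */
  def ensureFreeSpace(needed: Long): Option[Seq[String]] = {
    if (needed > maxBytes) return None
    var free = maxBytes - used
    val victims = mutable.ArrayBuffer.empty[String]
    val it = entries.iterator
    while (free < needed && it.hasNext) {
      val (id, size) = it.next()
      victims += id
      free += size
    }
    if (free >= needed) Some(victims.toSeq) else None
  }

  /** Store a block and return the ids of the blocks dropped to make room. */
  def put(id: String, size: Long): Seq[String] = {
    // Replacing an existing block first releases its old size.
    entries.remove(id).foreach(old => used -= old)
    val victims = ensureFreeSpace(size).getOrElse(
      throw new IllegalStateException(s"block $id ($size bytes) does not fit"))
    // A real store would hand the victims to a "drop from memory" step here,
    // which may write them to disk depending on the storage level.
    victims.foreach(v => entries.remove(v).foreach(freed => used -= freed))
    entries(id) = size
    used += size
    victims
  }
}
```

With a 100-byte budget, putting blocks "a", "b" and "c" of 40 bytes each would evict "a" on the third put. How accurate the real accounting is depends on how block sizes are estimated, which this sketch does not model.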

Thanks
Best Regards

On Fri, May 22, 2015 at 1:01 PM, swaranga sarma.swara...@gmail.com wrote:

 Experts,

 This is an academic question. Since Spark runs on the JVM, how is it able
 to do things like offloading RDDs from memory to disk when the data cannot
 fit into memory? How are the calculations performed? Does it use the methods
 available in the java.lang.Runtime class to get free/available memory? How
 accurate are these calculations?

 Thanks for any inputs.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Memory-management-tp22992.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark Memory management

2015-05-22 Thread swaranga
Experts,

This is an academic question. Since Spark runs on the JVM, how is it able to
do things like offloading RDDs from memory to disk when the data cannot fit
into memory? How are the calculations performed? Does it use the methods
available in the java.lang.Runtime class to get free/available memory? How
accurate are these calculations?

Thanks for any inputs.






Re: Official Docker container for Spark

2015-05-22 Thread ??????
The Spark source includes a Dockerfile; you can use that.




-- Original --
From:  tridib <tridib.sama...@live.com>
Date:  Fri, May 22, 2015 03:25 AM
To:  user <user@spark.apache.org>

Subject:  Official Docker container for Spark



Hi,

I am using Spark 1.2.0. Can you suggest Docker containers that can be
deployed in production? I found a lot of Spark images at
https://registry.hub.docker.com/ but could not figure out which one to
use. None of them seems to be an official image.

Does anybody have any recommendation?

Thanks
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Official-Docker-container-for-Spark-tp22977.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [pyspark] Starting workers in a virtualenv

2015-05-22 Thread Karlson

That works, thank you!

On 2015-05-22 03:15, Davies Liu wrote:

Could you try setting PYSPARK_PYTHON to the path of the Python interpreter in
your virtualenv? For example:

PYSPARK_PYTHON=/path/to/env/bin/python bin/spark-submit xx.py

On Mon, Apr 20, 2015 at 12:51 AM, Karlson ksonsp...@siberie.de wrote:

Hi all,

I am running the Python process that communicates with Spark in a
virtualenv. Is there any way I can make sure that the Python processes of
the workers are also started in a virtualenv? Currently I am getting
ImportErrors when the worker tries to unpickle stuff that is not installed
system-wide. For now both the worker and the driver run on the same machine
in local mode.

Thanks in advance!




Spark Streaming and Drools

2015-05-22 Thread Antonio Giambanco
Hi All,
I'm deploying an architecture that uses Flume to send log information
to a sink.
Spark Streaming reads from this sink (pull strategy) and processes all of this
information. During this processing I would like to do some event
processing. For example:
a log appender writes information about all transactions on my trading
platforms;
if a platform user sells more than they buy during a week, I need to receive an
alert on an event dashboard.

How can I implement this? Is it possible with Drools?
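
As a point of reference, the weekly rule itself can be sketched in plain Scala, independent of both Drools and Spark. Everything named here is hypothetical: the Trade record, the "BUY"/"SELL" side values, and the reading of "sells more than buys" as a comparison of total quantities over one week of already-collected trades.

```scala
// Hypothetical record for one logged transaction; side is "BUY" or "SELL".
final case class Trade(user: String, side: String, quantity: Long)

object WeeklyRule {
  /** Users whose sold quantity exceeds their bought quantity in the given window. */
  def usersToAlert(weekOfTrades: Seq[Trade]): Set[String] =
    weekOfTrades.groupBy(_.user).collect {
      case (user, trades)
          if trades.filter(_.side == "SELL").map(_.quantity).sum >
             trades.filter(_.side == "BUY").map(_.quantity).sum =>
        user
    }.toSet
}
```

In a streaming job the same comparison would presumably run over a windowed, per-user aggregate rather than an in-memory Seq. Whether a rule engine adds anything beyond this depends on how many such rules there are and how often they change.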

Thanks so much


Re: Official Docker container for Spark

2015-05-22 Thread Ritesh Kumar Singh
Use this:
sequenceiq/docker

Here's a link to their github repo:
docker-spark https://github.com/sequenceiq/docker-spark


They have repos for other big data tools too, which are again really nice.
It's being maintained properly by their devs and


Re: rdd.sample() methods very slow

2015-05-22 Thread Reynold Xin
You can do something like this:

import org.apache.spark.rdd.PartitionPruningRDD
import scala.util.Random

val myRdd = ...

// this samples 10% of the partitions
val rddSampledByPartition = PartitionPruningRDD.create(myRdd, i =>
  Random.nextDouble() < 0.1)

// take the first 10 elements out of each partition
rddSampledByPartition.mapPartitions { iter => iter.take(10) }
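
For comparison, the mapPartitionsWithIndex variant mentioned in the quoted discussion below might look roughly like this. It is only a sketch: PartitionSampling, chooseOnDriver and samplePartitions are made-up names, and choosing the partitions up front on the driver with a fixed seed is an assumption made here so that the selection is reproducible.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import scala.util.Random

object PartitionSampling {
  /** Pick partition indices on the driver, each kept with probability `fraction`. */
  def chooseOnDriver(numPartitions: Int, fraction: Double, seed: Long): Set[Int] = {
    val rng = new Random(seed)
    (0 until numPartitions).filter(_ => rng.nextDouble() < fraction).toSet
  }

  /** Keep all elements of the chosen partitions and nothing from the others. */
  def samplePartitions[T: ClassTag](rdd: RDD[T], fraction: Double, seed: Long): RDD[T] = {
    val keep = chooseOnDriver(rdd.partitions.length, fraction, seed)
    rdd.mapPartitionsWithIndex(
      (index, iter) => if (keep.contains(index)) iter else Iterator.empty,
      preservesPartitioning = true)
  }
}
```

One difference from the PartitionPruningRDD approach: here a task is still scheduled for every partition, although the skipped ones return immediately without reading their data.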



On Thu, May 21, 2015 at 11:36 AM, Sean Owen so...@cloudera.com wrote:

 If sampling whole partitions is sufficient (or a part of a partition),
 sure you could mapPartitionsWithIndex and decide whether to process a
 partition at all based on its # and skip the rest. That's much faster.

 On Thu, May 21, 2015 at 7:07 PM, Wang, Ningjun (LNG-NPV)
 ningjun.w...@lexisnexis.com wrote:
  I don't need it to be 100% random. How about randomly picking a few
  partitions and returning all docs in those partitions? Is
  rdd.mapPartitionsWithIndex() the right method to use to process just a
  small portion of the partitions?
 
  Ningjun





Re: LDA prediction on new document

2015-05-22 Thread Dani Qiu
Thanks, Ken,
but I am planning to use Spark LDA in production, and I cannot wait for a
future release.
Is there at least some workaround?

PS: SPARK-5567 https://issues.apache.org/jira/browse/SPARK-5567
mentions: "This will require inference but should be able to use the same
code, with a few modification to keep the inferred topics fixed." Can
somebody elaborate on that? Is it folding-in in EM? Or can I simply
sum the topic distributions of the terms in the new document?
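
One possible reading of "keep the inferred topics fixed" is a folding-in style EM loop, sketched below in plain Scala without MLlib. This is not the SPARK-5567 implementation: FoldIn, inferTopicMix, the alpha smoothing pseudo-count, and the iteration count are all assumptions made for illustration, and the topic-word probabilities are assumed to be already normalised per topic.

```scala
object FoldIn {
  /**
   * Estimate the topic mixture of one new document while the topics stay fixed.
   * topics(k)(w) is P(word w | topic k); doc maps word id -> count.
   */
  def inferTopicMix(topics: Array[Array[Double]], doc: Map[Int, Int],
                    alpha: Double = 0.1, iterations: Int = 50): Array[Double] = {
    val k = topics.length
    var theta = Array.fill(k)(1.0 / k)           // start from a uniform mixture
    for (_ <- 0 until iterations) {
      val expected = Array.fill(k)(alpha)        // smoothing pseudo-counts
      for ((w, n) <- doc) {
        // E-step: responsibility of each topic for word w under the current mixture
        val resp = Array.tabulate(k)(t => theta(t) * topics(t)(w))
        val z = resp.sum
        if (z > 0) for (t <- 0 until k) expected(t) += n * resp(t) / z
      }
      // M-step: only the document's mixture is updated; topics are never touched
      val total = expected.sum
      theta = expected.map(_ / total)
    }
    theta
  }
}
```

Simply summing per-term topic distributions corresponds to a single pass of this loop from a uniform start. Iterating lets the document's own mixture reweight ambiguous words, which is the usual argument for folding-in.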

On Fri, May 22, 2015 at 2:23 PM, Ken Geis geis@gmail.com wrote:

 Dani, this appears to be addressed in SPARK-5567
 https://issues.apache.org/jira/browse/SPARK-5567, scheduled for Spark
 1.5.0.


 Ken





Re: Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread Ted Yu
Can you post the morning modified code ?

Thanks



 On May 21, 2015, at 11:11 PM, donhoff_h 165612...@qq.com wrote:
 
 Hi,
 
 Thanks very much for the reply. I have tried SecurityUtil. I can see
 from the log that this statement executed successfully, but I still cannot
 pass HBase authentication. With more experiments, I found a new and
 interesting scenario. If I run the program in yarn-client mode, the driver
 can pass authentication, but the executors cannot. If I run the program
 in yarn-cluster mode, neither the driver nor the executors can pass
 authentication. Can anybody give me a clue based on this? Many thanks!
 
 
 -- Original Message --
 From: yuzhihong <yuzhih...@gmail.com>
 Sent: May 22, 2015 (Friday) 5:29 AM
 To: donhoff_h <165612...@qq.com>
 Cc: Bill Q <bill.q@gmail.com>; user <user@spark.apache.org>
 Subject: Re: How to use spark to access HBase with Security enabled
 
 Are the worker nodes colocated with HBase region servers ?
 
 Were you running as hbase super user ?
 
 You may need to login, using code similar to the following:
   if (isSecurityEnabled()) {
 
 SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
 
   }
 
 SecurityUtil is hadoop class.
 
 
 
 Cheers
 
 
 On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote:
 Hi,
 
 Many thanks for the help. My Spark version is 1.3.0 too, and I run it on
 YARN. Following your advice, I have changed the configuration. Now my
 program can read hbase-site.xml correctly, and it can also authenticate
 with ZooKeeper successfully.
 
 But I have hit a new problem: my program still cannot pass HBase
 authentication. Have you or anybody else ever run into this kind of
 situation? I used a keytab file to provide the principal. Since it can
 pass ZooKeeper authentication, I am sure the keytab file is OK.
 But it just cannot pass HBase authentication. The exception is
 listed below. Could you or anybody else help me? Many thanks again!
 
 Exception***
 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
 connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
 quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
 baseZNode=/hbase
 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as 
 SASL mechanism.
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to 
 server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate 
 using Login Context section 'Client'
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established 
 to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu 
 May 21 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri 
 May 22 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 
 22 11:43:32 CST 2015
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete 
 on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, 
 negotiated timeout = 4
 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable 
 called multiple times. Overwriting connection and table reference; 
 TableInputFormatBase will not close these old references when done.
 15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes 
 for table ns_dev1:hd01.
 15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while 
 connecting to the server : javax.security.sasl.SaslException: GSS initiate 
 failed [Caused by GSSException: No valid credentials provided (Mechanism 
 level: Failed to find any Kerberos tgt)]
 15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. 
 The most likely cause is missing or invalid credentials. Consider 'kinit'.
 javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
 at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
 at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
 at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
 at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
 at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730)
 at 
 

Re: Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread donhoff_h
Hi,

My modified code is listed below; I just added the SecurityUtil call. I don't
know which property keys I should use, so I made up two property keys of my own
to locate the keytab and the principal.

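import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.SecurityUtil
import org.apache.spark.{SparkConf, SparkContext}

object TestHBaseRead2 {
  def main(args: Array[String]) {

    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create()
    // Custom property keys that point at the keytab file and the principal
    hbConf.set("dhao.keytab.file", "//etc//spark//keytab//spark.user.keytab")
    hbConf.set("dhao.user.principal", "sp...@bgdt.dev.hrb")
    SecurityUtil.login(hbConf, "dhao.keytab.file", "dhao.user.principal")
    val conn = ConnectionFactory.createConnection(hbConf)
    val tbl = conn.getTable(TableName.valueOf("spark_t01"))
    try {
      val get = new Get(Bytes.toBytes("row01"))
      val res = tbl.get(get)
      println("result:" + res.toString)
    }
    finally {
      tbl.close()
      conn.close()
      es.shutdown()  // note: es is not defined anywhere in this snippet
    }

    val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
    val v = rdd.sum()
    println("Value=" + v)
    sc.stop()

  }
}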




Re: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Gautam Bajaj
This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of
AsyncRDDActions in Java ?

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote:

 I am receiving data on UDP port 8060, processing it with Spark,
 and storing the output in Neo4j.

 But the data I'm receiving and the data that gets stored don't
 match, probably because the Neo4j API takes too long to push the data into
 the database. Meanwhile, Spark is unable to receive data, probably because the
 process is blocked.

 On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Can you elaborate on how the data loss is occurring?


 On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 That is completely all right, as the system will make sure the work gets
 done.

 My major concern is the data drop. Will using async stop the data loss?

 On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 If you cannot push data as fast as you are generating it, then async
 isn't going to help either. The work will just keep piling up as
 many async jobs, even though your batch processing times will be low,
 because that processing time does not reflect how much work overall is
 pending in the system.
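
 The arithmetic behind this point can be written down in a couple of lines. The names below (Backlog, pendingAfter) and the assumption of steady, constant rates are purely illustrative.

```scala
object Backlog {
  /** Records still waiting after `seconds`, given steady arrival and write rates. */
  def pendingAfter(seconds: Long, arrivalsPerSec: Long, writesPerSec: Long): Long =
    math.max(0L, (arrivalsPerSec - writesPerSec) * seconds)
}
```

 For instance, with 1000 records arriving per second and a store that absorbs 800 per second, one minute leaves (1000 - 800) * 60 = 12000 records pending, whether the writes are issued synchronously or asynchronously.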

 On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Hi,

 From my understanding of Spark Streaming, I created a spark entry
 point, for continuous UDP data, using:

 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
 JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
 JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

 Now, when I process this input stream using:

 JavaDStream hash = lines.flatMap(my-code);
 JavaPairDStream tuple = hash.mapToPair(my-code);
 JavaPairDStream output = tuple.reduceByKey(my-code);
 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
         @Override
         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
             // TODO Auto-generated method stub
             new AsyncRDDActions(arg0.rdd(), null);
             arg0.foreachPartition(
                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

                     @Override
                     public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {

                         // TODO Auto-generated method stub
                         GraphDatabaseService graphDb =
                             new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                 .setConfig("remote_shell_enabled", "true")
                                 .newGraphDatabase();

                         try (Transaction tx = graphDb.beginTx()) {
                             while (arg0.hasNext()) {
                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                 boolean oldHMac = false;
                                 if (HMac != null) {
                                     System.out.println("Already in Database:" + tuple._1);
                                     oldHMac = true;
                                 }
                                 else
                                     HMac = Neo4jOperations.createHMac(graphDb, tuple._1);

                                 ArrayList<String> zipcodes = tuple._2;
                                 for (String zipcode : zipcodes) {
                                     Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
                                     if (Zipcode != null) {
                                         System.out.println("Already in Database:" + zipcode);
                                         if (oldHMac == true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                                             Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                                         else
                                             Neo4jOperations.travelTo(HMac, Zipcode);
 

Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread donhoff_h
Hi,

Thanks very much for the reply. I have tried SecurityUtil. I can see
from the log that this statement executed successfully, but I still cannot pass
HBase authentication. With more experiments, I found a new and
interesting scenario. If I run the program in yarn-client mode, the driver can
pass authentication, but the executors cannot. If I run the program in
yarn-cluster mode, neither the driver nor the executors can pass
authentication. Can anybody give me a clue based on this? Many thanks!




-- Original Message --
From: yuzhihong <yuzhih...@gmail.com>
Sent: May 22, 2015 (Friday) 5:29 AM
To: donhoff_h <165612...@qq.com>
Cc: Bill Q <bill.q@gmail.com>; user <user@spark.apache.org>
Subject: Re: How to use spark to access HBase with Security enabled



Are the worker nodes colocated with HBase region servers ?

Were you running as hbase super user ?


You may need to login, using code similar to the following:
 
  if (isSecurityEnabled()) {
 
SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
 
  }

 

SecurityUtil is hadoop class.




Cheers



On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote:
Hi,

Many thanks for the help. My Spark version is 1.3.0 too, and I run it on YARN.
Following your advice, I have changed the configuration. Now my program can
read hbase-site.xml correctly, and it can also authenticate with ZooKeeper
successfully.

But I have hit a new problem: my program still cannot pass HBase
authentication. Have you or anybody else ever run into this kind of
situation? I used a keytab file to provide the principal. Since it can pass
ZooKeeper authentication, I am sure the keytab file is OK. But it
just cannot pass HBase authentication. The exception is listed below.
Could you or anybody else help me? Many thanks again!

Exception***
15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
baseZNode=/hbase
15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as 
SASL mechanism.
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to 
server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using 
Login Context section 'Client'
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to 
bgdt02.dev.hrb/130.1.9.98:2181, initiating session
15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu May 
21 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri May 
22 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 
11:43:32 CST 2015
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on 
server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, 
negotiated timeout = 4
15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called 
multiple times. Overwriting connection and table reference; 
TableInputFormatBase will not close these old references when done.
15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for 
table ns_dev1:hd01.
15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while 
connecting to the server : javax.security.sasl.SaslException: GSS initiate 
failed [Caused by GSSException: No valid credentials provided (Mechanism level: 
Failed to find any Kerberos tgt)]
15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. The 
most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
   

Re: DataFrame Column Alias problem

2015-05-22 Thread Reynold Xin
In 1.4 it actually shows col1 by default.

In 1.3, you can add col1 to the output, i.e.

df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show()
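
Once the count has an alias, sorting by it is straightforward. The following is only a minimal sketch, assuming the DataFrame has a column named col1; the object and method names (AliasedCount, countsSortedDesc) are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count}

// Hypothetical helper, not part of Spark.
object AliasedCount {
  // Counts rows per distinct col1 value, names the count column "c",
  // and sorts by that alias in descending order.
  // On 1.3, per the note above, pass col("col1") as the first argument
  // of agg as well if the grouping column should appear in the output.
  def countsSortedDesc(df: DataFrame): DataFrame =
    df.groupBy(col("col1"))
      .agg(count(col("col1")).as("c"))
      .sort(col("c").desc)
}
```

Calling AliasedCount.countsSortedDesc(df).show() should then print the groups with the largest counts first.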


On Thu, May 21, 2015 at 11:22 PM, SLiZn Liu sliznmail...@gmail.com wrote:

 However, this returns a single column c, without showing the original
 col1.

 On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha sriharsha@gmail.com
 wrote:

 df.groupBy($"col1").agg(count($"col1").as("c")).show

 On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com
 wrote:

 Hi Spark Users Group,

 I’m doing groupBy operations on my DataFrame *df* as follows, to get the
 count for each value of col1:

  df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write like this.
 col1   COUNT(col1#347)
 aaa    2
 bbb    4
 ccc    4
 ...
 and more...

 I’d like to sort by the resulting count with
 .sort(COUNT(col1#347)), but the column name of the count result
 obviously cannot be known in advance. Intuitively, one might consider
 getting the column name by column index, in the fashion of R’s data frames,
 but Spark doesn’t support that. I have Googled *spark agg alias* and so forth,
 and checked DataFrame.as in the Spark API; neither helped. Am I the
 only one who has ever got stuck on this issue, or is there anything I have missed?

 REGARDS,
 Todd Leo





Re: Spark Memory management

2015-05-22 Thread ??????
In the Spark source, see the class org.apache.spark.deploy.worker.WorkerArguments:

import java.lang.management.ManagementFactory

// Default number of cores: everything the JVM can see.
def inferDefaultCores(): Int = {
  Runtime.getRuntime.availableProcessors()
}

// Default worker memory in MB: total physical memory minus 1 GB.
def inferDefaultMemory(): Int = {
  val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  var totalMb = 0
  try {
    val bean = ManagementFactory.getOperatingSystemMXBean()
    if (ibmVendor) {
      val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    } else {
      val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    }
  } catch {
    case e: Exception => {
      // Fall back to 2 GB if the physical memory size cannot be read
      totalMb = 2 * 1024
      System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
    }
  }
  // Leave out 1 GB for the operating system, but don't return a negative memory size
  math.max(totalMb - 1024, 512)
}
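
To make the last line of inferDefaultMemory concrete, here is a small sketch of the same arithmetic in isolation. The object and method names (WorkerMemoryDefault, defaultWorkerMemoryMb) are made up for illustration and are not part of Spark.

```scala
// Hypothetical stand-alone copy of the final clamp in inferDefaultMemory.
object WorkerMemoryDefault {
  // Reserve 1024 MB for the operating system, but never go below 512 MB.
  def defaultWorkerMemoryMb(totalMb: Int): Int = math.max(totalMb - 1024, 512)

  def main(args: Array[String]): Unit = {
    println(defaultWorkerMemoryMb(16 * 1024)) // 15360
    println(defaultWorkerMemoryMb(2 * 1024))  // 1024 (the 2 GB fallback case)
    println(defaultWorkerMemoryMb(1024))      // 512 (floor applies)
  }
}
```

Note that this only covers the default memory a standalone worker offers; it says nothing about how RDD blocks are evicted or spilled.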




-- Original --
From: swaranga sarma.swara...@gmail.com
Date: Fri, May 22, 2015 03:31 PM
To: user user@spark.apache.org

Subject: Spark Memory management



Experts,

This is an academic question. Since Spark runs on the JVM, how is it able to
do things like offloading RDDs from memory to disk when the data cannot fit
into memory? How are the calculations performed? Does it use the methods
available in the java.lang.Runtime class to get free/available memory? How
accurate are these calculations?

Thanks for any inputs.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Memory-management-tp22992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-22 Thread SparknewUser
I am new to MLlib and to Spark (I use Scala).

I'm trying to understand how LogisticRegressionWithLBFGS and
LogisticRegressionWithSGD work.
I usually use R to do logistic regressions, but now I do it on Spark
to be able to analyze Big Data.

The model only returns weights and an intercept. My problem is that I have no
information about which variables are significant and which variables I had
better delete to improve my model. I only have the confusion matrix and the AUC
to evaluate the performance.

Is there any way to get information about the variables I put in my model?
How can I try different variable combinations? Do I have to modify the
original dataset (e.g. delete one or several columns)?
How are the weights calculated: is there a correlation calculation with the
variable of interest?
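
For readers following along, here is a minimal sketch of how a logistic regression model's weights and intercept turn one feature vector into a probability. It is plain Scala, not MLlib's own code, and the names (LogisticScore, predictProbability) are made up for illustration.

```scala
// Hypothetical illustration of the logistic model formula; not MLlib code.
object LogisticScore {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // probability = sigmoid(w . x + intercept)
  def predictProbability(weights: Array[Double],
                         intercept: Double,
                         features: Array[Double]): Double = {
    require(weights.length == features.length, "one weight per feature")
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    sigmoid(margin)
  }
}
```

Because each weight multiplies its feature directly, weight magnitudes are only comparable across variables when the features are on the same scale, and a weight on its own is not a significance test.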



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html



Re: HiveContext fails when querying large external Parquet tables

2015-05-22 Thread Andrew Otto
What is also strange is that this seems to work on external JSON data, but not 
Parquet.  I’ll try to do more verification of that next week.


 On May 22, 2015, at 16:24, yana yana.kadiy...@gmail.com wrote:
 
 There is an open Jira on Spark not pushing predicates to the metastore. I have a 
 large dataset with many partitions, but doing anything with it is very 
 slow... But I am surprised Spark 1.2 worked for you: it has this problem...
 

RE: HiveContext fails when querying large external Parquet tables

2015-05-22 Thread yana
There is an open Jira on Spark not pushing predicates to the metastore. I have a 
large dataset with many partitions, but doing anything with it is very 
slow... But I am surprised Spark 1.2 worked for you: it has this problem...

-------- Original message --------
From: Andrew Otto ao...@wikimedia.org
Date: 05/22/2015 3:51 PM (GMT-05:00)
To: user@spark.apache.org
Cc: Joseph Allemandou jalleman...@wikimedia.org, Madhumitha Viswanathan mviswanat...@wikimedia.org
Subject: HiveContext fails when querying large external Parquet tables

Re: Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Tathagata Das
Can you show us the rest of the program? When are you starting or stopping
the context? Is the exception occurring right after start or stop? What
about the log4j logs, what do they say?

On Fri, May 22, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote:

 I just verified that the following code works on 1.3.0 :

 val stream1 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topic1)

 val stream2 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topic2)

 stream1.print()

 stream2.print()


 So something else is probably going on in your case.  See if simply
 printing the two streams works for you, then compare what's different in
 your actual job.

 On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 Hi,

 I'm trying to connect to two Kafka topics from Spark using DirectStream,
 but I get an error. I don't know if there is any limitation on doing this,
 because when I access just one topic everything is all right.

 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String]("metadata.broker.list" ->
 "quickstart.cloudera:9092")
 val setTopic1 = Set("topic1")
 val setTopic2 = Set("topic2")

 val stream1 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
 val stream2 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)


 The error that I get is:
 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
 15/05/22 13:12:40 ERROR OneForOneStrategy:
 java.lang.NullPointerException
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)


 Is there any limitation on doing this?





HiveContext fails when querying large external Parquet tables

2015-05-22 Thread Andrew Otto
Hi all,

(This email was easier to write in markdown, so I’ve created a gist with its 
contents here: https://gist.github.com/ottomata/f91ea76cece97444e269.  I’ll 
paste the markdown content in the email body here too.)

---
We’ve recently upgraded to CDH 5.4.0 which comes with Spark 1.3.0 and Hive 
1.1.0.  Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0.  
Since upgrading, we can no longer query our large webrequest dataset using 
HiveContext.  HiveContext + Parquet and other file types work fine with 
external tables (We have a similarly large JSON external table that works just 
fine with HiveContext.)

Our webrequest dataset is stored in hourly partitioned Parquet files.  We 
mainly interact with this dataset via a Hive external table, but also have been 
using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T.  (There are subdirectories down to hourly level here too.)
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data and add the single hourly 
partition, querying works via both Hive and Spark HiveContext:

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS `otto.webrequest_few_partitions_big_data`(
    `hostname`          string  COMMENT 'Source node hostname',
    `sequence`          bigint  COMMENT 'Per host sequence number',
    `dt`                string  COMMENT 'Timestame at cache in ISO 8601',
    `time_firstbyte`    double  COMMENT 'Time to first byte',
    `ip`                string  COMMENT 'IP of packet at cache',
    `cache_status`      string  COMMENT 'Cache status',
    `http_status`       string  COMMENT 'HTTP status of response',
    `response_size`     bigint  COMMENT 'Response size',
    `http_method`       string  COMMENT 'HTTP method of request',
    `uri_host`          string  COMMENT 'Host of request',
    `uri_path`          string  COMMENT 'Path of request',
    `uri_query`         string  COMMENT 'Query of request',
    `content_type`      string  COMMENT 'Content-Type header of response',
    `referer`           string  COMMENT 'Referer header of request',
    `x_forwarded_for`   string  COMMENT 'X-Forwarded-For header of request',
    `user_agent`        string  COMMENT 'User-Agent header of request',
    `accept_language`   string  COMMENT 'Accept-Language header of request',
    `x_analytics`       string  COMMENT 'X-Analytics header of response',
    `range`             string  COMMENT 'Range header of response',
    `is_pageview`       boolean COMMENT 'Indicates if this record was marked as a pageview during refinement',
    `record_version`    string  COMMENT 'Keeps track of changes in the table content definition - https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
    `client_ip`         string  COMMENT 'Client IP computed during refinement using ip and x_forwarded_for',
    `geocoded_data`     map<string, string>  COMMENT 'Geocoded map with continent, country_code, country, city, subdivision, postal_code, latitude, longitude, timezone keys  and associated values.',
    `x_cache`           string  COMMENT 'X-Cache header of response',
    `user_agent_map`    map<string, string>  COMMENT 'User-agent map with browser_name, browser_major, device, os_name, os_minor, os_major keys and associated values',
    `x_analytics_map`   map<string, string>  COMMENT 'X_analytics map view of the x_analytics field',
    `ts`                timestamp            COMMENT 'Unix timestamp in milliseconds extracted from dt',
    `access_method`     string  COMMENT 'Method used to accessing the site (mobile app|mobile web|desktop)',
    `agent_type`        string  COMMENT 'Categorise the agent making the webrequest as either user or spider (automatas to be added).',
    `is_zero`           boolean COMMENT 'Indicates if the webrequest is accessed through a zero provider',
    `referer_class`     string  COMMENT 'Indicates if a referer is internal, external or unknown.'
)
PARTITIONED BY (
    `webrequest_source` string  COMMENT 'Source cluster',
    `year`              int     COMMENT 'Unpadded year of request',
    `month`             int     COMMENT 'Unpadded month of request',
    `day`               int     COMMENT 'Unpadded day of request',
    `hour`              int     COMMENT 'Unpadded hour of request'
)
CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
STORED AS PARQUET
LOCATION '/wmf/data/wmf/webrequest'
;

hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition 
(webrequest_source='misc', year=2015, month=5, day=20, hour=0) location 
RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
If messages arrive faster than your job can process ALL the data for a micro 
batch (i.e. the next RDD produced for your stream), the following happens. 
Let's say, for example, that your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However, the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500 MB, and 
so on and so forth, until the current iteration of the job crashes due to 
what is essentially memory exhaustion (no more free RAM for the next RDD), 
caused by what is essentially a memory leak  
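
The three steps above can be sketched as a toy calculation. This is only a back-of-the-envelope model in plain Scala, not how Spark accounts for memory; the names (BacklogSketch, pendingMbPerInterval) and the 4 sec processing time are assumptions chosen for illustration.

```scala
// Hypothetical toy model of a streaming backlog; not Spark code.
object BacklogSketch {
  /** Unprocessed megabytes held in memory after each batch interval. */
  def pendingMbPerInterval(batchSec: Double,
                           processSec: Double,
                           batchMb: Double,
                           intervals: Int): Seq[Double] = {
    // How many MB the job manages to finish within one batch interval
    val processedPerInterval = batchMb * (batchSec / processSec)
    (1 to intervals)
      .scanLeft(0.0) { (pending, _) =>
        // Each interval adds one new batch and drains what the job can finish
        math.max(0.0, pending + batchMb - processedPerInterval)
      }
      .tail
  }
}
```

With a 3 sec batch, 500 MB per batch and 4 sec of processing per batch, BacklogSketch.pendingMbPerInterval(3, 4, 500, 3) gives 125.0, 250.0, 375.0: the backlog grows by 125 MB every interval and never drains. With 2 sec of processing the backlog stays at 0.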

 

The above can be called a design flaw, because Spark Streaming seems to rely 
on the default behavior of Spark Batch, which is to remove In Memory Only RDDs 
when there is no more free memory in the system. However, in a batch context 
Spark Batch can always recreate a removed RDD from e.g. the file system, while 
in a streaming context the data is gone forever 

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD, i.e. lost messages 

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the micro-batch time (obviously you are voluntarily limiting your 
performance in this way)

2.   Throw more boxes/cores/RAM at the problem and also improve the 
performance of your tasks performing the work on the messages (e.g. review and 
refactor the code)

3.   Set the Storage Mode of the RDDs to “Memory AND Disk” – this keeps 
using RAM while there is free space and then switches to disk, rather than 
crashing miserably and losing the affected job iteration and all its messages – 
obviously every time it has to resort to the disk your performance will take a hit 
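
As a rough sketch of measures 1 and 3 in Scala: spark.streaming.receiver.maxRate is the standard Spark Streaming setting for capping a receiver's records per second, and StorageLevel.MEMORY_AND_DISK_SER is one of the built-in storage levels that can spill to disk. The object and method names (StreamingSafetySettings, rateLimitedConf, spillToDisk) are made up, and any concrete rate is a placeholder to be tuned against your own job.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

// Hypothetical helper; only the config key and StorageLevel come from Spark.
object StreamingSafetySettings {
  // Measure 1: cap each receiver at a fixed number of records per second.
  // Passing false skips loading spark.* system properties into the conf.
  def rateLimitedConf(maxRecordsPerSec: Int): SparkConf =
    new SparkConf(false)
      .set("spark.streaming.receiver.maxRate", maxRecordsPerSec.toString)

  // Measure 3: a storage level that falls back to disk when memory runs out.
  val spillToDisk: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER
}
```

The storage level would be passed when creating a receiver-based input stream, for example as the third argument of ssc.socketTextStream(host, port, storageLevel); whether that fits a custom UDP receiver depends on how that receiver is written.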

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) do not get blocked 
(unless a rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory until it is processed. So I 
am still not sure how the data loss is happening, unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the max rate 
at which the receiver can save data in memory and replicate it to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can try using TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately, if you try 
sending data faster than the receiver can handle (independent of whether 
processing can keep up), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate in 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote:

I am receiving data at UDP port 8060, processing it using Spark, and 
storing the output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the database. 
Meanwhile, Spark is unable to receive data, probably because the process is 
blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote:

That is completely all right, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop data loss? 

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote:

If you cannot push data as fast as you are generating it, then async isn't going 
to help either. The work is just going to keep piling up as many, many async 
jobs, even though your batch processing times will be low, because that processing 
time will not reflect how much overall work is pending in the system.

 

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote:

Hi,

 

From my understanding of Spark Streaming, I created a spark entry point, for 
continuous UDP data, using:

 

SparkConf conf = new 

RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
… and measure 4 is to implement a custom Feedback Loop, e.g. to monitor the 
amount of free RAM and the number of queued jobs and automatically decrease the 
message consumption rate of the Receiver until the number of clogged RDDs and 
Jobs subsides (again, here you artificially decrease your performance in the 
name of the reliability/integrity of your system, i.e. not losing messages)

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 9:39 PM
To: 'Tathagata Das'; 'Gautam Bajaj'
Cc: 'user'
Subject: RE: Storing spark processed output to Database asynchronously.

 

If the message consumption rate is higher than the time required to process ALL 
data for a micro batch (ie the next RDD produced for your stream)   the 
following  happens – lets say that e.g. your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500MB and 
so on and so forth until bm the current iteration of the job crashes due to 
what is essentially a memory exhaustion (no more free ram for the next RDD) due 
to what is essentially a memory leak  

 

The above can be called a design a flaw because Spark Streaming seems to rely 
on the default behavior of Spark Batch which is to remove In Memory Only RDDs 
when there is no more free memory in the system, however in a batch context 
Spark Batch can always recreate a removed RDD from e.g. the file system, while 
in a streaming context the data is gone for ever 

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD is a lost messages 

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the time for micro-batch (obviously you are limiting voluntarily your 
performance in this way)

2.   Throw more boxes/cores/ram at the problem and also  improve the 
performance of your tasks performing the work on the messages (e.g. review and 
refactor the code)

3.   Set the Storage Mode of the RDDs to “Memory AND Disk” – this will keep 
using the RAM until there is free space and then switch to disk rather than 
crashing miserably and losing the affected job iteration and all its messages – 
obviously every time it has to resort to the disk your performance will get hit 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) does not get blocked 
(unless rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory and until it is processed. So I 
am still not sure how the data loss is happening. Unless you are sending data 
at a faster rate than the receiver can handle (that more than the max rate the 
receiver can save data in memory and replicate to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can try using TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately if you try 
sending data faster that the receiver can handle (independent of whether 
processing can handle), then you will loose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate in 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote:

I am receiving data at UDP port 8060, processing it using Spark, and 
storing the output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the database. 
Meanwhile, Spark is unable to receive data, probably because the process is 
blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote:

That is completely all right, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop data loss? 

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com 

spark.executor.extraClassPath - Values not picked up by executors

2015-05-22 Thread Todd Nist
I'm using the spark-cassandra-connector from DataStax in a Spark Streaming
job launched from my own driver.  It is connecting to a standalone cluster
on my local box, which has two workers running.

This is Spark 1.3.1 and spark-cassandra-connector-1.3.0-SNAPSHOT.  I have
added the following entry to my $SPARK_HOME/conf/spark-default.conf:

spark.executor.extraClassPath
/projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar


When I start the master with, $SPARK_HOME/sbin/start-master.sh, it comes up
just fine.  As do the two workers with the following command:

Worker 1, port 8081:

radtech:spark $ ./bin/spark-class
org.apache.spark.deploy.worker.Worker spark://radtech.io:7077
--webui-port 8081 --cores 2

Worker 2, port 8082

radtech:spark $ ./bin/spark-class
org.apache.spark.deploy.worker.Worker spark://radtech.io:7077
--webui-port 8082 --cores 2

When I execute the Driver connecting to the master:

sbt app/run -Dspark.master=spark://radtech.io:7077

It starts up, but when the executors are launched they do not include the
entry in the spark.executor.extraClassPath:

15/05/22 17:35:26 INFO Worker: Asked to launch executor
app-20150522173526-/0 for KillrWeatherApp$
15/05/22 17:35:26 INFO ExecutorRunner: Launch command: java -cp
/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar
-Dspark.driver.port=55932 -Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
--driver-url 
akka.tcp://sparkDriver@192.168.1.3:55932/user/CoarseGrainedScheduler
--executor-id 0 --hostname 192.168.1.3 --cores 2
--app-id app-20150522173526- --worker-url
akka.tcp://sparkWorker@192.168.1.3:55923/user/Worker



which will then cause the executor to fail with a ClassNotFoundException,
which I would expect:

[WARN] [2015-05-22 17:38:18,035]
[org.apache.spark.scheduler.TaskSetManager]: Lost task 0.0 in stage
2.0 (TID 23, 192.168.1.3): java.lang.ClassNotFoundException:
com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I also notice that some of the entries on the executor classpath are
duplicated.  This is a newly installed spark-1.3.1-bin-hadoop2.6
standalone cluster, just to ensure I had nothing from testing in the way.

I can set the SPARK_CLASSPATH in the $SPARK_HOME/spark-env.sh and it will
pick up the jar and append it fine.

Any suggestions on what is going on here?  It seems to just ignore whatever I
have in spark.executor.extraClassPath.  Is there a different way to do
this?

TIA.

-Todd


Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-22 Thread DB Tsai
Great to see that the result is comparable to R in the new ML implementation.
Since the majority of users will still use the old mllib API, we plan to
call the ML implementation from MLlib to handle the intercept
correctly with regularization.

A JIRA has been created:
https://issues.apache.org/jira/browse/SPARK-7780

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Fri, May 22, 2015 at 10:45 AM, Xin Liu liuxin...@gmail.com wrote:
 Thank you guys for the prompt help.

 I ended up building spark master and verified what DB has suggested.

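 // MlLogisticRegression is presumably an import alias for the new
 // org.apache.spark.ml.classification.LogisticRegression class.
 val lr = (new MlLogisticRegression)
   .setFitIntercept(true)
   .setMaxIter(35)

 val model = lr.fit(sqlContext.createDataFrame(training))
 val scoreAndLabels = model.transform(sqlContext.createDataFrame(test))
   .select("probability", "label")
   .map { case Row(probability: Vector, label: Double) =>
     // Keep the predicted probability of the positive class.
     (probability(1), label)
   }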

 Without much tuning, the above generates

 Weights: [0.0013971323020715888,0.8559779783186241,-0.5052275562089914]
 Intercept: -3.3076806966913006
 Area under ROC: 0.7033511043412033

 I also tried it on a much bigger dataset I have and its results are close to
 what I get from statsmodel.

 Now eagerly waiting for the 1.4 release.

 Thanks,
 Xin



 On Wed, May 20, 2015 at 9:37 PM, Chris Gore cdg...@cdgore.com wrote:

 I tried running this data set as described with my own implementation of
 L2 regularized logistic regression using LBFGS to compare:
 https://github.com/cdgore/fitbox

 Intercept: -0.886745823033
 Weights (['gre', 'gpa', 'rank']):[ 0.28862268  0.19402388 -0.36637964]
 Area under ROC: 0.724056603774

 The difference could be from the feature preprocessing as mentioned.  I
 normalized the features around 0:

 binary_train_normalized = (binary_train - binary_train.mean()) /
 binary_train.std()
 binary_test_normalized = (binary_test - binary_train.mean()) /
 binary_train.std()
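
 For readers working in Scala, the same normalization can be sketched without any library. This is an illustrative equivalent of the pandas snippet above, not code from the thread; `standardize` and the sample values are made up. As in the original, the test data is scaled with the training mean and standard deviation, and the sample standard deviation (n - 1 denominator) is used, which matches the pandas default.

```scala
object StandardizeSketch {
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Sample standard deviation (divides by n - 1), like the pandas default.
  def std(xs: Seq[Double]): Double = {
    val m = mean(xs)
    math.sqrt(xs.map(x => (x - m) * (x - m)).sum / (xs.size - 1))
  }

  // Scale `values` using statistics computed on `train` only.
  def standardize(values: Seq[Double], train: Seq[Double]): Seq[Double] = {
    val m = mean(train)
    val s = std(train)
    values.map(v => (v - m) / s)
  }

  def main(args: Array[String]): Unit = {
    val train = Seq(2.0, 4.0, 6.0) // made-up values for one feature
    val test = Seq(4.0, 8.0)
    println(standardize(train, train))
    println(standardize(test, train))
  }
}
```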

 On a data set this small, the difference in models could also be the
 result of how the training/test sets were split.

 Have you tried running k-folds cross validation on a larger data set?

 Chris

 On May 20, 2015, at 6:15 PM, DB Tsai d...@netflix.com.INVALID wrote:

 Hi Xin,

 If you take a look at the model you trained, the intercept from Spark
 is significantly smaller than StatsModel, and the intercept represents
 a prior on categories in LOR which causes the low accuracy in Spark
 implementation. In LogisticRegressionWithLBFGS, the intercept is
 regularized due to the implementation of Updater, and the intercept
 should not be regularized.

 In the new pipeline APIs, a LOR with elasticNet is implemented, and
 the intercept is properly handled.

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala

 As you can see in the tests,

 https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
 the result is exactly the same as R now.

 BTW, in both versions, feature scaling is done before training:
 we train the model in the scaled space but transform the model weights
 back to the original space. The only difference is that in the mllib version,
 LogisticRegressionWithLBFGS regularizes the intercept, while in the ml
 version, the intercept is excluded from regularization. As a result,
 if lambda is zero, the models should be the same.
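
 The last point can be checked with a small, library-free sketch. The code below is not the MLlib or ML implementation; `loss`, `Example` and the numbers are hypothetical and chosen only for illustration. It computes the L2-penalized logistic loss once with the intercept included in the penalty and once with it excluded; the two agree when lambda is zero and differ by (lambda / 2) * intercept^2 otherwise.

```scala
object InterceptPenaltySketch {
  // One training example: feature vector and a 0/1 label.
  final case class Example(features: Array[Double], label: Double)

  /** Mean logistic loss plus an L2 penalty of (lambda / 2) * ||w||^2.
    * When `penalizeIntercept` is true, intercept^2 is added to the norm. */
  def loss(data: Seq[Example], weights: Array[Double], intercept: Double,
           lambda: Double, penalizeIntercept: Boolean): Double = {
    val dataLoss = data.map { ex =>
      val margin = intercept +
        ex.features.zip(weights).map { case (x, w) => x * w }.sum
      // Negative log-likelihood of one example: log(1 + e^margin) - y * margin.
      math.log1p(math.exp(margin)) - ex.label * margin
    }.sum / data.size
    val weightNorm = weights.map(w => w * w).sum
    val interceptTerm = if (penalizeIntercept) intercept * intercept else 0.0
    dataLoss + 0.5 * lambda * (weightNorm + interceptTerm)
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Example(Array(1.0, 2.0), 1.0),
      Example(Array(0.5, -1.0), 0.0)
    )
    val w = Array(0.3, -0.2)
    val b = -3.5
    for (lambda <- Seq(0.0, 0.01)) {
      val penalized = loss(data, w, b, lambda, penalizeIntercept = true)
      val unpenalized = loss(data, w, b, lambda, penalizeIntercept = false)
      println(s"lambda=$lambda penalized=$penalized unpenalized=$unpenalized")
    }
  }
}
```

 With a large intercept, as in the StatsModel fit, the extra penalty term pulls the intercept toward zero, which is consistent with the smaller intercept reported for the mllib version.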



 On Wed, May 20, 2015 at 3:42 PM, Xin Liu liuxin...@gmail.com wrote:

 Hi,

 I have tried a few models in MLlib to train a LogisticRegression model.
 However, I consistently get much better results using other libraries such
 as statsmodel (which gives results similar to R) in terms of AUC. For
 illustration purposes, I used a small data set (I have tried much bigger data)
 http://www.ats.ucla.edu/stat/data/binary.csv in
 http://www.ats.ucla.edu/stat/r/dae/logit.htm

 Here is the snippet of my usage of LogisticRegressionWithLBFGS.

 val algorithm = new LogisticRegressionWithLBFGS
 algorithm.setIntercept(true)
 algorithm.optimizer
   .setNumIterations(100)
   .setRegParam(0.01)
   .setConvergenceTol(1e-5)
 val model = algorithm.run(training)
 model.clearThreshold()
 val scoreAndLabels = test.map { point =>
   val score = model.predict(point.features)
   (score, point.label)
 }
 val metrics = new BinaryClassificationMetrics(scoreAndLabels)
 val auROC = metrics.areaUnderROC()

 I did a (0.6, 0.4) split for training/test. The response is admit and
 features are GRE score, GPA, and college Rank.

 Spark:
 Weights (GRE, GPA, Rank):
 [0.0011576276331509304,0.048544858567336854,-0.394202150286076]
 Intercept: -0.6488972641282202
 Area under ROC: 0.6294070512820512

 StatsModel:
 Weights [0.0018, 0.7220, -0.3148]
 Intercept: -3.5913
 Area under ROC: 0.69

 The weights from statsmodel seems more reasonable if you consider for a
 one
 unit increase in gpa, the log odds of 

Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-22 Thread DB Tsai
In Spark 1.4, Logistic Regression with elasticNet is implemented in the ML
pipeline framework. Model selection can be achieved through a high
lambda, which results in lots of zeros in the coefficients.
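
Why a large lambda produces zeros can be seen from the soft-thresholding step that appears in many L1 solvers. The sketch below only illustrates that effect; it is not how Spark's optimizer is written, and `softThreshold`, `shrink` and the sample coefficients are made up.

```scala
object SoftThresholdSketch {
  // Shrinks w toward zero by lambda; anything within [-lambda, lambda] becomes 0.
  def softThreshold(w: Double, lambda: Double): Double =
    if (w > lambda) w - lambda
    else if (w < -lambda) w + lambda
    else 0.0

  def shrink(weights: Seq[Double], lambda: Double): Seq[Double] =
    weights.map(softThreshold(_, lambda))

  def main(args: Array[String]): Unit = {
    val weights = Seq(0.05, 0.9, -0.5) // made-up coefficients
    println(shrink(weights, 0.1)) // only the smallest coefficient is zeroed
    println(shrink(weights, 0.6)) // a larger lambda zeroes two of the three
  }
}
```

The variables whose coefficients survive at a given lambda are the ones the model treats as most informative, which is the sense in which a high lambda performs model selection.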

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Fri, May 22, 2015 at 1:19 AM, SparknewUser
melanie.galloi...@gmail.com wrote:
 I am new to MLlib and to Spark (I use Scala).

 I'm trying to understand how LogisticRegressionWithLBFGS and
 LogisticRegressionWithSGD work.
 I usually use R to do logistic regressions, but now I do it on Spark
 to be able to analyze Big Data.

 The model only returns weights and intercept. My problem is that I have no
 information about which variables are significant and which variables I had
 better delete to improve my model. I only have the confusion matrix and the AUC
 to evaluate the performance.

 Is there any way to get information about the variables I put in my model?
 How can I try different variable combinations? Do I have to modify the
 original dataset (e.g. delete one or several columns)?
 How are the weights calculated: is there a correlation calculation with the
 variable of interest?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





spark on Windows 2008 failed to save RDD to windows shared folder

2015-05-22 Thread Wang, Ningjun (LNG-NPV)
I used a Spark standalone cluster on Windows 2008. I keep getting the 
following error when trying to save an RDD to a Windows shared folder:

rdd.saveAsObjectFile("file:///T:/lab4-win02/IndexRoot01/tobacco-07/myrdd.obj")

15/05/22 16:49:05 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 12)
java.io.IOException: Mkdirs failed to create 
file:/T:/lab4-win02/IndexRoot01/tobacco-07/tmp/docs-150522204904805.op/_temporary/0/_temporary/attempt_201505221649_0012_m_00_12
at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at 
org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1071)
at 
org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:270)
at 
org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:527)
at 
org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
at 
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
The T: drive is mapped to a Windows shared folder, e.g.  T:  ->  
\\10.196.119.230\myshare

The id running Spark does have write permission to this folder. It works most 
of the time but fails sometimes.

Can anybody tell me what is the problem here?

Please advise. Thanks.


Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
Hi All,

I have a cluster of four nodes (three workers and one master, with one core
each) which consumes data from Kinesis at 15 second intervals using two
streams (i.e. receivers). The job simply grabs the latest batch and pushes
it to MongoDB. I believe that the problem is that all tasks are executed on
a single worker node and never distributed to the others. This is true even
after I set the number of concurrentJobs to 3. Overall, I would really like
to increase throughput (i.e. more than 500 records / second) and understand
why not all executors are being utilized.

Here are some parameters I have set:

   - spark.streaming.blockInterval   200
   - spark.locality.wait 500
   - spark.streaming.concurrentJobs  3

This is the code that's actually doing the writing:

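def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  // Insert every record of every partition through the shared connection.
  result.foreachPartition { i =>
    i.foreach(record => connection.insert(record))
  }
}

// `time` is accepted so the signature matches the call in write().
def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}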

Any ideas as to how to improve the throughput?

Thanks, Mike.


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
I guess each receiver occupies an executor, so there was only one executor
available for processing the job.
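
That reasoning can be written down as simple arithmetic. The sketch below assumes, as in the setup described, that every receiver permanently holds one core; `coresForProcessing` is a hypothetical helper, not a Spark API.

```scala
object ReceiverCoresSketch {
  // Cores left for batch processing once each receiver has claimed one.
  def coresForProcessing(workers: Int, coresPerWorker: Int, receivers: Int): Int =
    math.max(workers * coresPerWorker - receivers, 0)

  def main(args: Array[String]): Unit = {
    // Three single-core workers and two Kinesis receivers, as in the question.
    println(coresForProcessing(workers = 3, coresPerWorker = 1, receivers = 2)) // prints 1
  }
}
```

Under this assumption, adding cores or workers is what frees up more than one core for the batch jobs.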

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Hi All,

 I have cluster of four nodes (three workers and one master, with one core
 each) which consumes data from Kinesis at 15 second intervals using two
 streams (i.e. receivers). The job simply grabs the latest batch and pushes
 it to MongoDB. I believe that the problem is that all tasks are executed on
 a single worker node and never distributed to the others. This is true even
 after I set the number of concurrentJobs to 3. Overall, I would really like
 to increase throughput (i.e. more than 500 records / second) and understand
 why all executors are not being utilized.

 Here are some parameters I have set:

-
- spark.streaming.blockInterval   200
- spark.locality.wait 500
- spark.streaming.concurrentJobs  3

 This is the code that's actually doing the writing:

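 def write(rdd: RDD[Data], time: Time): Unit = {
   val result = doSomething(rdd, time)
   // Insert every record of every partition through the shared connection.
   result.foreachPartition { i =>
     i.foreach(record => connection.insert(record))
   }
 }

 // `time` is accepted so the signature matches the call in write().
 def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
   rdd.flatMap(MyObject)
 }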

 Any ideas as to how to improve the throughput?

 Thanks, Mike.



Re: spark on Windows 2008 failed to save RDD to windows shared folder

2015-05-22 Thread Ted Yu
The stack trace is related to hdfs.

Can you tell us which hadoop release you are using ?

Is this a secure cluster ?

Thanks

On Fri, May 22, 2015 at 1:55 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  I used spark standalone cluster on Windows 2008. I kept on getting the
 following error when trying to save an RDD to a windows shared folder




 rdd.saveAsObjectFile(“file:///T:/lab4-win02/IndexRoot01/tobacco-07/myrdd.obj”)



 15/05/22 16:49:05 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID
 12)

 java.io.IOException: Mkdirs failed to create
 file:/T:/lab4-win02/IndexRoot01/tobacco-07/tmp/docs-150522204904805.op/_temporary/0/_temporary/attempt_201505221649_0012_m_00_12

 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)

 at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)

 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)

 at
 org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1071)

 at
 org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:270)

 at
 org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:527)

 at
 org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)

 at
 org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)

 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)

 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)

 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

 at org.apache.spark.scheduler.Task.run(Task.scala:64)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

  The T: drive is mapped to a Windows shared folder, e.g.  T:  ->
 \\10.196.119.230\myshare



 The id running spark does have write permission to this folder. It works
 most of the time but failed sometime.



 Can anybody tell me what is the problem here?



 Please advise. Thanks.



SparkSQL failing while writing into S3 for 'insert into table'

2015-05-22 Thread ogoh

Hello, 
I am using Spark 1.3 & Hive 0.13.1 in AWS.
From Spark-SQL, when running a Hive query to export the query result into AWS
S3, it failed with the following message:
==
org.apache.hadoop.hive.ql.metadata.HiveException: checkPaths:
s3://test-dev/tmp/hive-hadoop/hive_2015-05-23_00-33-06_943_4594473380941885173-1/-ext-1
has nested
directorys3://test-dev/tmp/hive-hadoop/hive_2015-05-23_00-33-06_943_4594473380941885173-1/-ext-1/_temporary

at org.apache.hadoop.hive.ql.metadata.Hive.checkPaths(Hive.java:2157)

at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2298)

at org.apache.hadoop.hive.ql.metadata.Table.copyFiles(Table.java:686)

at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1469)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:230)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:124)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:249)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
==

The query tested is 

spark-sql> create external table s3_dwserver_sql_t1 (q string) location
's3://test-dev/s3_dwserver_sql_t1';

spark-sql> insert into table s3_dwserver_sql_t1 select q from api_search
where pdate='2015-05-12' limit 100;
==

It seems it first generates the query results in a tmp dir and then tries to
rename it into the right folder, but it fails while renaming it. 

I appreciate any advice.
Thanks,
Okehee

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-failing-while-writing-into-S3-for-insert-into-table-tp23000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Dynamic Allocation with Spark Streaming

2015-05-22 Thread Saiph Kappa
Hi,

1. Dynamic allocation is currently only supported with YARN, correct?

2. In Spark Streaming, is it possible to change the number of executors
while an application is running? If so, can the allocation be controlled by
the application, instead of using any already defined automatic policy?
That is, I want to be able to get more executors or decommission executors
on demand. Is there some way to achieve this?

Thanks.


Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Saiph Kappa
Sorry, but I can't see in TD's comments how to allocate executors on
demand. It seems to me that he's talking about resources within an
executor, mapping shards to cores. I want to be able to decommission
executors/workers/machines.

On Sat, May 23, 2015 at 3:31 AM, Ted Yu yuzhih...@gmail.com wrote:

 For #1, the answer is yes.

 For #2, See TD's comments on SPARK-7661

 Cheers


 On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 1. Dynamic allocation is currently only supported with YARN, correct?

 2. In Spark Streaming, is it possible to change the number of executors
 while an application is running? If so, can the allocation be controlled by
 the application, instead of using any already defined automatic policy?
 That is, I want to be able to get more executors or decommission executors
 on demand. Is there some way to achieve this?

 Thanks.





Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Saiph Kappa
Or should I shut down the streaming context gracefully and then start it
again with a different number of executors?

On Sat, May 23, 2015 at 4:00 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Sorry, but I can't see in TD's comments how to allocate executors on
 demand. It seems to me that he's talking about resources within an
 executor, mapping shards to cores. I want to be able to decommission
 executors/workers/machines.

 On Sat, May 23, 2015 at 3:31 AM, Ted Yu yuzhih...@gmail.com wrote:

 For #1, the answer is yes.

 For #2, See TD's comments on SPARK-7661

 Cheers


 On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 1. Dynamic allocation is currently only supported with YARN, correct?

 2. In Spark Streaming, is it possible to change the number of executors
 while an application is running? If so, can the allocation be controlled by
 the application, instead of using any already defined automatic policy?
 That is, I want to be able to get more executors or decommission executors
 on demand. Is there some way to achieve this?

 Thanks.






Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Ted Yu
For #1, the answer is yes.

For #2, See TD's comments on SPARK-7661

Cheers


On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Hi,

 1. Dynamic allocation is currently only supported with YARN, correct?

 2. In Spark Streaming, is it possible to change the number of executors
 while an application is running? If so, can the allocation be controlled by
 the application, instead of using any already defined automatic policy?
 That is, I want to be able to get more executors or decommission executors
 on demand. Is there some way to achieve this?

 Thanks.



Re: spark.executor.extraClassPath - Values not picked up by executors

2015-05-22 Thread Yana Kadiyska
Todd, I don't have any answers for you...other than the file is actually
named spark-defaults.conf (not sure if you made a typo in the email or
misnamed the file...). Do any other options from that file get read?

I also wanted to ask if you built the
spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar from trunk or if they
published a 1.3 drop somewhere -- I'm just starting out with Cassandra and
discovered https://datastax-oss.atlassian.net/browse/SPARKC-98 is still open...

On Fri, May 22, 2015 at 6:15 PM, Todd Nist tsind...@gmail.com wrote:

 I'm using the spark-cassandra-connector from DataStax in a Spark Streaming
 job launched from my own driver.  It is connecting to a standalone cluster
 on my local box, which has two workers running.

 This is Spark 1.3.1 and spark-cassandra-connector-1.3.0-SNAPSHOT.  I have
 added the following entry to my $SPARK_HOME/conf/spark-default.conf:

 spark.executor.extraClassPath 
 /projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar


 When I start the master with, $SPARK_HOME/sbin/start-master.sh, it comes
 up just fine.  As do the two workers with the following command:

 Worker 1, port 8081:

 radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker 
 spark://radtech.io:7077 --webui-port 8081 --cores 2

 Worker 2, port 8082

 radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker 
 spark://radtech.io:7077 --webui-port 8082 --cores 2

 When I execute the Driver connecting to the master:

 sbt app/run -Dspark.master=spark://radtech.io:7077

 It starts up, but when the executors are launched they do not include the
 entry in the spark.executor.extraClassPath:

 15/05/22 17:35:26 INFO Worker: Asked to launch executor 
 app-20150522173526-/0 for KillrWeatherApp$
 15/05/22 17:35:26 INFO ExecutorRunner: Launch command: java -cp 
 /usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar
  -Dspark.driver.port=55932 -Xms512M -Xmx512M 
 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
 akka.tcp://sparkDriver@192.168.1.3:55932/user/CoarseGrainedScheduler 
 --executor-id 0 --hostname 192.168.1.3 --cores 2 --app-id 
 app-20150522173526- --worker-url 
 akka.tcp://sparkWorker@192.168.1.3:55923/user/Worker



 which will then cause the executor to fail with a ClassNotFoundException,
 which I would expect:

 [WARN] [2015-05-22 17:38:18,035] [org.apache.spark.scheduler.TaskSetManager]: 
 Lost task 0.0 in stage 2.0 (TID 23, 192.168.1.3): 
 java.lang.ClassNotFoundException: 
 com.datastax.spark.connector.rdd.partitioner.CassandraPartition
 at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:344)
 at 
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
 at 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 I also notice that some of the entries on the executor classpath are
 duplicated?  This is a newly installed spark-1.3.1-bin-hadoop2.6
  standalone 

SparkSQL query plan to Stage wise breakdown

2015-05-22 Thread Pramod Biligiri
Hi,
Is there an easy way to see how a SparkSQL query plan maps to different
stages of the generated Spark job? The WebUI is entirely in terms of RDD
stages and I'm having a hard time mapping it back to my query.

Pramod


Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Ted Yu
That should do.

Cheers

On Fri, May 22, 2015 at 8:28 PM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Or should I shut down the streaming context gracefully and then start it
 again with a different number of executors?

 On Sat, May 23, 2015 at 4:00 AM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Sorry, but I can't see in TD's comments how to allocate executors on
 demand. It seems to me that he's talking about resources within an
 executor, mapping shards to cores. I want to be able to decommission
 executors/workers/machines.

 On Sat, May 23, 2015 at 3:31 AM, Ted Yu yuzhih...@gmail.com wrote:

 For #1, the answer is yes.

 For #2, See TD's comments on SPARK-7661

 Cheers


 On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com
 wrote:

 Hi,

 1. Dynamic allocation is currently only supported with YARN, correct?

 2. In Spark Streaming, is it possible to change the number of executors
 while an application is running? If so, can the allocation be controlled by
 the application, instead of using any already defined automatic policy?
 That is, I want to be able to get more executors or decommission executors
 on demand. Is there some way to achieve this?

 Thanks.







Re: Performance degradation between spark 0.9.3 and 1.3.1

2015-05-22 Thread tyronecai
It may be because of snappy-java:
https://issues.apache.org/jira/browse/SPARK-5081


On May 23, 2015, at 1:23 AM, Josh Rosen rosenvi...@gmail.com wrote:

 I don't think that 0.9.3 has been released, so I'm assuming that you're 
 running on branch-0.9.
 
 There have been over 4000 commits between 0.9.3 and 1.3.1, so I'm afraid that 
 this question doesn't have a concise answer: 
 https://github.com/apache/spark/compare/branch-0.9...v1.3.1
 
 To narrow down the potential causes, have you tried comparing 0.9.3 to, say, 
 1.0.2 or branch-1.0, or some other version that's closer to 0.9?
 
 On Fri, May 22, 2015 at 9:43 AM, Shay Seng s...@urbanengines.com wrote:
 Hi. 
 I have a job that takes 
 ~50min with Spark 0.9.3 and 
 ~1.8hrs on Spark 1.3.1 on the same cluster.
 
 The only code difference between the two code bases is to fix the Seq -> Iter 
 changes that happened in the Spark 1.x series.
 
 Are there any other changes in the defaults from Spark 0.9.3 -> 1.3.1 that 
 would cause such a large degradation in performance? Changes in partitioning 
 algorithms, scheduling etc?
 
 shay
 
 



Re: Bigints in pyspark

2015-05-22 Thread Davies Liu
Could you show the schema and confirm that they are LongType?

df.printSchema()
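
If the write-back step infers the Hive type from the Python values, one possible workaround is to derive it from the DataFrame schema instead. A minimal sketch, assuming the schema is available as (column name, type string) pairs in the form that `df.dtypes` returns, and that for simple types those strings match Hive type names. The helper `hive_columns`, the table name and the sample columns are hypothetical:

```python
# Hypothetical helper: build a Hive column list from schema type strings,
# so a bigint column stays bigint regardless of how Python represents the value.
def hive_columns(dtypes):
    """dtypes: list of (column_name, type_string) pairs, e.g. taken from df.dtypes."""
    return ", ".join("`%s` %s" % (name, dtype) for name, dtype in dtypes)


# Sample schema, made up for illustration.
schema = [("user_id", "bigint"), ("clicks", "int"), ("name", "string")]
ddl = "CREATE TABLE target_table (%s)" % hive_columns(schema)
print(ddl)
```

The point of the design is that the column type comes from the schema, not from `type(value)` on a collected row.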

On Mon, Apr 27, 2015 at 5:44 AM, jamborta jambo...@gmail.com wrote:
 hi all,

 I have just come across a problem: I have a table with a few bigint
 columns, and it seems that if I read that table into a DataFrame and then
 collect it in pyspark, the bigints are stored as integers in Python.

 (The problem is if I write it back to another table, I detect the hive type
 programmatically from the python type, so it turns those columns to
 integers)

 Is that intended this way or a bug?

 thanks,




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Bigints-in-pyspark-tp22668.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Tathagata Das
Something does not make sense. Receivers (currently) do not get blocked
(unless a rate limit has been set) due to processing load. The receiver will
continue to receive data and store it in memory until it is processed.
So I am still not sure how the data loss is happening, unless you are
sending data at a faster rate than the receiver can handle (that is, more than
the max rate at which the receiver can save data in memory and replicate it to
other nodes).

In general, if you are particular about data loss, then UDP is not really a
good choice in the first place. If you can try using TCP, try it. It would
at least eliminate the possibility that I mentioned above. Ultimately, if
you try sending data faster than the receiver can handle (independent of
whether processing can keep up), then you will lose data if you are using
UDP. You have to use TCP to naturally control the sending rate to match the
receiving rate in the receiver, without dropping data.
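
The difference can be illustrated with a toy simulation (plain Python, no Spark and no real sockets). It assumes a fixed-size receive buffer and a sender that produces more messages per tick than the receiver drains. UDP is modelled as "drop when the buffer is full" and TCP as "the sender waits when the buffer is full". All names and numbers are made up for illustration:

```python
from collections import deque


def simulate(send_per_tick, recv_per_tick, ticks, buffer_size, blocking):
    """Return (delivered, dropped, held_back) message counts after `ticks` rounds."""
    buf = deque()
    delivered = dropped = held_back = 0
    for _ in range(ticks):
        # Sender side: try to hand over send_per_tick messages.
        for _ in range(send_per_tick):
            if len(buf) < buffer_size:
                buf.append(1)
            elif blocking:
                held_back += 1  # TCP-like: sender is slowed down, nothing is lost
            else:
                dropped += 1    # UDP-like: message is silently discarded
        # Receiver side: drain at most recv_per_tick messages.
        for _ in range(min(recv_per_tick, len(buf))):
            buf.popleft()
            delivered += 1
    return delivered, dropped, held_back


print("UDP-like:", simulate(3, 2, 10, 4, blocking=False))
print("TCP-like:", simulate(3, 2, 10, 4, blocking=True))
```

In both runs the receiver delivers the same number of messages; the only difference is whether the excess is lost or pushed back onto the sender.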


On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote:

 This is just a friendly ping, just to remind you of my query.

 Also, is there a possible explanation/example on the usage of
 AsyncRDDActions in Java ?

 On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 I am receiving data at UDP port 8060, processing it using
 Spark, and storing the output in Neo4j.

 But the data I'm receiving and the data that is getting stored don't
 match, probably because the Neo4j API takes too long to push the data into
 the database. Meanwhile, Spark is unable to receive data, probably because the
 process is blocked.

 On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Can you elaborate on how the data loss is occurring?


 On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 That is completely alright, as the system will make sure the work gets
 done.

 My major concern is the data drop. Will using async stop data loss?

 On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
 wrote:

 If you cannot push data as fast as you are generating it, then async
 isn't going to help either. The work is just going to keep piling up as
 many, many async jobs, even though your batch processing times will be low,
 as that processing time is not going to reflect how much overall work is
 pending in the system.

 On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com
 wrote:

 Hi,

 From my understanding of Spark Streaming, I created a spark entry
 point, for continuous UDP data, using:

 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
 JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
 JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

 Now, when I process this input stream using:

 JavaDStream hash = lines.flatMap(my-code);
 JavaPairDStream tuple = hash.mapToPair(my-code);
 JavaPairDStream output = tuple.reduceByKey(my-code);
 output.foreachRDD(
     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
         @Override
         public Void call(
                 JavaPairRDD<String, ArrayList<String>> arg0,
                 Time arg1) throws Exception {
             // TODO Auto-generated method stub
             new AsyncRDDActions(arg0.rdd(), null);
             arg0.foreachPartition(
                 new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {

                     @Override
                     public void call(
                             Iterator<Tuple2<String, ArrayList<String>>> arg0)
                             throws Exception {

                         // TODO Auto-generated method stub
                         GraphDatabaseService graphDb =
                             new GraphDatabaseFactory()
                                 .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
                                 .setConfig("remote_shell_enabled", "true")
                                 .newGraphDatabase();

                         try (Transaction tx = graphDb.beginTx()) {
                             while (arg0.hasNext()) {
                                 Tuple2<String, ArrayList<String>> tuple = arg0.next();
                                 Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
                                 boolean oldHMac = false;
                                 if (HMac != null) {
                                     System.out.println("Alread in Database:" + tuple._1);
                                     oldHMac = true;
                                 }
  

Re: DataFrame groupBy vs RDD groupBy

2015-05-22 Thread Michael Armbrust
DataFrames have a lot more information about the data, so there is a whole
class of optimizations that are possible there that we cannot do in RDDs.
This is why we are focusing a lot of effort on this part of the project.
In Spark 1.4 you can accomplish what you want using the new window function
feature.  This can be done with SQL as you described or directly on a
DataFrame:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
df.select('x, 'y,
  rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show

+---+---+------+
|  x|  y|number|
+---+---+------+
|  a|  1|     1|
|  b|  1|     2|
|  c|  2|     1|
|  d|  2|     2|
+---+---+------+
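
To make the semantics concrete, here is a small plain-Python sketch (no Spark required) of what a row number over `PARTITION BY y ORDER BY x` computes. The function name `row_number_over` is made up for illustration, and this only models the result, not how Spark executes the window function:

```python
from itertools import groupby
from operator import itemgetter


def row_number_over(rows, partition_key, order_key):
    """Return rows with a 1-based row number appended, restarting per partition."""
    result = []
    # Sort by partition first, then by the ordering column within each partition.
    ordered = sorted(rows, key=lambda r: (r[partition_key], r[order_key]))
    for _, group in groupby(ordered, key=itemgetter(partition_key)):
        for number, row in enumerate(group, start=1):
            result.append(row + (number,))
    return result


rows = [("a", 1), ("b", 1), ("c", 2), ("d", 2)]
numbered = row_number_over(rows, partition_key=1, order_key=0)
for row in numbered:
    print(row)
```

The numbering restarts at 1 whenever the partition value changes, which is the per-group row number the question asks for.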

On Fri, May 22, 2015 at 3:35 AM, gtanguy g.tanguy.claravi...@gmail.com
wrote:

 Hello everybody,

 I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some parts
 of my code using groupBy became really slow.

 *1/* Why is the groupBy of an RDD really slow in comparison to the groupBy
 of a DataFrame?

 // DataFrame : running in few seconds
 val result = table.groupBy("col1").count

 // RDD : taking hours with a lot of /spilling in-memory/
 val schemaOriginel = table.schema
 val result = table.rdd.groupBy { r =>
   val rs = RowSchema(r, schemaOriginel)
   val col1 = rs.getValueByName("col1")
   col1
 }.map(l => (l._1, l._2.size)).count()


 *2/* My goal is to groupBy on a key, then to order each group over a column
 and finally to add the row number in each group. I had this code running
 before changing to Spark 1.3 and it worked fine, but since I have changed
 to
 DataFrame it is really slow.

  val schemaOriginel = table.schema
  val result = table.rdd.groupBy { r =>
    val rs = RowSchema(r, schemaOriginel)
    val col1 = rs.getValueByName("col1")
    col1
  }.flatMap { l =>
    l._2.toList
      .sortBy { u =>
        val rs = RowSchema(u, schemaOriginel)
        val col1 = rs.getValueByName("col1")
        val col2 = rs.getValueByName("col2")
        (col1, col2)
      }.zipWithIndex
  }

 /I think the SQL equivalent of what I try to do : /

 SELECT a,
ROW_NUMBER() OVER (PARTITION BY a) AS num
 FROM table.


  I don't think I can do this with a GroupedData (result of df.groupby). Any
 ideas on how I can speed up this?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-groupBy-vs-RDD-groupBy-tp22995.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-22 Thread Joseph Bradley
If you want to select specific variable combinations by hand, then you will
need to modify the dataset before passing it to the ML algorithm.  The
DataFrame API should make that easy to do.

If you want to have an ML algorithm select variables automatically, then I
would recommend using L1 regularization for now and possibly elastic net
after 1.4 is released, per DB's suggestion.

If you want detailed model statistics similar to what R provides, I've
created a JIRA for discussing how we should add that functionality to
MLlib.  Those types of stats will be added incrementally, but feedback
would be great for prioritization:
https://issues.apache.org/jira/browse/SPARK-7674

To answer your question: How are the weights calculated: is there a
correlation calculation with the variable of interest?
-- Weights are calculated as with all logistic regression algorithms, by
using convex optimization to minimize a regularized log loss.
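
As a rough illustration of what "regularized log loss" means, the sketch below evaluates such an objective in plain Python for labels in {0, 1} with an L1 penalty on the weights. It is a textbook form, not MLlib's code; the function names, the choice to leave the intercept unpenalized, and the sample data are assumptions made for illustration:

```python
import math


def log_loss(weights, intercept, rows, labels):
    """Mean logistic loss for labels in {0, 1}."""
    total = 0.0
    for x, y in zip(rows, labels):
        margin = intercept + sum(w * xi for w, xi in zip(weights, x))
        p = 1.0 / (1.0 + math.exp(-margin))  # predicted probability of class 1
        total -= y * math.log(p) + (1 - y) * math.log(1 - p)
    return total / len(rows)


def objective(weights, intercept, rows, labels, reg_param):
    """Log loss plus an L1 penalty on the weights (intercept not penalized)."""
    penalty = reg_param * sum(abs(w) for w in weights)
    return log_loss(weights, intercept, rows, labels) + penalty


# Made-up data: two features, four examples.
rows = [(1.0, 0.0), (2.0, 1.0), (-1.0, 0.5), (-2.0, -1.0)]
labels = [1, 1, 0, 0]
print(objective([0.0, 0.0], 0.0, rows, labels, reg_param=0.1))
print(objective([1.0, 0.0], 0.0, rows, labels, reg_param=0.1))
```

The optimizer searches for the weights that make this value smallest; no correlation with the target variable is computed explicitly, and the L1 term is what pushes the weights of uninformative variables toward zero.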

Good luck!
Joseph

On Fri, May 22, 2015 at 1:07 PM, DB Tsai dbt...@dbtsai.com wrote:

 In Spark 1.4, Logistic Regression with elasticNet is implemented in the ML
 pipeline framework. Model selection can be achieved through a high
 lambda, which results in many zeros in the coefficients.

 Sincerely,

 DB Tsai
 ---
 Blog: https://www.dbtsai.com


 On Fri, May 22, 2015 at 1:19 AM, SparknewUser
 melanie.galloi...@gmail.com wrote:
  I am new in MLlib and in Spark.(I use Scala)
 
  I'm trying to understand how LogisticRegressionWithLBFGS and
  LogisticRegressionWithSGD work.
  I usually use R to do logistic regressions but now I do it on Spark
  to be able to analyze Big Data.
 
  The model only returns weights and intercept. My problem is that I have
 no
  information about which variable is significant and which variable I had
  better
  to delete to improve my model. I only have the confusion matrix and the
 AUC
  to evaluate the performance.
 
  Is there any way to have information about the variables I put in my
 model?
  How can I try different variable combinations, do I have to modify the
  dataset
  of origin (e.g. delete one or several columns?)
  How are the weights calculated: is there a correlation calculation with
 the
  variable
  of interest?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 





Application on standalone cluster never changes state to be stopped

2015-05-22 Thread Edward Sargisson
Hi,
Environment: Spark standalone cluster running with a master and a worker on a
small Vagrant VM. The Jetty Webapp on the same node calls the spark-submit
script to start the job.

From the contents of the stdout I can see that it's running successfully.
However, the spark-submit process never seems to complete (after 2 minutes)
and the state in the Web UI remains RUNNING.
The Application main calls SparkContext.stop and exits with zero.

What are the criteria for when an Application is considered finished?

Thanks in advance!
Edward


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Evo Eftimov
A receiver occupies a CPU core. An executor is simply a JVM instance, and as 
such it can be granted any number of cores and any amount of RAM.

So check how many cores you have per executor.
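
A back-of-the-envelope check, as a tiny Python sketch. It assumes each receiver permanently holds exactly one core and that the master contributes no executor cores; the function name is made up for illustration:

```python
def cores_left_for_processing(workers, cores_per_worker, receivers):
    """Each receiver occupies one core; whatever remains can run batch tasks."""
    total_cores = workers * cores_per_worker
    return max(total_cores - receivers, 0)


# Numbers from the cluster described in this thread:
# three one-core workers and two Kinesis receivers.
print(cores_left_for_processing(workers=3, cores_per_worker=1, receivers=2))
```

With only one core left over, all processing tasks necessarily land on the same executor, which matches the behaviour reported in the thread.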


Sent from Samsung Mobile

-------- Original message --------
From: Mike Trienis mike.trie...@orcsol.com
Date: 2015/05/22 21:51 (GMT+00:00)
To: user@spark.apache.org
Subject: Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

I guess each receiver occupies an executor. So there was only one executor 
available for processing the job. 

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com wrote:
Hi All,

I have a cluster of four nodes (three workers and one master, with one core each) 
which consumes data from Kinesis at 15 second intervals using two streams (i.e. 
receivers). The job simply grabs the latest batch and pushes it to MongoDB. I 
believe that the problem is that all tasks are executed on a single worker node 
and never distributed to the others. This is true even after I set the number 
of concurrentJobs to 3. Overall, I would really like to increase throughput 
(i.e. more than 500 records / second) and understand why all executors are not 
being utilized. 

Here are some parameters I have set: 
spark.streaming.blockInterval       200
spark.locality.wait 500
spark.streaming.concurrentJobs      3
This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
    // doSomething is declared with a single parameter, so pass only the RDD
    val result = doSomething(rdd)
    result.foreachPartition { i =>
        i.foreach(record => connection.insert(record))
    }
}

def doSomething(rdd: RDD[Data]): RDD[MyObject] = {
    rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput?

Thanks, Mike. 



Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Rok Roskar
on the worker/container that fails, the file not found is the first error
-- the output below is from the yarn log. There were some python worker
crashes for another job/stage earlier (see the warning at 18:36) but I
expect those to be unrelated to this file not found error.

==
LogType:stderr
Log Upload Time:15-May-2015 18:50:05
LogLength:5706
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4
j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting
to kill Python Worker
15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0
(TID 995)
java.io.FileNotFoundException:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3
-44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.init(FileOutputStream.java:212)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at
org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
15/05/15 18:50:04 ERROR DiskBlockManager: Exception while deleting local
spark dir:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489
java.io.IOException: Failed to delete:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:933)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:165)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:162)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.storage.DiskBlockManager.org
$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:162)
at
org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:156)
at
org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1208)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:88)
at org.apache.spark.executor.Executor.stop(Executor.scala:146)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:105)
at

RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
Or you can run Drools in a central server mode, i.e. as a common/shared service, 
but that would slow down your Spark Streaming job due to the remote network call 
that would have to be made for every single message. 

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:22 AM
To: 'Evo Eftimov'; 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

The only “tricky” bit would be when you want to manage/update the Rule Base in 
your Drools Engines already running as Singletons in Executor JVMs on Worker 
Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already 
loaded in Drools is not a problem.  

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:20 AM
To: 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information. During this processing I would like to do some event processing, 
for example:

A log appender writes information about all transactions in my trading platforms;

if a platform user sells more than they buy during a week, I need to receive an 
alert on an event dashboard.

How can I implement this? Is it possible with Drools?

Thanks so much

 



RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information. During this processing I would like to do some event processing, 
for example:

A log appender writes information about all transactions in my trading platforms;

if a platform user sells more than they buy during a week, I need to receive an 
alert on an event dashboard.

How can I implement this? Is it possible with Drools?

Thanks so much

 



Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Guillermo Ortiz
Hi,

I'm trying to connect to two Kafka topics from Spark with DirectStream,
but I get an error. I don't know if there is any limitation on doing this,
because when I access just one topic everything works.

val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" ->
  "quickstart.cloudera:9092")
val setTopic1 = Set("topic1")
val setTopic2 = Set("topic2")

val stream1 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
val stream2 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)


The error that I get is:
15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
15/05/22 13:12:40 ERROR OneForOneStrategy:
java.lang.NullPointerException
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)


Is there any limitation on doing this?


RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 
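
The "singleton per executor" idea can be sketched in plain Python as a lazily created, process-wide object that a map/filter-style function reuses. `RuleEngine` below is a hypothetical stand-in for an expensive-to-build rule engine such as Drools, and the rule itself is a toy; nothing here is the Drools or Spark API:

```python
class RuleEngine:
    """Hypothetical stand-in for a rule engine that is costly to construct."""
    instances_created = 0

    def __init__(self):
        RuleEngine.instances_created += 1

    def evaluate(self, event):
        # Toy rule: flag users who sold more than they bought.
        return event["sold"] > event["bought"]


_engine = None


def get_engine():
    """Create the engine on first use and reuse it for every later call in this process."""
    global _engine
    if _engine is None:
        _engine = RuleEngine()
    return _engine


def flag_events(events):
    """Shape of a function one could pass to a filter-style transformation."""
    engine = get_engine()
    return [e for e in events if engine.evaluate(e)]


batch1 = [{"user": "u1", "sold": 5, "bought": 2}, {"user": "u2", "sold": 1, "bought": 3}]
batch2 = [{"user": "u3", "sold": 4, "bought": 4}]
alerts = flag_events(batch1) + flag_events(batch2)
print(alerts, RuleEngine.instances_created)
```

The engine is built once per process no matter how many batches pass through, which is the property that makes the per-message rule evaluation cheap.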

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information. During this processing I would like to do some event processing, 
for example:

A log appender writes information about all transactions in my trading platforms;

if a platform user sells more than they buy during a week, I need to receive an 
alert on an event dashboard.

How can I implement this? Is it possible with Drools?

Thanks so much



Re: Spark Streaming and Drools

2015-05-22 Thread Dibyendu Bhattacharya
Hi,

Sometime back I played with Distributed Rule processing by integrating
Drool with HBase Co-Processors ..and invoke Rules on any incoming data ..

https://github.com/dibbhatt/hbase-rule-engine

You can get some idea how to use Drools rules if you see this
RegionObserverCoprocessor ..

https://github.com/dibbhatt/hbase-rule-engine/blob/master/src/main/java/hbase/rule/HBaseDroolObserver.java


Idea is basically to create a stateless Ruleengine from the drl file and
fire the rule on incoming data ..

Even though the code is for invoking rules on HBase PUT object , but you
can get an idea ..and modify it for Spark..

Dibyendu



On Fri, May 22, 2015 at 3:49 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 I am not aware of existing examples but you can always “ask” Google



 Basically from Spark Streaming perspective, Drools is a third-party
 Software Library, you would invoke it in the same way as any other
 third-party software library from the Tasks (maps, filters etc) within your
 DAG job



 *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
 *Sent:* Friday, May 22, 2015 11:07 AM
 *To:* Evo Eftimov
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Streaming and Drools



 Thanks a lot Evo,

 do you know where I can find some examples?

 Have a great one


 A G



 2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

 You can deploy and invoke Drools as a Singleton on every Spark Worker Node
 / Executor / Worker JVM



 You can invoke it from e.g. map, filter etc and use the result from the
 Rule to make decision how to transform/filter an event/message



 *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
 *Sent:* Friday, May 22, 2015 9:43 AM
 *To:* user@spark.apache.org
 *Subject:* Spark Streaming and Drools



 Hi All,

 I'm deploying an architecture that uses Flume for sending log information
 to a sink.

 Spark Streaming reads from this sink (pull strategy) and processes all this
 information. During this processing I would like to do some event
 processing, for example:

 A log appender writes information about all transactions in my trading
 platforms;

 if a platform user sells more than they buy during a week, I need to receive an
 alert on an event dashboard.

 How can I implement this? Is it possible with Drools?

 Thanks so much





Re: Re: Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread Ted Yu
Can you share the exception(s) you encountered ?

Thanks



 On May 22, 2015, at 12:33 AM, donhoff_h 165612...@qq.com wrote:
 
 Hi,
 
 My modified code is listed below; I just added the SecurityUtil API call.  I don't 
 know which property keys I should use, so I made up 2 property keys of my own to find 
 the keytab and principal.
 
 object TestHBaseRead2 {
   def main(args: Array[String]) {
 
     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val hbConf = HBaseConfiguration.create()
     hbConf.set("dhao.keytab.file", "//etc//spark//keytab//spark.user.keytab")
     hbConf.set("dhao.user.principal", "sp...@bgdt.dev.hrb")
     SecurityUtil.login(hbConf, "dhao.keytab.file", "dhao.user.principal")
     val conn = ConnectionFactory.createConnection(hbConf)
     val tbl = conn.getTable(TableName.valueOf("spark_t01"))
     try {
       val get = new Get(Bytes.toBytes("row01"))
       val res = tbl.get(get)
       println("result:" + res.toString)
     }
     finally {
       tbl.close()
       conn.close()
       es.shutdown()
     }
 
     val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
     val v = rdd.sum()
     println("Value=" + v)
     sc.stop()
 
   }
 }
 
 
 -- Original Message --
 From: yuzhihong;yuzhih...@gmail.com;
 Sent: Friday, May 22, 2015, 3:25 PM
 To: donhoff_h165612...@qq.com;
 Cc: Bill Qbill.q@gmail.com; useruser@spark.apache.org;
 Subject: Re: Re: How to use spark to access HBase with Security enabled
 
 Can you post the morning modified code ?
 
 Thanks
 
 
 
 On May 21, 2015, at 11:11 PM, donhoff_h 165612...@qq.com wrote:
 
 Hi,
 
 Thanks very much for the reply.  I have tried the SecurityUtil. I can see 
 from the log that this statement executed successfully, but I still cannot pass 
 HBase authentication. With more experiments, I found a new 
 interesting scenario. If I run the program in yarn-client mode, the driver 
 can pass the authentication, but the executors cannot. If I run the program 
 in yarn-cluster mode, neither the driver nor the executors can pass the 
 authentication.  Can anybody give me some clue with this info? Many thanks!
 
 
 -- Original Message --
 From: yuzhihong;yuzhih...@gmail.com;
 Sent: Friday, May 22, 2015, 5:29 AM
 To: donhoff_h165612...@qq.com;
 Cc: Bill Qbill.q@gmail.com; useruser@spark.apache.org;
 Subject: Re: How to use spark to access HBase with Security enabled
 
 Are the worker nodes colocated with HBase region servers ?
 
 Were you running as hbase super user ?
 
 You may need to login, using code similar to the following:
   if (isSecurityEnabled()) {
     SecurityUtil.login(conf, fileConfKey, principalConfKey, "localhost");
   }
 
 SecurityUtil is a Hadoop class.
 
 
 
 Cheers
 
 
 On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote:
 Hi,
 
 Many thanks for the help. My Spark version is 1.3.0 too and I run it on 
 Yarn. According to your advice I have changed the configuration. Now my 
 program can read the hbase-site.xml correctly. And it can also authenticate 
 with zookeeper successfully. 
 
 But I have hit a new problem: my program still cannot pass 
 HBase authentication. Have you or anybody else ever met this kind of 
 situation?  I used a keytab file to provide the principal. Since it can 
 pass ZooKeeper authentication, I am sure the keytab file is OK, 
 but it just cannot pass HBase authentication. The exception is 
 listed below. Could you or anybody else help me? Many thanks again!
 
 Exception***
 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
 connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
 quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
 baseZNode=/hbase
 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI 
 as SASL mechanism.
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to 
 server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate 
 using Login Context section 'Client'
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established 
 to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu 
 May 21 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri 
 May 22 16:03:18 CST 2015
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 
 22 11:43:32 CST 2015
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete 
 on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, 
 negotiated timeout = 4
 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable 
 called multiple times. Overwriting connection 

Re: Issues with constants in Spark HiveQL queries

2015-05-22 Thread Skanda
Hi

I was using the wrong version of the  spark-hive jar. I downloaded the
right version of the jar from the cloudera repo and it works now.

Thanks,
Skanda

On Fri, May 22, 2015 at 2:36 PM, Skanda skanda.ganapa...@gmail.com wrote:

 Hi All,

 I'm facing the same problem with Spark 1.3.0 from cloudera cdh 5.4.x. Any
 luck solving the issue?

 Exception:

 Exception in thread main org.apache.spark.sql.AnalysisException:
 Unsupported language features in query: select * from
 everest_marts_test.hive_ql_test where daily_partition=20150101
 TOK_QUERY 1, 0,18, 14
   TOK_FROM 1, 4,8, 14
 TOK_TABREF 1, 6,8, 14
   TOK_TABNAME 1, 6,8, 14
 everest_marts_test 1, 6,6, 14
 hive_ql_test 1, 8,8, 33
   TOK_INSERT 0, -1,18, 0
 TOK_DESTINATION 0, -1,-1, 0
   TOK_DIR 0, -1,-1, 0
 TOK_TMP_FILE 0, -1,-1, 0
 TOK_SELECT 0, 0,2, 0
   TOK_SELEXPR 0, 2,2, 0
 TOK_ALLCOLREF 0, 2,2, 0
 TOK_WHERE 1, 10,18, 68
   TOK_FUNCTION 1, 12,18, 68
 in 1, 14,14, 68
 TOK_TABLE_OR_COL 1, 12,12, 52
   daily_partition 1, 12,12, 52
 20150101 1, 16,18, 72

 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20150101 :
 20150101 1, 16,18, 72
  +

 org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
   ;
 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at
 scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
 at
 org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
 at
 org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
 at
 org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at
 

RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
The only “tricky” bit would be when you want to manage/update the Rule Base in 
your Drools Engines already running as Singletons in Executor JVMs on Worker 
Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already 
loaded in Drools is not a problem.  

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:20 AM
To: 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 
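
A minimal sketch of the singleton idea described above, in plain Python rather than against Drools itself. RuleEngine, get_engine and needs_alert are hypothetical names invented for illustration and are not part of Drools or Spark; the rule simply encodes the "sells more than buys" alert from the original question. The engine is built lazily, once per process, and every call made from a filter reuses it.

```python
class RuleEngine:
    """Hypothetical stand-in for a rules session; this is not the Drools API."""

    instances_created = 0  # counts constructions, to show the singleton effect

    def __init__(self, threshold):
        RuleEngine.instances_created += 1
        self.threshold = threshold

    def fires(self, event):
        # Rule: alert when a user sold more than they bought in the period.
        return event["sold"] - event["bought"] > self.threshold


_engine = None  # one engine per Python process


def get_engine():
    """Build the engine on first use, then reuse it for every later event."""
    global _engine
    if _engine is None:
        _engine = RuleEngine(threshold=0)
    return _engine


def needs_alert(event):
    # This is the kind of function one would hand to a filter transformation.
    return get_engine().fires(event)


events = [
    {"user": "u1", "sold": 7, "bought": 3},
    {"user": "u2", "sold": 1, "bought": 4},
    {"user": "u3", "sold": 5, "bought": 5},
]
alerts = [e["user"] for e in events if needs_alert(e)]
```

In a real streaming job the lazy construction would happen inside each worker process the first time a task calls the function, so the rule base is loaded once per process rather than once per event.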

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume to send log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information. During this processing I would like to do some event processing, 
for example:

A log appender writes information about all transactions in my trading platforms.

If a platform user sells more than they buy during a week, I need to receive an alert 
on an event dashboard.

How can I realize it? Is it possible with drools?

Thanks so much

 



Re: Partitioning of Dataframes

2015-05-22 Thread Silvio Fiorito
This is added to 1.4.0

https://github.com/apache/spark/pull/5762







On 5/22/15, 8:48 AM, Karlson ksonsp...@siberie.de wrote:

Hi,

wouldn't df.rdd.partitionBy() return a new RDD that I would then need to 
make into a Dataframe again? Maybe like this: 
df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird 
to me, though, and I'm not sure if the DF will be aware of its 
partitioning.
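
What partitionBy buys for a join can be sketched without Spark. The snippet below is only an illustration of hash co-partitioning under stated assumptions: partition_of, partition_by and join_copartitioned are made-up helper names, and the CRC32 hash is a stand-in, not Spark's partitioner. When both sides are split with the same function, matching keys land in the same partition index, so each partition can be joined locally.

```python
import zlib


def partition_of(key, num_partitions):
    # Deterministic hash so both datasets agree on where a key lives.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_by(pairs, num_partitions):
    """Split (key, value) pairs into num_partitions buckets by key hash."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[partition_of(key, num_partitions)].append((key, value))
    return parts


def join_copartitioned(left_parts, right_parts):
    """Join partition i of the left side only with partition i of the right."""
    joined = []
    for left, right in zip(left_parts, right_parts):
        lookup = {}
        for key, value in right:
            lookup.setdefault(key, []).append(value)
        for key, value in left:
            for other in lookup.get(key, []):
                joined.append((key, (value, other)))
    return joined


users = [(1, "ann"), (2, "bob"), (3, "cid")]
orders = [(1, "book"), (3, "pen"), (3, "ink"), (4, "cup")]
result = sorted(join_copartitioned(partition_by(users, 4), partition_by(orders, 4)))
```

Whether a DataFrame rebuilt from such an RDD remembers this layout is exactly the open question raised in the message above; the sketch does not answer it.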

On 2015-05-22 12:55, ayan guha wrote:
 DataFrame is an abstraction of RDD, so you should be able to do
 df.rdd.partitionBy. However, as far as I know, equi-joins already
 optimize partitioning. You may want to look at the explain plans more
 carefully and materialise interim joins.
  On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:
 
 Hi,
 
 is there any way to control how Dataframes are partitioned? I'm doing 
 lots
 of joins and am seeing very large shuffle reads and writes in the 
 Spark UI.
 With PairRDDs you can control how the data is partitioned across nodes 
 with
 partitionBy. There is no such method on Dataframes however. Can I 
 somehow
 partition the underlying the RDD manually? I am currently using the 
 Python
 API.
 
 Thanks!
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Parallel parameter tuning: distributed execution of MLlib algorithms

2015-05-22 Thread Hugo Ferreira

Hi,

I am currently experimenting with linear regression (SGD) (Spark + 
MLlib, ver. 1.2). At this point in time I need to fine-tune the 
hyper-parameters. I do this (for now) by an exhaustive grid search of 
the step size and the number of iterations. Currently I am on a dual 
core that acts as a master (local mode for now but will be adding spark 
worker later). In order to maximize throughput I need to execute each 
execution of the linear regression algorithm in parallel.


According to the documentation it seems like parallel jobs may be 
scheduled if they are executed in separate threads [1]. So this brings 
me to my first question: does this mean I am CPU bound by the Spark 
master? In other words the maximum number of jobs = maximum number of 
threads of the OS?


I searched the mailing list but did not find anything regarding MLlib 
itself. I even peeked into the new MLlib API that uses pipelines and has 
support for parameter tuning. However, it looks like each job (instance 
of the learning algorithm) is executed in sequence. Can anyone confirm 
this? This brings me to my second question: is there any example that 
shows how one can execute MLlib algorithms as parallel jobs?


Finally, is there any general technique I can use to execute an 
algorithm in a distributed manner using Spark? More specifically I would 
like to have several MLlib algorithms run in parallel. Can anyone show 
me an example of sorts to do this?
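
One possible shape for the "separate threads" approach mentioned above, sketched in plain Python. train_and_score is a hypothetical stand-in (a toy gradient descent on (w - 3)^2), not an MLlib call; in a real driver program each thread would instead invoke the training routine, and the job-scheduling page cited in [1] is what suggests such jobs can run concurrently. The grid search itself is just a thread pool mapped over the parameter combinations.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product


def train_and_score(step_size, num_iterations):
    """Hypothetical stand-in for one training run; returns an error score."""
    # Toy problem: gradient descent on f(w) = (w - 3)^2, starting at w = 0.
    w = 0.0
    for _ in range(num_iterations):
        w -= step_size * 2.0 * (w - 3.0)
    return (w - 3.0) ** 2


def grid_search(step_sizes, iteration_counts, max_workers=4):
    """Evaluate every (step size, iterations) pair, one run per pool thread."""
    grid = list(product(step_sizes, iteration_counts))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        scores = list(pool.map(lambda p: train_and_score(*p), grid))
    # Lowest error wins; ties fall back to the smaller parameter tuple.
    best_score, best_params = min(zip(scores, grid))
    return best_params, best_score


best_params, best_score = grid_search([0.01, 0.1, 0.5], [10, 100])
```

The pool size here only bounds how many runs are in flight at once on the driver; how much real parallelism results would depend on the cluster resources available to the submitted jobs.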


TIA.
Hugo F.







[1] https://spark.apache.org/docs/1.2.0/job-scheduling.html




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Partitioning of Dataframes

2015-05-22 Thread Karlson

Alright, that doesn't seem to have made it into the Python API yet.

On 2015-05-22 15:12, Silvio Fiorito wrote:

This is added to 1.4.0

https://github.com/apache/spark/pull/5762







On 5/22/15, 8:48 AM, Karlson ksonsp...@siberie.de wrote:


Hi,

wouldn't df.rdd.partitionBy() return a new RDD that I would then need 
to

make into a Dataframe again? Maybe like this:
df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird
to me, though, and I'm not sure if the DF will be aware of its
partitioning.

On 2015-05-22 12:55, ayan guha wrote:

DataFrame is an abstraction of RDD, so you should be able to do
df.rdd.partitionBy. However, as far as I know, equi-joins already
optimize partitioning. You may want to look at the explain plans more
carefully and materialise interim joins.
 On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:


Hi,

is there any way to control how Dataframes are partitioned? I'm 
doing

lots
of joins and am seeing very large shuffle reads and writes in the
Spark UI.
With PairRDDs you can control how the data is partitioned across 
nodes

with
partitionBy. There is no such method on Dataframes however. Can I
somehow
partition the underlying the RDD manually? I am currently using the
Python
API.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Partitioning of Dataframes

2015-05-22 Thread Karlson

Hi,

wouldn't df.rdd.partitionBy() return a new RDD that I would then need to 
make into a Dataframe again? Maybe like this: 
df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird 
to me, though, and I'm not sure if the DF will be aware of its 
partitioning.


On 2015-05-22 12:55, ayan guha wrote:

DataFrame is an abstraction of RDD, so you should be able to do
df.rdd.partitionBy. However, as far as I know, equi-joins already
optimize partitioning. You may want to look at the explain plans more
carefully and materialise interim joins.
 On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:


Hi,

is there any way to control how Dataframes are partitioned? I'm doing 
lots
of joins and am seeing very large shuffle reads and writes in the 
Spark UI.
With PairRDDs you can control how the data is partitioned across nodes 
with
partitionBy. There is no such method on Dataframes however. Can I 
somehow
partition the underlying the RDD manually? I am currently using the 
Python

API.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



LDA prediction on new document

2015-05-22 Thread Charles Earl
Dani,
Folding in, I believe, refers to setting up your Gibbs sampler (or other
model) with the learned word and document topic proportions as computed by
Spark.

You might look at

https://lists.cs.princeton.edu/pipermail/topic-models/2014-May/002763.html

Where Jones suggests summing across columns of the term matrix for each of
the doc terms to get the topic proportions.


 I have not worked with spark lda but if you can pull the theta and phi
matrixes out of the spark model, you should be able to start with the
approximation as inference.
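
A rough sketch of the summing approximation as I read it from the description above (not taken from the linked post or from Spark's LDA code). It assumes the topic-word matrix is available as one word-to-probability mapping per topic; approximate_topic_proportions and its argument names are hypothetical. For each term in the new document it adds that term's weight under every topic, then normalises.

```python
def approximate_topic_proportions(doc_word_counts, topic_word_probs):
    """Estimate topic proportions for an unseen document.

    doc_word_counts:  dict mapping word -> count in the new document
    topic_word_probs: list with one dict per topic, mapping word -> P(word | topic)
    """
    num_topics = len(topic_word_probs)
    weights = [0.0] * num_topics
    for word, count in doc_word_counts.items():
        for k in range(num_topics):
            # Words the model never saw contribute nothing.
            weights[k] += count * topic_word_probs[k].get(word, 0.0)
    total = sum(weights)
    if total == 0.0:
        # No known words at all: fall back to a uniform guess.
        return [1.0 / num_topics] * num_topics
    return [w / total for w in weights]


phi = [{"spark": 0.5, "rdd": 0.5}, {"kafka": 1.0}]
theta = approximate_topic_proportions({"spark": 2, "kafka": 1}, phi)
```

This is only a starting point; proper inference would iterate from such an estimate while keeping the topics fixed.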

Have you tried vowpal wabbit or gensim?

Cheers

On Friday, May 22, 2015, Dani Qiu zongmin@gmail.com wrote:

 thanks, Ken
 but I am planning to use spark LDA in production. I cannot wait for the
 future release.
  At least,  provide some workaround solution.

 PS: SPARK-5567 https://issues.apache.org/jira/browse/SPARK-5567
 mentions "This will require inference but should be able to use the same
 code, with a few modification to keep the inferred topics fixed." Can
 somebody elaborate on it? Folding-in in EM? Or can I simply sum the
 topic distributions of the terms in the new document?

 On Fri, May 22, 2015 at 2:23 PM, Ken Geis geis@gmail.com wrote:

 Dani, this appears to be addressed in SPARK-5567
 https://issues.apache.org/jira/browse/SPARK-5567, scheduled for Spark
 1.5.0.


 Ken

 On May 21, 2015, at 11:12 PM, user-digest-h...@spark.apache.org wrote:

 *From: *Dani Qiu zongmin@gmail.com
 *Subject: **LDA prediction on new document*
 *Date: *May 21, 2015 at 8:48:40 PM PDT
 *To: *user@spark.apache.org


 Hi, guys, I'm pretty new to LDA. I notice spark 1.3.0  mllib provide EM
 based LDA implementation. It returns both topics and topic distribution.

 My question is how can I use these parameters to predict on new document
 ?

 And I notice there is an Online LDA implementation in spark master
 branch, it only returns topics , how can I use this to  do prediction on
 new document (and trained document) ?


 thanks




-- 
- Charles


Re: Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Cody Koeninger
I just verified that the following code works on 1.3.0 :

val stream1 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic1)

val stream2 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic2)

stream1.print()

stream2.print()


So something else is probably going on in your case.  See if simply
printing the two streams works for you, then compare what's different in
your actual job.

On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz konstt2...@gmail.com
wrote:

 Hi,

 I'm trying to connect to two Kafka topics from Spark with DirectStream,
 but I get an error. I don't know if there is any limitation on doing this,
 because when I access just one topic everything is fine.

 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String]("metadata.broker.list" ->
   "quickstart.cloudera:9092")
 val setTopic1 = Set("topic1")
 val setTopic2 = Set("topic2")

 val stream1 = KafkaUtils.createDirectStream[String, String,
   StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
 val stream2 = KafkaUtils.createDirectStream[String, String,
   StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)


 The error that I get is:
 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
 15/05/22 13:12:40 ERROR OneForOneStrategy:
 java.lang.NullPointerException
   at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
   at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
   at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
   at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
   at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
   at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)


 Is there any limitation on doing this?



partitioning after extracting from a hive table?

2015-05-22 Thread Cesar Flores
I have a table in a Hive database partitioned by date. I notice that when
I query this table using HiveContext, the resulting data frame has a specific
number of partitions.


Does this partitioning correspond to my original table partitioning in Hive?


Thanks
-- 
Cesar Flores


Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread Frank Staszak
You might also enable debug in hadoop-env.sh:
# Extra Java runtime options.  Empty by default.
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true ${HADOOP_OPTS}"
and check that the principals are the same on the NameNode and DataNode.
You can confirm the same on all nodes in hdfs-site.xml.
You can also ensure all nodes in the cluster are kerberized in core-site.xml
(no auth by default):
<property>
  <name>hadoop.security.authentication</name>
  <value>kerberos</value>
  <description>Set the authentication for the cluster. Valid values are:
  simple or kerberos.
  </description>
</property>
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html

Best Regards
Frank

 On May 22, 2015, at 4:25 AM, Ted Yu yuzhih...@gmail.com wrote:
 
 Can you share the exception(s) you encountered ?
 
 Thanks
 
 
 
 On May 22, 2015, at 12:33 AM, donhoff_h 165612...@qq.com wrote:
 
 Hi,
 
 My modified code is listed below, just add the SecurityUtil API.  I don't 
 know which propertyKeys I should use, so I make 2 my own propertyKeys to 
 find the keytab and principal.
 
 object TestHBaseRead2 {
   def main(args: Array[String]) {

     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val hbConf = HBaseConfiguration.create()
     // Custom property keys, used only to hand the keytab and principal to SecurityUtil
     hbConf.set("dhao.keytab.file", "//etc//spark//keytab//spark.user.keytab")
     hbConf.set("dhao.user.principal", "sp...@bgdt.dev.hrb")
     SecurityUtil.login(hbConf, "dhao.keytab.file", "dhao.user.principal")
     val conn = ConnectionFactory.createConnection(hbConf)
     val tbl = conn.getTable(TableName.valueOf("spark_t01"))
     try {
       val get = new Get(Bytes.toBytes("row01"))
       val res = tbl.get(get)
       println("result:" + res.toString)
     }
     finally {
       tbl.close()
       conn.close()
       es.shutdown()  // `es` is not defined anywhere in the snippet as posted
     }

     val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
     val v = rdd.sum()
     println("Value=" + v)
     sc.stop()

   }
 }
 
 
 -- Original Message --
 From: yuzhihong;yuzhih...@gmail.com;
 Sent: Friday, May 22, 2015, 3:25 PM
 To: donhoff_h165612...@qq.com;
 Cc: Bill Qbill.q@gmail.com; useruser@spark.apache.org;
 Subject: Re: Re: How to use spark to access HBase with Security enabled
 
 Can you post the morning modified code ?
 
 Thanks
 
 
 
 On May 21, 2015, at 11:11 PM, donhoff_h 165612...@qq.com wrote:
 
 Hi,
 
 Thanks very much for the reply.  I have tried the SecurityUtil. I can see 
 from the log that this statement executed successfully, but I still can not 
 pass the authentication of HBase. With more experiments, I found a new 
 interesting scenario. If I run the program in yarn-client mode, the driver 
 can pass the authentication, but the executors can not. If I run the 
 program in yarn-cluster mode, neither the driver nor the executors can 
 pass the authentication.  Can anybody give me some clue with this info? 
 Many Thanks!
 
 
 -- Original Message --
 From: yuzhihong;yuzhih...@gmail.com;
 Sent: Friday, May 22, 2015, 5:29 AM
 To: donhoff_h165612...@qq.com;
 Cc: Bill Qbill.q@gmail.com; useruser@spark.apache.org;
 Subject: Re: How to use spark to access HBase with Security enabled
 
 Are the worker nodes colocated with HBase region servers ?
 
 Were you running as hbase super user ?
 
 You may need to login, using code similar to the following:
   if (isSecurityEnabled()) {
 
 SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
 
   }
 
 
 SecurityUtil is hadoop class.
 
 
 
 Cheers
 
 
 On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote:
 Hi,
 
 Many thanks for the help. My Spark version is 1.3.0 too and I run it on 
 Yarn. According to your advice I have changed the configuration. Now my 
 program can read the hbase-site.xml correctly. And it can also authenticate 
 with zookeeper successfully. 
 
 But I meet a new problem that is my program still can not pass the 
 authentication of HBase. Did you or anybody else ever meet such kind of 
 situation ?  I used a keytab file to provide the principal. Since it can 
 pass the authentication of the Zookeeper, I am sure the keytab file is OK. 
 But it just can not pass the authentication of HBase. The exception is 
 listed below and could you or anybody else help me ? Still many many thanks!
 
 Exception***
 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
 connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
 quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
 baseZNode=/hbase
 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI 
 as SASL mechanism.
 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket 

Re: Partitioning of Dataframes

2015-05-22 Thread Ted Yu
Looking at python/pyspark/sql/dataframe.py :

@since(1.4)
def coalesce(self, numPartitions):

@since(1.3)
def repartition(self, numPartitions):

Would the above methods serve the purpose ?

Cheers

On Fri, May 22, 2015 at 6:57 AM, Karlson ksonsp...@siberie.de wrote:

 Alright, that doesn't seem to have made it into the Python API yet.


 On 2015-05-22 15:12, Silvio Fiorito wrote:

 This is added to 1.4.0

 https://github.com/apache/spark/pull/5762







 On 5/22/15, 8:48 AM, Karlson ksonsp...@siberie.de wrote:

  Hi,

 wouldn't df.rdd.partitionBy() return a new RDD that I would then need to
 make into a Dataframe again? Maybe like this:
 df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird
 to me, though, and I'm not sure if the DF will be aware of its
 partitioning.

 On 2015-05-22 12:55, ayan guha wrote:

 DataFrame is an abstraction of RDD, so you should be able to do
 df.rdd.partitionBy. However, as far as I know, equi-joins already
 optimize partitioning. You may want to look at the explain plans more
 carefully and materialise interim joins.
  On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:

  Hi,

 is there any way to control how Dataframes are partitioned? I'm doing
 lots
 of joins and am seeing very large shuffle reads and writes in the
 Spark UI.
 With PairRDDs you can control how the data is partitioned across nodes
 with
 partitionBy. There is no such method on Dataframes however. Can I
 somehow
 partition the underlying the RDD manually? I am currently using the
 Python
 API.

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Performance degradation between spark 0.9.3 and 1.3.1

2015-05-22 Thread Shay Seng
Hi.
I have a job that takes
~50min with Spark 0.9.3 and
~1.8hrs on Spark 1.3.1 on the same cluster.

The only code difference between the two code bases is to fix the Seq ->
Iter changes that happened in the Spark 1.x series.

Are there any other changes in the defaults from Spark 0.9.3 -> 1.3.1 that
would cause such a large degradation in performance? Changes in
partitioning algorithms, scheduling etc?

shay


Help reading Spark UI tea leaves..

2015-05-22 Thread Shay Seng
Hi.

I have an RDD that I use repeatedly through many iterations of an
algorithm. To prevent recomputation, I persist the RDD (and incidentally I
also persist and checkpoint its parents)


val consCostConstraintMap = consCost.join(constraintMap).map {
  case (cid, (costs, (mid1, _, mid2, _, _))) => {
    (cid, (costs, mid1, mid2))
  }
}
consCostConstraintMap.setName("consCostConstraintMap")
consCostConstraintMap.persist(MEMORY_AND_DISK_SER)

...

later on in an iterative loop

val update = updatedTrips.join(consCostConstraintMap).flatMap {
  ...
}.treeReduce()

-

I can see from the UI that consCostConstraintMap is in storage:

RDD Name:           consCostConstraintMap (rdd id=113)
Storage Level:      Memory Serialized 1x Replicated
Cached Partitions:  600
Fraction Cached:    100%
Size in Memory:     15.2 GB
Size in Tachyon:    0.0 B
Size on Disk:       0.0 B
-
In the Jobs list, I see the following pattern, where each treeReduce line
corresponds to one iteration of the loop:

Job Id  Description                          Submitted            Duration  Stages: Succeeded/Total  Tasks (for all stages): Succeeded/Total
13      treeReduce at reconstruct.scala:243  2015/05/22 16:27:11  2.9 min   16/16 (194 skipped)      9024/9024 (109225 skipped)
12      treeReduce at reconstruct.scala:243  2015/05/22 16:24:16  2.9 min   16/16 (148 skipped)      9024/9024 (82725 skipped)
11      treeReduce at reconstruct.scala:243  2015/05/22 16:21:21  2.9 min   16/16 (103 skipped)      9024/9024 (56280 skipped)
10      treeReduce at reconstruct.scala:243  2015/05/22 16:18:28  2.9 min   16/16 (69 skipped)       9024/9024 (36980 skipped)

--
If I drill into one job (id=12) I see:

Completed Stages: 16
Skipped Stages: 148

Completed Stages (16)
Columns: Stage Id, Description, Submitted, Duration, Tasks: Succeeded/Total,
Input, Output, Shuffle Read, Shuffle Write

Stage 525: treeReduce at reconstruct.scala:243, submitted 2015/05/22 16:27:09,
  duration 42 ms, tasks 24/24, size 21.7 KB
Stage 524: ...
Stage 519: map at reconstruct.scala:153, submitted 2015/05/22 16:24:16,
  duration 1.2 min, tasks 600/600, sizes 14.8 GB and 8.4 GB

The last line, map at reconstruct.scala:153, corresponds to
val consCostConstraintMap = consCost.join(constraintMap).map {
which I expected to have been cached.
Is there some way I can find out what it is spending 1.2 min doing? I
presume reading and writing GBs of data. But why? Everything should be in
memory.

Any clues on where I should start?

tks


Re: Spark Streaming and Drools

2015-05-22 Thread Antonio Giambanco
Thanks a lot Evo,
do you know where I can find some examples?

Have a great one

A G

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

 You can deploy and invoke Drools as a Singleton on every Spark Worker Node
 / Executor / Worker JVM



 You can invoke it from e.g. map, filter etc and use the result from the
 Rule to make decision how to transform/filter an event/message



 *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
 *Sent:* Friday, May 22, 2015 9:43 AM
 *To:* user@spark.apache.org
 *Subject:* Spark Streaming and Drools



 Hi All,

 I'm deploying an architecture that uses Flume to send log information
 to a sink.

 Spark Streaming reads from this sink (pull strategy) and processes all this
 information. During this processing I would like to do some event
 processing, for example:

 A log appender writes information about all transactions in my trading
 platforms.

 If a platform user sells more than they buy during a week, I need to receive an
 alert on an event dashboard.

 How can I realize it? Is it possible with drools?

 Thanks so much



Re: Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Reynold Xin
I'm not sure if it is possible to overload the map function twice, once for
just KV pairs, and another for K and V separately.


On Fri, May 22, 2015 at 10:26 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved
 the RDD API, but it could be even more discoverable if made available via
 the API directly. I assume this was originally an omission that now needs
 to be kept for backwards compatibility, but would any of the repo owners be
 open to making this more discoverable to the point of API docs and tab
 completion (while keeping both binary and source compatibility)?


 class PairRDD extends RDD{
   // pair methods
 }

 RDD{
   def map[K: ClassTag, V: ClassTag](f: T => (K,V)):PairRDD[K,V]
 }

 As long as the implicits remain, then compatibility remains, but now it is
 explicit in the docs on how to get a PairRDD and in tab completion.

 Thoughts?

 Justin Pihony



Re: Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Justin Pihony
The (crude) proof of concept seems to work:

class RDD[V](value: List[V]){
  def doStuff = println("I'm doing stuff")
}

object RDD{
  implicit def toPair[V](x:RDD[V]) = new PairRDD(List((1,2)))
}

class PairRDD[K,V](value: List[(K,V)]) extends RDD (value){
  def doPairs = println("I'm using pairs")
}

class Context{
  def parallelize[K,V](x: List[(K,V)]) = new PairRDD(x)
  def parallelize[V](x: List[V]) = new RDD(x)
}

On Fri, May 22, 2015 at 2:44 PM, Reynold Xin r...@databricks.com wrote:

 I'm not sure if it is possible to overload the map function twice, once
 for just KV pairs, and another for K and V separately.


 On Fri, May 22, 2015 at 10:26 AM, Justin Pihony justin.pih...@gmail.com
 wrote:

 This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved
 the RDD API, but it could be even more discoverable if made available via
 the API directly. I assume this was originally an omission that now needs
 to be kept for backwards compatibility, but would any of the repo owners be
 open to making this more discoverable to the point of API docs and tab
 completion (while keeping both binary and source compatibility)?


 class PairRDD extends RDD {
   // pair methods
 }

 RDD {
   def map[K: ClassTag, V: ClassTag](f: T => (K, V)): PairRDD[K, V]
 }

 As long as the implicits remain, then compatibility remains, but now it
 is explicit in the docs on how to get a PairRDD and in tab completion.

 Thoughts?

 Justin Pihony





How to share a (spring) singleton service with Spark?

2015-05-22 Thread Tristan107
 
I have a small Spark launcher app that instantiates a service via a Spring
XML application context and then broadcasts it in order to make it
available on remote nodes.

I suppose that when a Spring service is instantiated, all its dependencies
are instantiated and injected at the same time, so broadcasting it should
broadcast everything to the remote nodes. My DAOs do not access a remote
database; instead they use inner collections which are loaded at startup
from XML files (in the DAO constructors).

This works fine when I launch my app via spark-submit in local mode, but
when I use yarn-cluster mode, I receive an exception (after all jobs have
been launched) saying the inner collections inside my DAOs are empty.

All my objects are Serializable and my inner collections are mostly maps
(HashMap). I've tried declaring the collections as static, but it has no
effect on the broadcasting...

Could someone tell me what is happening here? Is there a maximum depth for
broadcasting?







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-spring-singleton-service-with-Spark-tp22997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Justin Pihony
This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved the
RDD API, but it could be even more discoverable if made available via the
API directly. I assume this was originally an omission that now needs to be
kept for backwards compatibility, but would any of the repo owners be open
to making this more discoverable to the point of API docs and tab
completion (while keeping both binary and source compatibility)?


class PairRDD extends RDD {
  // pair methods
}

RDD {
  def map[K: ClassTag, V: ClassTag](f: T => (K, V)): PairRDD[K, V]
}

As long as the implicits remain, then compatibility remains, but now it is
explicit in the docs on how to get a PairRDD and in tab completion.

Thoughts?

Justin Pihony


Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Imran Rashid
Hmm, sorry, I think that disproves my theory.  Nothing else is immediately
coming to mind.  It's possible there is more info in the logs from the
driver; it couldn't hurt to send those (though I don't have high hopes of
finding anything that way).  On the off chance, could this be from too many
open files or something?  Normally there is a different error message, but I
figure it's worth asking anyway.

The error you reported here was slightly different from your original
post.  This error is from writing the shuffle map output, while the
original error you reported was a fetch failed, which is from reading the
shuffle data on the reduce side in the next stage.  Does the map stage
actually finish, even though the tasks are throwing these errors while
writing the map output?  Or do you sometimes get failures on the shuffle
write side, and sometimes on the shuffle read side?  (Not that I think you
are doing anything wrong, but it may help narrow down the root cause and
possibly file a bug.)

thanks


On Fri, May 22, 2015 at 4:40 AM, Rok Roskar rokros...@gmail.com wrote:

 on the worker/container that fails, the file not found is the first
 error -- the output below is from the yarn log. There were some python
 worker crashes for another job/stage earlier (see the warning at 18:36) but
 I expect those to be unrelated to this file not found error.


 ==
 LogType:stderr
 Log Upload Time:15-May-2015 18:50:05
 LogLength:5706
 Log Contents:
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in [jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in [jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting to kill Python Worker
 15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0 (TID 995)
 java.io.FileNotFoundException: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or directory)
   at java.io.FileOutputStream.open(Native Method)
   at java.io.FileOutputStream.<init>(FileOutputStream.java:212)
   at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
   at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201)
   at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
   at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823)
   at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
   at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
   at java.lang.Thread.run(Thread.java:722)
 15/05/15 18:50:04 ERROR DiskBlockManager: Exception while deleting local spark dir: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489
 java.io.IOException: Failed to delete: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489
 at 

Re: partitioning after extracting from a hive table?

2015-05-22 Thread ayan guha
I guess not. The number of Spark partitions corresponds to the number of input splits.
On 23 May 2015 00:02, Cesar Flores ces...@gmail.com wrote:


 I have a table in a Hive database partitioned by date. I notice that when
 I query this table using HiveContext, the created data frame has a specific
 number of partitions.


 Does this partitioning correspond to my original table partitioning in Hive?


 Thanks
 --
 Cesar Flores



Re: Performance degradation between spark 0.9.3 and 1.3.1

2015-05-22 Thread Josh Rosen
I don't think that 0.9.3 has been released, so I'm assuming that you're
running on branch-0.9.

There's been over 4000 commits between 0.9.3 and 1.3.1, so I'm afraid that
this question doesn't have a concise answer:
https://github.com/apache/spark/compare/branch-0.9...v1.3.1

To narrow down the potential causes, have you tried comparing 0.9.3 to,
say, 1.0.2 or branch-1.0, or some other version that's closer to 0.9?

On Fri, May 22, 2015 at 9:43 AM, Shay Seng s...@urbanengines.com wrote:

 Hi.
 I have a job that takes
 ~50min with Spark 0.9.3 and
 ~1.8hrs on Spark 1.3.1 on the same cluster.

 The only code difference between the two code bases is to fix the Seq ->
 Iter changes that happened in the Spark 1.x series.

 Are there any other changes in the defaults from Spark 0.9.3 -> 1.3.1 that
 would cause such a large degradation in performance? Changes in
 partitioning algorithms, scheduling, etc.?

 shay




Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-22 Thread Xin Liu
Thank you guys for the prompt help.

I ended up building spark master and verified what DB has suggested.

// MlLogisticRegression is presumably an import alias for
// org.apache.spark.ml.classification.LogisticRegression (assumption).
val lr = (new MlLogisticRegression)
  .setFitIntercept(true)
  .setMaxIter(35)

val model = lr.fit(sqlContext.createDataFrame(training))
val scoreAndLabels = model.transform(sqlContext.createDataFrame(test))
  .select("probability", "label")
  .map { case Row(probability: Vector, label: Double) =>
    (probability(1), label)
  }

Without much tuning, the above generates

Weights: [0.0013971323020715888,0.8559779783186241,-0.5052275562089914]
Intercept: -3.3076806966913006
Area under ROC: 0.7033511043412033

I also tried it on a much bigger dataset I have and its results are close
to what I get from statsmodel.

Now eagerly waiting for the 1.4 release.

Thanks,
Xin



On Wed, May 20, 2015 at 9:37 PM, Chris Gore cdg...@cdgore.com wrote:

 I tried running this data set as described with my own implementation of
 L2 regularized logistic regression using LBFGS to compare:
 https://github.com/cdgore/fitbox

 Intercept: -0.886745823033
 Weights (['gre', 'gpa', 'rank']):[ 0.28862268  0.19402388 -0.36637964]
 Area under ROC: 0.724056603774

 The difference could be from the feature preprocessing as mentioned.  I
 normalized the features around 0:

 binary_train_normalized = (binary_train - binary_train.mean()) / binary_train.std()
 binary_test_normalized = (binary_test - binary_train.mean()) / binary_train.std()
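
The normalization above applies the training set's mean and standard deviation to both splits, so the test data never influences the scaling. A minimal pure-Python sketch of that idea follows. The values are made up for illustration (they are not taken from binary.csv), and `standardize` is a hypothetical helper, not part of pandas or fitbox.

```python
import statistics


def standardize(values, mean, std):
    """Center and scale values using the supplied statistics."""
    return [(v - mean) / std for v in values]


# Made-up GPA-like values, split into training and test sets.
train = [3.0, 3.5, 4.0, 2.5, 3.0]
test = [3.2, 3.8]

# The statistics come from the training split only.
train_mean = statistics.mean(train)
train_std = statistics.stdev(train)  # sample std (n - 1), like pandas' default .std()

train_normalized = standardize(train, train_mean, train_std)
test_normalized = standardize(test, train_mean, train_std)
```

After this step the training features have mean 0 and unit sample standard deviation, while the test features are only approximately centered, which is expected.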

 On a data set this small, the difference in models could also be the
 result of how the training/test sets were split.

 Have you tried running k-folds cross validation on a larger data set?

 Chris

 On May 20, 2015, at 6:15 PM, DB Tsai d...@netflix.com.INVALID wrote:

 Hi Xin,

 If you take a look at the model you trained, the intercept from Spark
 is significantly smaller in magnitude than the one from StatsModel. The
 intercept represents a prior on categories in logistic regression (LOR),
 which causes the low accuracy in the Spark implementation. In
 LogisticRegressionWithLBFGS, the intercept is regularized due to the
 implementation of Updater, and the intercept should not be regularized.

 In the new pipeline APIs, a LOR with elasticNet is implemented, and
 the intercept is properly handled.

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala

 As you can see in the tests,

 https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
 the result is exactly the same as R now.

 BTW, in both versions, feature scaling is done before training: we
 train the model in the scaled space and then transform the model weights
 back to the original space. The only difference is that in the mllib
 version, LogisticRegressionWithLBFGS regularizes the intercept, while in
 the ml version the intercept is excluded from regularization. As a result,
 if lambda is zero, the model should be the same.
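
The effect of penalizing the intercept can be illustrated with a small sketch. This is not Spark's implementation: it is plain gradient descent on a one-feature logistic regression with an L2 penalty, and the data, the helper `fit_logistic`, and the step size are all made up for illustration. With an imbalanced label distribution, the penalized intercept is pulled toward zero, which is the kind of shrinkage described above.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def fit_logistic(xs, ys, reg, regularize_intercept, steps=5000, lr=0.5):
    """Minimize mean log-loss + (reg/2)*w^2, optionally + (reg/2)*b^2."""
    w, b = 0.0, 0.0
    n = len(xs)
    for _ in range(steps):
        # Gradient of the penalty terms.
        grad_w = reg * w
        grad_b = reg * b if regularize_intercept else 0.0
        # Gradient of the mean log-loss.
        for x, y in zip(xs, ys):
            err = sigmoid(w * x + b) - y
            grad_w += err * x / n
            grad_b += err / n
        w -= lr * grad_w
        b -= lr * grad_b
    return w, b


# Made-up, imbalanced toy data: 2 positives out of 10, feature centered at 0.
xs = [-1.5, -1.0, -0.8, -0.5, -0.2, 0.0, 0.3, 0.6, 1.2, 1.9]
ys = [0, 0, 0, 0, 0, 0, 0, 1, 0, 1]

w_reg, b_reg = fit_logistic(xs, ys, reg=0.1, regularize_intercept=True)
w_free, b_free = fit_logistic(xs, ys, reg=0.1, regularize_intercept=False)

print("intercept, penalized:    ", b_reg)
print("intercept, not penalized:", b_free)
```

Setting `reg=0.0` makes both variants run the same updates, matching the remark that the models should agree when lambda is zero.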



 On Wed, May 20, 2015 at 3:42 PM, Xin Liu liuxin...@gmail.com wrote:

 Hi,

 I have tried a few models in MLlib to train a LogisticRegression model.
 However, I consistently get much better results in terms of AUC using other
 libraries such as statsmodel (which gives similar results to R). For
 illustration purposes, I used a small data set (I have tried much bigger data)
 http://www.ats.ucla.edu/stat/data/binary.csv in
 http://www.ats.ucla.edu/stat/r/dae/logit.htm

 Here is the snippet of my usage of LogisticRegressionWithLBFGS.

 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

 val algorithm = new LogisticRegressionWithLBFGS
 algorithm.setIntercept(true)
 algorithm.optimizer
   .setNumIterations(100)
   .setRegParam(0.01)
   .setConvergenceTol(1e-5)
 val model = algorithm.run(training)
 // Return raw scores instead of thresholded 0/1 predictions.
 model.clearThreshold()
 val scoreAndLabels = test.map { point =>
   val score = model.predict(point.features)
   (score, point.label)
 }
 val metrics = new BinaryClassificationMetrics(scoreAndLabels)
 val auROC = metrics.areaUnderROC()
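
For readers unfamiliar with the metric computed above, area under ROC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it by brute force over all positive/negative pairs. It is not how Spark's BinaryClassificationMetrics is implemented, and the scores, labels, and the helper `area_under_roc` are made up for illustration.

```python
def area_under_roc(score_and_labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    pos = [s for s, y in score_and_labels if y == 1.0]
    neg = [s for s, y in score_and_labels if y == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up (score, label) pairs in the same shape as scoreAndLabels.
score_and_labels = [
    (0.9, 1.0), (0.8, 0.0), (0.7, 1.0),
    (0.4, 0.0), (0.3, 1.0), (0.1, 0.0),
]
auc = area_under_roc(score_and_labels)
print("Area under ROC:", auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which gives some context for the 0.63 versus 0.69 comparison in this thread.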

 I did a (0.6, 0.4) split for training/test. The response is admit and
 features are GRE score, GPA, and college Rank.

 Spark:
 Weights (GRE, GPA, Rank):
 [0.0011576276331509304,0.048544858567336854,-0.394202150286076]
 Intercept: -0.6488972641282202
 Area under ROC: 0.6294070512820512

 StatsModel:
 Weights [0.0018, 0.7220, -0.3148]
 Intercept: -3.5913
 Area under ROC: 0.69

 The weights from statsmodel seem more reasonable: for a one-unit increase
 in GPA, the log odds of being admitted to graduate school increase by 0.72
 in statsmodel, compared with only 0.04 in Spark.
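
To make the log-odds comparison concrete, exponentiating a logistic regression weight gives the multiplicative change in the odds for a one-unit increase in that feature. The short sketch below applies this to the two GPA weights reported above; the variable names are chosen for illustration.

```python
import math

# GPA weights as reported above.
gpa_weight_statsmodel = 0.7220
gpa_weight_spark = 0.048544858567336854

# exp(weight) is the odds ratio for a one-unit increase in GPA.
odds_ratio_statsmodel = math.exp(gpa_weight_statsmodel)
odds_ratio_spark = math.exp(gpa_weight_spark)

print("odds ratio (statsmodel):", odds_ratio_statsmodel)
print("odds ratio (Spark):     ", odds_ratio_spark)
```

Under the statsmodel fit, one extra GPA point roughly doubles the odds of admission, whereas the Spark fit implies an increase of only about 5%.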

 I have seen much bigger differences with other data. So my question is: has
 anyone compared the results with other libraries, and is anything wrong with
 the way my code invokes LogisticRegressionWithLBFGS?

 The real data I am processing is pretty big, and I really want to get this
 to work with Spark. Please let me know if you have had a similar experience
 and how you resolved it.

 Thanks,
 Xin