Re: DataFrame Column Alias problem
However this returns a single column of c, without showing the original col1. On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha sriharsha@gmail.com wrote: df.groupBy($"col1").agg(count($"col1").as("c")).show On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com wrote: Hi Spark Users Group, I'm doing groupBy operations on my DataFrame *df* as follows, to get a count for each value of col1: df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this.

col1  COUNT(col1#347)
aaa   2
bbb   4
ccc   4
... and more ...

I'd like to sort by the resulting count with .sort("COUNT(col1#347)"), but the column name of the count result obviously cannot be known in advance. Intuitively one might try to look the column up by index, as in R's data frames, but Spark doesn't support that. I have Googled *spark agg alias* and so forth, and checked DataFrame.as in the Spark API; neither helped with this. Am I the only one who has ever gotten stuck on this issue, or is there anything I have missed? REGARDS, Todd Leo
Re: Issues with constants in Spark HiveQL queries
Hi All, I'm facing the same problem with Spark 1.3.0 from Cloudera CDH 5.4.x. Any luck solving the issue? Exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Unsupported language features in query: select * from everest_marts_test.hive_ql_test where daily_partition=20150101
TOK_QUERY 1, 0,18, 14
TOK_FROM 1, 4,8, 14
TOK_TABREF 1, 6,8, 14
TOK_TABNAME 1, 6,8, 14
everest_marts_test 1, 6,6, 14
hive_ql_test 1, 8,8, 33
TOK_INSERT 0, -1,18, 0
TOK_DESTINATION 0, -1,-1, 0
TOK_DIR 0, -1,-1, 0
TOK_TMP_FILE 0, -1,-1, 0
TOK_SELECT 0, 0,2, 0
TOK_SELEXPR 0, 2,2, 0
TOK_ALLCOLREF 0, 2,2, 0
TOK_WHERE 1, 10,18, 68
TOK_FUNCTION 1, 12,18, 68
in 1, 14,14, 68
TOK_TABLE_OR_COL 1, 12,12, 52
daily_partition 1, 12,12, 52
20150101 1, 16,18, 72

scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20150101 :
20150101 1, 16,18, 72
+ org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261) ;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:234)
at org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92) at
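The archive records no confirmed fix for this thread, but a hedged diagnostic sketch follows: the parser trips on the bare constant in the WHERE clause, so it is worth checking whether quoting the literal (assuming daily_partition is a string column, which is common for partition columns) parses at all. This is an experiment to narrow the problem down, not a verified workaround:

```scala
// Untested diagnostic for the failure above, assuming daily_partition is a string column.
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// Quote the constant so the 1.3 HiveQl parser sees a string literal instead of the
// bare token it failed to find a parse rule for.
hc.sql("select * from everest_marts_test.hive_ql_test where daily_partition = '20150101'").show()
```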
Re: LDA prediction on new document
Dani, this appears to be addressed in SPARK-5567, scheduled for Spark 1.5.0. Ken On May 21, 2015, at 11:12 PM, user-digest-h...@spark.apache.org wrote: From: Dani Qiu zongmin@gmail.com Subject: LDA prediction on new document Date: May 21, 2015 at 8:48:40 PM PDT To: user@spark.apache.org Hi, guys, I'm pretty new to LDA. I notice Spark 1.3.0 MLlib provides an EM-based LDA implementation. It returns both the topics and the topic distribution. My question is: how can I use these parameters to predict on new documents? And I notice there is an online LDA implementation in the Spark master branch; it only returns topics. How can I use this to do prediction on new documents (and trained documents)? thanks
Re: DataFrame Column Alias problem
Despite the odd usage, it does the trick, thanks Reynold! On Fri, May 22, 2015 at 2:47 PM Reynold Xin r...@databricks.com wrote: In 1.4 it actually shows col1 by default. In 1.3, you can add col1 to the output, i.e. df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show() On Thu, May 21, 2015 at 11:22 PM, SLiZn Liu sliznmail...@gmail.com wrote: However this returns a single column of c, without showing the original col1. On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha sriharsha@gmail.com wrote: df.groupBy($"col1").agg(count($"col1").as("c")).show On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com wrote: Hi Spark Users Group, I'm doing groupBy operations on my DataFrame *df* as follows, to get a count for each value of col1: df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this.

col1  COUNT(col1#347)
aaa   2
bbb   4
ccc   4
... and more ...

I'd like to sort by the resulting count with .sort("COUNT(col1#347)"), but the column name of the count result obviously cannot be known in advance. Intuitively one might try to look the column up by index, as in R's data frames, but Spark doesn't support that. I have Googled *spark agg alias* and so forth, and checked DataFrame.as in the Spark API; neither helped with this. Am I the only one who has ever gotten stuck on this issue, or is there anything I have missed? REGARDS, Todd Leo
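For reference, the whole pattern from this thread in one place (Spark 1.3 syntax; it assumes import sqlContext.implicits._ is in scope for the $ column syntax):

```scala
import org.apache.spark.sql.functions.count

val counts = df.groupBy($"col1")
  .agg($"col1", count($"col1").as("c")) // 1.3 needs col1 listed explicitly to keep it in the output
  .sort($"c".desc)                      // the alias makes the count sortable by name
counts.show()
```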
Re: Spark Memory management
You can look at the logic for offloading data from memory in the ensureFreeSpace https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L416 call, and dropFromMemory https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1011 is the call responsible for moving the data to disk (if applicable). Thanks Best Regards On Fri, May 22, 2015 at 1:01 PM, swaranga sarma.swara...@gmail.com wrote: Experts, This is an academic question. Since Spark runs on the JVM, how is it able to do things like offloading RDDs from memory to disk when the data cannot fit in memory? How are the calculations performed? Does it use the methods available in the java.lang.Runtime class to get free/available memory? How accurate are these calculations? Thanks for any inputs.
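In short, Spark does not poll the JVM for free heap; the MemoryStore keeps its own byte count against a configured cap and evicts blocks when an insert would exceed it. Below is a greatly simplified, self-contained sketch of that accounting idea (not the actual Spark source; the disk path is a stand-in for BlockManager.dropFromMemory):

```scala
import java.nio.file.{Files, Paths}
import scala.collection.mutable

class SimpleMemoryStore(maxMemoryBytes: Long) {
  private var currentBytes = 0L
  private val entries = mutable.LinkedHashMap[String, Array[Byte]]() // insertion (LRU-ish) order

  // Evict oldest blocks to disk until `space` more bytes would fit under the cap.
  def ensureFreeSpace(space: Long): Boolean = synchronized {
    if (space > maxMemoryBytes) return false
    while (currentBytes + space > maxMemoryBytes && entries.nonEmpty) {
      val (blockId, bytes) = entries.head
      entries.remove(blockId)
      currentBytes -= bytes.length
      dropToDisk(blockId, bytes)
    }
    currentBytes + space <= maxMemoryBytes
  }

  def put(blockId: String, bytes: Array[Byte]): Unit = synchronized {
    if (ensureFreeSpace(bytes.length)) {
      entries(blockId) = bytes
      currentBytes += bytes.length
    } else dropToDisk(blockId, bytes) // block too big for memory: straight to disk
  }

  private def dropToDisk(blockId: String, bytes: Array[Byte]): Unit =
    Files.write(Paths.get(s"/tmp/$blockId"), bytes)
}
```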
Spark Memory management
Experts, This is an academic question. Since Spark runs on the JVM, how is it able to do things like offloading RDDs from memory to disk when the data cannot fit in memory? How are the calculations performed? Does it use the methods available in the java.lang.Runtime class to get free/available memory? How accurate are these calculations? Thanks for any inputs.
Re: Official Docker container for Spark
The Spark source tree has a Dockerfile; you can use that. -- Original Message -- From: tridib tridib.sama...@live.com Date: Fri, May 22, 2015 03:25 AM To: user user@spark.apache.org Subject: Official Docker container for Spark Hi, I am using Spark 1.2.0. Can you suggest Docker containers which can be deployed in production? I found a lot of Spark images on https://registry.hub.docker.com/ but could not figure out which one to use. None of them seems to be an official image. Does anybody have any recommendation? Thanks Tridib
Re: [pyspark] Starting workers in a virtualenv
That works, thank you! On 2015-05-22 03:15, Davies Liu wrote: Could you try specifying PYSPARK_PYTHON as the path to the python binary in your virtualenv? For example: PYSPARK_PYTHON=/path/to/env/bin/python bin/spark-submit xx.py On Mon, Apr 20, 2015 at 12:51 AM, Karlson ksonsp...@siberie.de wrote: Hi all, I am running the Python process that communicates with Spark in a virtualenv. Is there any way I can make sure that the Python processes of the workers are also started in a virtualenv? Currently I am getting ImportErrors when a worker tries to unpickle stuff that is not installed system-wide. For now both the worker and the driver run on the same machine in local mode. Thanks in advance!
Spark Streaming and Drools
Hi All, I'm deploying an architecture that uses Flume to send log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this processing I would like to do some event processing. For example: a log appender writes information about all transactions on my trading platforms; if a platform user sells more than they buy during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
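One possible shape for this (a hedged sketch, not a tested integration; the KieSession name "tradingSession" and the Trade event class are invented for illustration) is to evaluate a Drools session per partition inside foreachRDD:

```scala
import org.kie.api.KieServices
import org.apache.spark.streaming.dstream.DStream

case class Trade(user: String, side: String, amount: Double)

def applyRules(trades: DStream[Trade]): Unit = {
  trades.foreachRDD { rdd =>
    rdd.foreachPartition { events =>
      // Built per partition: a KieSession is not serializable, so it cannot be
      // created on the driver and shipped to the executors.
      val session = KieServices.Factory.get()
        .getKieClasspathContainer
        .newKieSession("tradingSession") // assumed name from a kmodule.xml on the classpath
      try {
        events.foreach(session.insert)
        session.fireAllRules() // rules such as "sells > buys in a week" would raise the alert
      } finally session.dispose()
    }
  }
}
```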
Re: Official Docker container for Spark
Use this: sequenceiq's Spark image. Here's a link to their GitHub repo: docker-spark https://github.com/sequenceiq/docker-spark They have repos for other big data tools too, which are again really nice. It's being maintained properly by their devs.
Re: rdd.sample() methods very slow
You can do something like this:

val myRdd = ...
val rddSampledByPartition = PartitionPruningRDD.create(myRdd, i => Random.nextDouble() < 0.1) // this samples 10% of the partitions
rddSampledByPartition.mapPartitions { iter => iter.take(10) } // take the first 10 elements out of each partition

On Thu, May 21, 2015 at 11:36 AM, Sean Owen so...@cloudera.com wrote: If sampling whole partitions is sufficient (or a part of a partition), sure, you could mapPartitionsWithIndex and decide whether to process a partition at all based on its # and skip the rest. That's much faster. On Thu, May 21, 2015 at 7:07 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I don't need it to be 100% random. How about randomly picking a few partitions and returning all docs in those partitions? Is rdd.mapPartitionsWithIndex() the right method to use to process just a small portion of the partitions? Ningjun
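A self-contained version of the two suggestions above (Spark 1.3-era API; note that PartitionPruningRDD is a developer API). Sampling whole partitions is only representative when records are not ordered in a way that correlates with partition number:

```scala
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.PartitionPruningRDD

object PartitionSample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-sample").setMaster("local[2]"))
    val myRdd = sc.parallelize(1 to 100000, 100)
    // Keep each partition with probability 0.1, i.e. roughly 10% of the partitions.
    val sampledPartitions = PartitionPruningRDD.create(myRdd, _ => Random.nextDouble() < 0.1)
    // Then read only the first 10 elements of each surviving partition.
    val firstTen = sampledPartitions.mapPartitions(_.take(10))
    println(firstTen.count()) // expect around 100 elements (about 10 partitions x 10)
    sc.stop()
  }
}
```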
Re: LDA prediction on new document
thanks, Ken, but I am planning to use Spark LDA in production. I cannot wait for a future release. At least, is there some workaround? PS: SPARK-5567 https://issues.apache.org/jira/browse/SPARK-5567 mentions: "This will require inference but should be able to use the same code, with a few modifications to keep the inferred topics fixed." Can somebody elaborate on that? Folding-in, as in EM? Or can I simply sum the topic distributions of the terms in the new document? On Fri, May 22, 2015 at 2:23 PM, Ken Geis geis@gmail.com wrote: Dani, this appears to be addressed in SPARK-5567 https://issues.apache.org/jira/browse/SPARK-5567, scheduled for Spark 1.5.0. Ken On May 21, 2015, at 11:12 PM, user-digest-h...@spark.apache.org wrote: *From:* Dani Qiu zongmin@gmail.com *Subject:* *LDA prediction on new document* *Date:* May 21, 2015 at 8:48:40 PM PDT *To:* user@spark.apache.org Hi, guys, I'm pretty new to LDA. I notice Spark 1.3.0 MLlib provides an EM-based LDA implementation. It returns both the topics and the topic distribution. My question is: how can I use these parameters to predict on new documents? And I notice there is an online LDA implementation in the Spark master branch; it only returns topics. How can I use this to do prediction on new documents (and trained documents)? thanks
Re: Re: How to use spark to access HBase with Security enabled
Can you post the modified code? Thanks On May 21, 2015, at 11:11 PM, donhoff_h 165612...@qq.com wrote: Hi, Thanks very much for the reply. I have tried the SecurityUtil. I can see from the log that this statement executed successfully, but I still cannot pass the authentication of HBase. And with more experiments, I found a new interesting scenario. If I run the program in yarn-client mode, the driver can pass the authentication, but the executors cannot. If I run the program in yarn-cluster mode, neither the driver nor the executors can pass the authentication. Can anybody give me some clue with this info? Many Thanks! -- Original Message -- From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 5:29 AM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: How to use spark to access HBase with Security enabled Are the worker nodes colocated with HBase region servers? Were you running as the hbase superuser? You may need to log in, using code similar to the following: if (isSecurityEnabled()) { SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost); } SecurityUtil is a Hadoop class. Cheers On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote: Hi, Many thanks for the help. My Spark version is 1.3.0 too and I run it on YARN. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly, and it can also authenticate with ZooKeeper successfully. But I have hit a new problem: my program still cannot pass the authentication of HBase. Did you or anybody else ever meet this kind of situation? I used a keytab file to provide the principal. Since it can pass the authentication of ZooKeeper, I am sure the keytab file is OK, but it just cannot pass the authentication of HBase. The exception is listed below; could you or anybody else help me? Still many many thanks!

Exception***
15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase
15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism.
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at: Thu May 21 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT expires: Fri May 22 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 11:43:32 CST 2015
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, negotiated timeout = 4
15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called multiple times. Overwriting connection and table reference; TableInputFormatBase will not close these old references when done.
15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for table ns_dev1:hd01.
15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730)
at
Re: Re: How to use spark to access HBase with Security enabled
Hi, My modified code is listed below; I just added the SecurityUtil API. I don't know which property keys I should use, so I made up 2 property keys of my own to find the keytab and principal.

object TestHBaseRead2 {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create()
    hbConf.set("dhao.keytab.file", "/etc/spark/keytab/spark.user.keytab")
    hbConf.set("dhao.user.principal", "sp...@bgdt.dev.hrb")
    SecurityUtil.login(hbConf, "dhao.keytab.file", "dhao.user.principal")
    val conn = ConnectionFactory.createConnection(hbConf)
    val tbl = conn.getTable(TableName.valueOf("spark_t01"))
    try {
      val get = new Get(Bytes.toBytes("row01"))
      val res = tbl.get(get)
      println("result:" + res.toString)
    } finally {
      tbl.close()
      conn.close()
      es.shutdown()
    }
    val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
    val v = rdd.sum()
    println("Value=" + v)
    sc.stop()
  }
}

-- Original Message -- From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 3:25 AM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: Re: How to use spark to access HBase with Security enabled Can you post the modified code? Thanks On May 21, 2015, at 11:11 PM, donhoff_h 165612...@qq.com wrote: Hi, Thanks very much for the reply. I have tried the SecurityUtil. I can see from the log that this statement executed successfully, but I still cannot pass the authentication of HBase. And with more experiments, I found a new interesting scenario. If I run the program in yarn-client mode, the driver can pass the authentication, but the executors cannot. If I run the program in yarn-cluster mode, neither the driver nor the executors can pass the authentication. Can anybody give me some clue with this info? Many Thanks! -- Original Message -- From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 5:29 AM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: How to use spark to access HBase with Security enabled Are the worker nodes colocated with HBase region servers? Were you running as the hbase superuser? You may need to log in, using code similar to the following: if (isSecurityEnabled()) { SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost); } SecurityUtil is a Hadoop class. Cheers On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote: Hi, Many thanks for the help. My Spark version is 1.3.0 too and I run it on YARN. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly, and it can also authenticate with ZooKeeper successfully. But I have hit a new problem: my program still cannot pass the authentication of HBase. Did you or anybody else ever meet this kind of situation? I used a keytab file to provide the principal. Since it can pass the authentication of ZooKeeper, I am sure the keytab file is OK, but it just cannot pass the authentication of HBase. The exception is listed below; could you or anybody else help me? Still many many thanks!

Exception***
15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase
15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism.
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at: Thu May 21 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT expires: Fri May 22 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 11:43:32 CST 2015
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, negotiated timeout = 4
15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called multiple times. Overwriting connection and table reference; TableInputFormatBase will not close these old references when done.
15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for
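The yarn-client observation above (driver authenticates, executors do not) suggests the login only ever runs in the driver JVM. A hedged sketch of one commonly reported workaround is to perform the keytab login inside code that runs on the executors, so every JVM that opens an HBase connection holds its own TGT. The principal below is the redacted value from the post, and it assumes the keytab has been distributed to the same path on every worker node:

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.rdd.RDD

def readRows(rowKeys: RDD[String]): Unit = rowKeys.foreachPartition { keys =>
  val hbConf = HBaseConfiguration.create()
  UserGroupInformation.setConfiguration(hbConf)
  // Runs in the executor JVM, so the executor itself obtains Kerberos credentials.
  UserGroupInformation.loginUserFromKeytab("sp...@bgdt.dev.hrb", "/etc/spark/keytab/spark.user.keytab")
  val conn = ConnectionFactory.createConnection(hbConf)
  try {
    val tbl = conn.getTable(TableName.valueOf("spark_t01"))
    keys.foreach(k => println(tbl.get(new Get(Bytes.toBytes(k)))))
  } finally conn.close()
}
```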
Re: Storing spark processed output to Database asynchronously.
This is just a friendly ping, just to remind you of my query. Also, is there a possible explanation/example of the usage of AsyncRDDActions in Java? On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote: I am receiving data at UDP port 8060 and doing processing on it using Spark and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored don't match, probably because the Neo4j API takes too long to push the data into the database. Meanwhile, Spark is unable to receive data, probably because the process is blocked. On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote: Can you elaborate on how the data loss is occurring? On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote: That is completely alright, as the system will make sure the work gets done. My major concern is the data drop. Will using async stop the data loss? On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: If you cannot push data as fast as you are generating it, then async isn't going to help either. The work is just going to keep piling up as many many async jobs, even though your batch processing times will be low, as that processing time is not going to reflect how much of the overall work is pending in the system. On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a Spark entry point, for continuous UDP data, using:

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

Now, when I process this input stream using:

JavaDStream hash = lines.flatMap(<my-code>);
JavaPairDStream tuple = hash.mapToPair(<my-code>);
JavaPairDStream output = tuple.reduceByKey(<my-code>);

output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
    // TODO Auto-generated method stub
    new AsyncRDDActions(arg0.rdd(), null);
    arg0.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
      @Override
      public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
        // TODO Auto-generated method stub
        GraphDatabaseService graphDb = new GraphDatabaseFactory()
          .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
          .setConfig("remote_shell_enabled", "true")
          .newGraphDatabase();
        try (Transaction tx = graphDb.beginTx()) {
          while (arg0.hasNext()) {
            Tuple2<String, ArrayList<String>> tuple = arg0.next();
            Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
            boolean oldHMac = false;
            if (HMac != null) {
              System.out.println("Already in Database: " + tuple._1);
              oldHMac = true;
            } else
              HMac = Neo4jOperations.createHMac(graphDb, tuple._1);
            ArrayList<String> zipcodes = tuple._2;
            for (String zipcode : zipcodes) {
              Node Zipcode = Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
              if (Zipcode != null) {
                System.out.println("Already in Database: " + zipcode);
                if (oldHMac == true && Neo4jOperations.getRelationshipBetween(HMac, Zipcode) != null)
                  Neo4jOperations.updateToCurrentTime(HMac, Zipcode);
                else
                  Neo4jOperations.travelTo(HMac, Zipcode);
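On the AsyncRDDActions question asked above: in Scala (Spark 1.3-era API) the async variants are available directly as methods on any RDD through an implicit conversion, returning a FutureAction; there is no dedicated Java wrapper, which is why constructing AsyncRDDActions by hand, as in the snippet above, is awkward. A minimal sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("async-sketch").setMaster("local[2]"))
val rdd: RDD[Int] = sc.parallelize(1 to 1000)

// Returns a FutureAction immediately; the partitions are processed in the background,
// so the caller (e.g. a streaming batch) is not blocked on a slow database push.
val future = rdd.foreachPartitionAsync { it =>
  it.foreach(v => ()) // stand-in for the Neo4j writes from the post
}
future.onComplete(result => println(s"write finished: $result"))(scala.concurrent.ExecutionContext.global)
```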
Re: How to use spark to access HBase with Security enabled
Hi, Thanks very much for the reply. I have tried the SecurityUtil. I can see from the log that this statement executed successfully, but I still cannot pass the authentication of HBase. And with more experiments, I found a new interesting scenario. If I run the program in yarn-client mode, the driver can pass the authentication, but the executors cannot. If I run the program in yarn-cluster mode, neither the driver nor the executors can pass the authentication. Can anybody give me some clue with this info? Many Thanks! -- Original Message -- From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 5:29 AM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: How to use spark to access HBase with Security enabled Are the worker nodes colocated with HBase region servers? Were you running as the hbase superuser? You may need to log in, using code similar to the following: if (isSecurityEnabled()) { SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost); } SecurityUtil is a Hadoop class. Cheers On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote: Hi, Many thanks for the help. My Spark version is 1.3.0 too and I run it on YARN. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly, and it can also authenticate with ZooKeeper successfully. But I have hit a new problem: my program still cannot pass the authentication of HBase. Did you or anybody else ever meet this kind of situation? I used a keytab file to provide the principal. Since it can pass the authentication of ZooKeeper, I am sure the keytab file is OK, but it just cannot pass the authentication of HBase. The exception is listed below; could you or anybody else help me? Still many many thanks!

Exception***
15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase
15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism.
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at: Thu May 21 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT expires: Fri May 22 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 11:43:32 CST 2015
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, negotiated timeout = 4
15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called multiple times. Overwriting connection and table reference; TableInputFormatBase will not close these old references when done.
15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for table ns_dev1:hd01.
15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:730)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:727)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
Re: DataFrame Column Alias problem
In 1.4 it actually shows col1 by default. In 1.3, you can add col1 to the output, i.e. df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show() On Thu, May 21, 2015 at 11:22 PM, SLiZn Liu sliznmail...@gmail.com wrote: However this returns a single column of c, without showing the original col1. On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha sriharsha@gmail.com wrote: df.groupBy($"col1").agg(count($"col1").as("c")).show On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com wrote: Hi Spark Users Group, I'm doing groupBy operations on my DataFrame *df* as follows, to get a count for each value of col1: df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this.

col1  COUNT(col1#347)
aaa   2
bbb   4
ccc   4
... and more ...

I'd like to sort by the resulting count with .sort("COUNT(col1#347)"), but the column name of the count result obviously cannot be known in advance. Intuitively one might try to look the column up by index, as in R's data frames, but Spark doesn't support that. I have Googled *spark agg alias* and so forth, and checked DataFrame.as in the Spark API; neither helped with this. Am I the only one who has ever gotten stuck on this issue, or is there anything I have missed? REGARDS, Todd Leo
Re: Spark Memory management
In the Spark source, the class org.apache.spark.deploy.worker.WorkerArguments has:

def inferDefaultCores(): Int = {
  Runtime.getRuntime.availableProcessors()
}

def inferDefaultMemory(): Int = {
  val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  var totalMb = 0
  try {
    val bean = ManagementFactory.getOperatingSystemMXBean()
    if (ibmVendor) {
      val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    } else {
      val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    }
  } catch {
    case e: Exception => {
      totalMb = 2 * 1024
      System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
    }
  }
  // Leave out 1 GB for the operating system, but don't return a negative memory size
  math.max(totalMb - 1024, 512)
}

-- Original Message -- From: swaranga sarma.swara...@gmail.com Date: Fri, May 22, 2015 03:31 PM To: user user@spark.apache.org Subject: Spark Memory management Experts, This is an academic question. Since Spark runs on the JVM, how is it able to do things like offloading RDDs from memory to disk when the data cannot fit in memory? How are the calculations performed? Does it use the methods available in the java.lang.Runtime class to get free/available memory? How accurate are these calculations? Thanks for any inputs.
MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?
I am new to MLlib and Spark (I use Scala). I'm trying to understand how LogisticRegressionWithLBFGS and LogisticRegressionWithSGD work. I usually use R to do logistic regressions, but now I do it on Spark to be able to analyze big data. The model only returns weights and an intercept. My problem is that I have no information about which variables are significant and which variables I should delete to improve my model. I only have the confusion matrix and the AUC to evaluate the performance. Is there any way to get information about the variables I put in my model? How can I try different variable combinations; do I have to modify the dataset of origin (e.g. delete one or several columns)? How are the weights calculated: is there a correlation calculation with the variable of interest?
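On the "different variable combinations" part, a hedged sketch (MLlib 1.3-era API): MLlib exposes no p-values or significance tests, so feature selection has to be done by rebuilding the feature vectors yourself and comparing the resulting models (e.g. by AUC). The column indices below are placeholders:

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Retrain on a subset of feature columns without touching the dataset of origin.
def trainOnSubset(data: RDD[LabeledPoint], keep: Array[Int]) = {
  val reduced = data.map(lp => LabeledPoint(lp.label, Vectors.dense(keep.map(lp.features.apply))))
  new LogisticRegressionWithLBFGS().setNumClasses(2).run(reduced)
}

// e.g. drop column 1 and compare the AUC against the full model:
// val model = trainOnSubset(trainingData, Array(0, 2, 3))
// println(model.weights) // one raw weight per retained feature, not a significance test
```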
Re: HiveContext fails when querying large external Parquet tables
What is also strange is that this seems to work on external JSON data, but not Parquet. I'll try to do more verification of that next week. On May 22, 2015, at 16:24, yana yana.kadiy...@gmail.com wrote: There is an open JIRA on Spark not pushing predicates to the metastore. I have a large dataset with many partitions, and doing anything with it is very slow... But I am surprised Spark 1.2 worked for you: it has this problem... Original message From: Andrew Otto Date: 05/22/2015 3:51 PM (GMT-05:00) To: user@spark.apache.org Cc: Joseph Allemandou, Madhumitha Viswanathan Subject: HiveContext fails when querying large external Parquet tables Hi all, (This email was easier to write in markdown, so I've created a gist with its contents here: https://gist.github.com/ottomata/f91ea76cece97444e269. I'll paste the markdown content in the email body here too.) --- We've recently upgraded to CDH 5.4.0, which comes with Spark 1.3.0 and Hive 1.1.0. Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0. Since upgrading, we can no longer query our large webrequest dataset using HiveContext. HiveContext + Parquet and other file types work fine with external tables. (We have a similarly large JSON external table that works just fine with HiveContext.) Our webrequest dataset is stored in hourly partitioned Parquet files. We mainly interact with this dataset via a Hive external table, but have also been using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T. (There are subdirectories down to hourly level here too.)
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data, and add the single hourly partition, querying works via both Hive and Spark HiveContext:

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS `otto.webrequest_few_partitions_big_data`(
  `hostname`         string      COMMENT 'Source node hostname',
  `sequence`         bigint      COMMENT 'Per host sequence number',
  `dt`               string      COMMENT 'Timestame at cache in ISO 8601',
  `time_firstbyte`   double      COMMENT 'Time to first byte',
  `ip`               string      COMMENT 'IP of packet at cache',
  `cache_status`     string      COMMENT 'Cache status',
  `http_status`      string      COMMENT 'HTTP status of response',
  `response_size`    bigint      COMMENT 'Response size',
  `http_method`      string      COMMENT 'HTTP method of request',
  `uri_host`         string      COMMENT 'Host of request',
  `uri_path`         string      COMMENT 'Path of request',
  `uri_query`        string      COMMENT 'Query of request',
  `content_type`     string      COMMENT 'Content-Type header of response',
  `referer`          string      COMMENT 'Referer header of request',
  `x_forwarded_for`  string      COMMENT 'X-Forwarded-For header of request',
  `user_agent`       string      COMMENT 'User-Agent header of request',
  `accept_language`  string      COMMENT 'Accept-Language header of request',
  `x_analytics`      string      COMMENT 'X-Analytics header of response',
  `range`            string      COMMENT 'Range header of response',
  `is_pageview`      boolean     COMMENT 'Indicates if this record was marked as a pageview during refinement',
  `record_version`   string      COMMENT 'Keeps track of changes in the table content definition - https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
  `client_ip`        string      COMMENT 'Client IP computed during refinement using ip and x_forwarded_for',
  `geocoded_data`    map<string, string>  COMMENT 'Geocoded map with continent, country_code, country, city, subdivision, postal_code, latitude, longitude, timezone keys and associated values.',
  `x_cache`          string      COMMENT 'X-Cache header of response',
  `user_agent_map`   map<string, string>  COMMENT 'User-agent map with browser_name, browser_major, device, os_name, os_minor, os_major keys and associated values',
  `x_analytics_map`  map<string, string>  COMMENT 'X_analytics map view of the x_analytics field',
  `ts`               timestamp   COMMENT 'Unix timestamp in milliseconds extracted from dt',
  `access_method`    string      COMMENT 'Method used to accessing the site (mobile app|mobile web|desktop)',
  `agent_type`       string      COMMENT 'Categorise the agent making the webrequest as either user or spider (automatas to be added).',
```
RE: HiveContext fails when querying large external Parquet tables
There is an open JIRA on Spark not pushing predicates to the metastore. I have a large dataset with many partitions, and doing anything with it is very slow... But I am surprised Spark 1.2 worked for you: it has this problem... Original message From: Andrew Otto ao...@wikimedia.org Date: 05/22/2015 3:51 PM (GMT-05:00) To: user@spark.apache.org Cc: Joseph Allemandou jalleman...@wikimedia.org, Madhumitha Viswanathan mviswanat...@wikimedia.org Subject: HiveContext fails when querying large external Parquet tables Hi all, (This email was easier to write in markdown, so I've created a gist with its contents here: https://gist.github.com/ottomata/f91ea76cece97444e269. I'll paste the markdown content in the email body here too.) --- We've recently upgraded to CDH 5.4.0, which comes with Spark 1.3.0 and Hive 1.1.0. Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0. Since upgrading, we can no longer query our large webrequest dataset using HiveContext. HiveContext + Parquet and other file types work fine with external tables. (We have a similarly large JSON external table that works just fine with HiveContext.) Our webrequest dataset is stored in hourly partitioned Parquet files. We mainly interact with this dataset via a Hive external table, but have also been using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T. (There are subdirectories down to hourly level here too.)
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data, and add the single hourly partition, querying works via both Hive and Spark HiveContext:

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS `otto.webrequest_few_partitions_big_data`(
  `hostname`         string      COMMENT 'Source node hostname',
  `sequence`         bigint      COMMENT 'Per host sequence number',
  `dt`               string      COMMENT 'Timestame at cache in ISO 8601',
  `time_firstbyte`   double      COMMENT 'Time to first byte',
  `ip`               string      COMMENT 'IP of packet at cache',
  `cache_status`     string      COMMENT 'Cache status',
  `http_status`      string      COMMENT 'HTTP status of response',
  `response_size`    bigint      COMMENT 'Response size',
  `http_method`      string      COMMENT 'HTTP method of request',
  `uri_host`         string      COMMENT 'Host of request',
  `uri_path`         string      COMMENT 'Path of request',
  `uri_query`        string      COMMENT 'Query of request',
  `content_type`     string      COMMENT 'Content-Type header of response',
  `referer`          string      COMMENT 'Referer header of request',
  `x_forwarded_for`  string      COMMENT 'X-Forwarded-For header of request',
  `user_agent`       string      COMMENT 'User-Agent header of request',
  `accept_language`  string      COMMENT 'Accept-Language header of request',
  `x_analytics`      string      COMMENT 'X-Analytics header of response',
  `range`            string      COMMENT 'Range header of response',
  `is_pageview`      boolean     COMMENT 'Indicates if this record was marked as a pageview during refinement',
  `record_version`   string      COMMENT 'Keeps track of changes in the table content definition - https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
  `client_ip`        string      COMMENT 'Client IP computed during refinement using ip and x_forwarded_for',
  `geocoded_data`    map<string, string>  COMMENT 'Geocoded map with continent, country_code, country, city, subdivision, postal_code, latitude, longitude, timezone keys and associated values.',
  `x_cache`          string      COMMENT 'X-Cache header of response',
  `user_agent_map`   map<string, string>  COMMENT 'User-agent map with browser_name, browser_major, device, os_name, os_minor, os_major keys and associated values',
  `x_analytics_map`  map<string, string>  COMMENT 'X_analytics map view of the x_analytics field',
  `ts`               timestamp   COMMENT 'Unix timestamp in milliseconds extracted from dt',
  `access_method`    string      COMMENT 'Method used to accessing the site (mobile app|mobile web|desktop)',
  `agent_type`       string      COMMENT 'Categorise the agent making the webrequest as either user or spider (automatas to be added).',
  `is_zero`          boolean     COMMENT 'Indicates if the webrequest is accessed through a zero provider',
  `referer_class`    string      COMMENT 'Indicates if a referer is internal, external or unknown.'
)
PARTITIONED BY (
  `webrequest_source` string COMMENT 'Source cluster',
  `year`
```
Re: Trying to connect to many topics with several DirectConnect
Can you show us the rest of the program? When are you starting, or stopping, the context? Is the exception occurring right after start or stop? What about the log4j logs, what do they say? On Fri, May 22, 2015 at 7:12 AM, Cody Koeninger c...@koeninger.org wrote: I just verified that the following code works on 1.3.0:

val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic2)
stream1.print()
stream2.print()

So something else is probably going on in your case. See if simply printing the two streams works for you, then compare what's different in your actual job. On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm trying to connect to two Kafka topics with Spark with DirectStream, but I get an error. I don't know if there is any limitation to doing this, because when I just access one topic everything is fine.

val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
val setTopic1 = Set("topic1")
val setTopic2 = Set("topic2")
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)

The error that I get is:

15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
15/05/22 13:12:40 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)

Are there any limitations to doing this?
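Separately from debugging the two-stream setup, a single direct stream can subscribe to both topics at once, which may sidestep the problem entirely. A sketch, reusing the ssc and kafkaParams from the post:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// One direct stream for both topics; per-topic handling can still be done by
// inspecting the Kafka message key or by filtering downstream.
val topics = Set("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
stream.print()
```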
HiveContext fails when querying large external Parquet tables
Hi all, (This email was easier to write in markdown, so I've created a gist with its contents here: https://gist.github.com/ottomata/f91ea76cece97444e269. I'll paste the markdown content in the email body here too.) --- We've recently upgraded to CDH 5.4.0, which comes with Spark 1.3.0 and Hive 1.1.0. Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0. Since upgrading, we can no longer query our large webrequest dataset using HiveContext. HiveContext + Parquet and other file types work fine with external tables. (We have a similarly large JSON external table that works just fine with HiveContext.) Our webrequest dataset is stored in hourly partitioned Parquet files. We mainly interact with this dataset via a Hive external table, but have also been using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T. (There are subdirectories down to hourly level here too.)
$ hdfs dfs -du -s -h /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data, and add the single hourly partition, querying works via both Hive and Spark HiveContext:

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS `otto.webrequest_few_partitions_big_data`(
  `hostname`         string      COMMENT 'Source node hostname',
  `sequence`         bigint      COMMENT 'Per host sequence number',
  `dt`               string      COMMENT 'Timestame at cache in ISO 8601',
  `time_firstbyte`   double      COMMENT 'Time to first byte',
  `ip`               string      COMMENT 'IP of packet at cache',
  `cache_status`     string      COMMENT 'Cache status',
  `http_status`      string      COMMENT 'HTTP status of response',
  `response_size`    bigint      COMMENT 'Response size',
  `http_method`      string      COMMENT 'HTTP method of request',
  `uri_host`         string      COMMENT 'Host of request',
  `uri_path`         string      COMMENT 'Path of request',
  `uri_query`        string      COMMENT 'Query of request',
  `content_type`     string      COMMENT 'Content-Type header of response',
  `referer`          string      COMMENT 'Referer header of request',
  `x_forwarded_for`  string      COMMENT 'X-Forwarded-For header of request',
  `user_agent`       string      COMMENT 'User-Agent header of request',
  `accept_language`  string      COMMENT 'Accept-Language header of request',
  `x_analytics`      string      COMMENT 'X-Analytics header of response',
  `range`            string      COMMENT 'Range header of response',
  `is_pageview`      boolean     COMMENT 'Indicates if this record was marked as a pageview during refinement',
  `record_version`   string      COMMENT 'Keeps track of changes in the table content definition - https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
  `client_ip`        string      COMMENT 'Client IP computed during refinement using ip and x_forwarded_for',
  `geocoded_data`    map<string, string>  COMMENT 'Geocoded map with continent, country_code, country, city, subdivision, postal_code, latitude, longitude, timezone keys and associated values.',
  `x_cache`          string      COMMENT 'X-Cache header of response',
  `user_agent_map`   map<string, string>  COMMENT 'User-agent map with browser_name, browser_major, device, os_name, os_minor, os_major keys and associated values',
  `x_analytics_map`  map<string, string>  COMMENT 'X_analytics map view of the x_analytics field',
  `ts`               timestamp   COMMENT 'Unix timestamp in milliseconds extracted from dt',
  `access_method`    string      COMMENT 'Method used to accessing the site (mobile app|mobile web|desktop)',
  `agent_type`       string      COMMENT 'Categorise the agent making the webrequest as either user or spider (automatas to be added).',
  `is_zero`          boolean     COMMENT 'Indicates if the webrequest is accessed through a zero provider',
  `referer_class`    string      COMMENT 'Indicates if a referer is internal, external or unknown.'
)
PARTITIONED BY (
  `webrequest_source` string COMMENT 'Source cluster',
  `year`              int    COMMENT 'Unpadded year of request',
  `month`             int    COMMENT 'Unpadded month of request',
  `day`               int    COMMENT 'Unpadded day of request',
  `hour`              int    COMMENT 'Unpadded hour of request'
)
CLUSTERED BY (hostname, sequence) INTO 64 BUCKETS
STORED AS PARQUET
LOCATION '/wmf/data/wmf/webrequest'
;

hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition (webrequest_source='misc', year=2015, month=5, day=20, hour=0) location
```
RE: Storing spark processed output to Database asynchronously.
If the message consumption rate is higher than the time required to process ALL data for a micro-batch (i.e. the next RDD produced for your stream) allows, the following happens – let's say that e.g. your micro-batch time is 3 sec:
1. Based on your message streaming and consumption rate, you get e.g. a 500 MB RDD to be processed during the next 3 sec micro-batch
2. However, the work performed on the RDD by your streaming job takes more than 3 sec
3. In the meantime the next RDD comes in and occupies another 500 MB, and so on and so forth, until the current iteration of the job crashes due to what is essentially memory exhaustion (no more free RAM for the next RDD), i.e. essentially a memory leak

The above can be called a design flaw, because Spark Streaming seems to rely on the default behavior of Spark batch, which is to remove memory-only RDDs when there is no more free memory in the system. However, in a batch context Spark can always recreate a removed RDD from e.g. the file system, while in a streaming context the data is gone forever.

You can check whether the above behavior is the reason for your lost messages by reviewing the driver logs for exceptions AND/OR simply using the Spark UI to see whether your streaming app has any LOST JOBS and how many – each lost job is a lost RDD is lost messages.

The above can be overcome by using one of the following measures:
1. Set the receiver rate to a level which will allow your job to complete within the time for a micro-batch (obviously you are voluntarily limiting your performance in this way)
2. Throw more boxes/cores/RAM at the problem, and also improve the performance of the tasks performing the work on the messages (e.g. review and refactor the code)
3. Set the storage mode of the RDDs to "Memory AND Disk" – this will keep using RAM until there is no free space left and then switch to disk, rather than crashing miserably and losing the affected job iteration and all its messages – obviously every time it has to resort to the disk your performance will take a hit

From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 22, 2015 8:55 PM To: Gautam Bajaj Cc: user Subject: Re: Storing spark processed output to Database asynchronously. Something does not make sense. Receivers (currently) do not get blocked (unless a rate limit has been set) due to processing load. The receiver will continue to receive data and store it in memory until it is processed. So I am still not sure how the data loss is happening. Unless you are sending data at a faster rate than the receiver can handle (more than the max rate at which the receiver can save data in memory and replicate it to other nodes). In general, if you are particular about data loss, then UDP is not really a good choice in the first place. If you can use TCP, try it. It would at least eliminate the possibility that I mentioned above. Ultimately, if you try sending data faster than the receiver can handle (independent of whether processing can handle it), then you will lose data if you are using UDP. You have to use TCP to naturally control the sending rate to match the receiving rate in the receiver, without dropping data. On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote: This is just a friendly ping, just to remind you of my query. Also, is there a possible explanation/example of the usage of AsyncRDDActions in Java? On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote: I am receiving data at UDP port 8060 and doing processing on it using Spark and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored don't match, probably because the Neo4j API takes too long to push the data into the database. Meanwhile, Spark is unable to receive data, probably because the process is blocked. On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote: Can you elaborate on how the data loss is occurring? On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote: That is completely alright, as the system will make sure the work gets done. My major concern is the data drop. Will using async stop the data loss? On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: If you cannot push data as fast as you are generating it, then async isn't going to help either. The work is just going to keep piling up as many many async jobs, even though your batch processing times will be low, as that processing time is not going to reflect how much of the overall work is pending in the system. On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a Spark entry point, for continuous UDP data, using: SparkConf conf = new
RE: Storing spark processed output to Database asynchronously.
… and measure 4 is to implement a custom Feedback Loop, e.g. to monitor the amount of free RAM and the number of queued jobs, and automatically decrease the message consumption rate of the Receiver until the number of clogged RDDs and Jobs subsides (again, here you artificially decrease your performance in the name of the reliability/integrity of your system, i.e. not losing messages).

From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Friday, May 22, 2015 9:39 PM
To: 'Tathagata Das'; 'Gautam Bajaj'
Cc: 'user'
Subject: RE: Storing spark processed output to Database asynchronously.

If the time required to process ALL the data for a micro batch (i.e. the next RDD produced for your stream) exceeds the micro batch interval, the following happens – let's say e.g. your micro batch time is 3 sec:

1. Based on your message streaming and consumption rate, you get e.g. a 500 MB RDD to be processed during the next 3 sec micro batch
2. However, the work performed on the RDD by your streaming job takes more than 3 sec
3. In the meantime the next RDD comes in and occupies another 500 MB, and so on and so forth, until the current iteration of the job crashes due to what is essentially memory exhaustion (no more free RAM for the next RDD)

The above can be called a design flaw, because Spark Streaming seems to rely on the default behavior of Spark Batch, which is to remove In Memory Only RDDs when there is no more free memory in the system; however, in a batch context Spark can always recreate a removed RDD from e.g. the file system, while in a streaming context the data is gone forever.

You can check whether the above behavior is the reason for your lost messages by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to see whether your streaming app has any LOST JOBS and how many – each lost job is a lost RDD, which means lost messages.

The above can be overcome by one of the following measures:

1. Set the Receiver rate to a level which will allow your job to complete within the micro-batch interval (obviously you are voluntarily limiting your performance in this way)
2. Throw more boxes/cores/RAM at the problem, and also improve the performance of the tasks performing the work on the messages (e.g. review and refactor the code)
3. Set the Storage Mode of the RDDs to "Memory AND Disk" – this will keep using RAM until there is no more free space and then spill to disk, rather than crashing miserably and losing the affected job iteration and all its messages – obviously every time it has to resort to the disk your performance will take a hit

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

Something does not make sense. Receivers (currently) do not get blocked (unless a rate limit has been set) due to processing load. The receiver will continue to receive data and store it in memory until it is processed. So I am still not sure how the data loss is happening, unless you are sending data at a faster rate than the receiver can handle (that is, more than the max rate at which the receiver can save data in memory and replicate it to other nodes). In general, if you are particular about data loss, then UDP is not really a good choice in the first place. If you can use TCP, try it. It would at least eliminate the possibility that I mentioned above.
Ultimately, if you try sending data faster than the receiver can handle (independent of whether processing can handle it), then you will lose data if you are using UDP. You have to use TCP to naturally control the sending rate to match the receiving rate at the receiver, without dropping data.

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote: This is just a friendly ping, just to remind you of my query. Also, is there a possible explanation/example of the usage of AsyncRDDActions in Java?

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote: I am receiving data at UDP port 8060 and doing processing on it using Spark, and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored don't match, probably because the Neo4j API takes too long to push the data into the database. Meanwhile, Spark is unable to receive data, probably because the process is blocked.

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote: Can you elaborate on how the data loss is occurring?

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote: That is completely alright, as the system will make sure the work gets done. My major concern is the data drop. Will using async stop data loss?

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com
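Evo's measure 4 (a custom feedback loop) can be prototyped against the StreamingListener API, which reports per-batch scheduling and processing delays; a minimal sketch, assuming a 3-second batch interval and leaving the actual receiver throttle hook hypothetical:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BackpressureMonitor(batchIntervalMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    // schedulingDelay is how long the batch waited in the queue before running
    val delay = batch.batchInfo.schedulingDelay.getOrElse(0L)
    if (delay > batchIntervalMs) {
      // Batches are queueing up faster than they are processed; a custom
      // receiver would need to expose its own throttle hook to call here
      println(s"Falling behind: scheduling delay $delay ms")
    }
  }
}

// ssc.addStreamingListener(new BackpressureMonitor(3000))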
spark.executor.extraClassPath - Values not picked up by executors
I'm using the spark-cassandra-connector from DataStax in a spark streaming job launched from my own driver. It is connecting to a standalone cluster on my local box which has two workers running. This is Spark 1.3.1 and spark-cassandra-connector-1.3.0-SNAPSHOT. I have added the following entry to my $SPARK_HOME/conf/spark-default.conf:

spark.executor.extraClassPath /projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar

When I start the master with $SPARK_HOME/sbin/start-master.sh, it comes up just fine. As do the two workers with the following commands:

Worker 1, port 8081: radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://radtech.io:7077 --webui-port 8081 --cores 2
Worker 2, port 8082: radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://radtech.io:7077 --webui-port 8082 --cores 2

When I execute the Driver connecting to the master: sbt app/run -Dspark.master=spark://radtech.io:7077

It starts up, but when the executors are launched they do not include the entry from spark.executor.extraClassPath:

15/05/22 17:35:26 INFO Worker: Asked to launch executor app-20150522173526-/0 for KillrWeatherApp$
15/05/22 17:35:26 INFO ExecutorRunner: Launch command: java -cp /usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar -Dspark.driver.port=55932 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@192.168.1.3:55932/user/CoarseGrainedScheduler --executor-id 0 --hostname 192.168.1.3 --cores 2 --app-id app-20150522173526- --worker-url akka.tcp://sparkWorker@192.168.1.3:55923/user/Worker

which will then cause the executor to fail with a ClassNotFoundException, which I would expect:

[WARN] [2015-05-22 17:38:18,035] [org.apache.spark.scheduler.TaskSetManager]: Lost task 0.0 in stage 2.0 (TID 23, 192.168.1.3): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

I also notice that some of the entries on the executor classpath are duplicated? This is a newly installed spark-1.3.1-bin-hadoop2.6 standalone cluster, just to ensure I had nothing from testing in the way. I can set the SPARK_CLASSPATH in the $SPARK_HOME/spark-env.sh and it will pick up the jar and append it fine. Any suggestions on what is going on here? It seems to just ignore whatever I have in spark.executor.extraClassPath. Is there a different way to do this? TIA. -Todd
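Since the driver here is launched with sbt app/run rather than through spark-submit, the conf file is likely never read (spark-submit is what loads it); a hedged workaround is to set the property directly on the SparkConf in the driver, which the master then passes along when launching executors. A sketch, reusing the jar path from the post:

import org.apache.spark.{SparkConf, SparkContext}

val assemblyPath = "/projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar"

val conf = new SparkConf()
  .setMaster("spark://radtech.io:7077")
  .setAppName("KillrWeatherApp")
  // Prepends the jar to the executor classpath; it must already exist
  // at this path on every worker node
  .set("spark.executor.extraClassPath", assemblyPath)
  // Alternatively, have Spark ship the assembly to the executors itself:
  // .setJars(Seq(assemblyPath))

val sc = new SparkContext(conf)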
Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)
Great to see the result comparable to R in the new ML implementation. Since the majority of users will still use the old mllib api, we plan to call the ML implementation from MLlib to handle the intercept correctly with regularization. JIRA is created: https://issues.apache.org/jira/browse/SPARK-7780 Sincerely, DB Tsai --- Blog: https://www.dbtsai.com

On Fri, May 22, 2015 at 10:45 AM, Xin Liu liuxin...@gmail.com wrote: Thank you guys for the prompt help. I ended up building spark master and verified what DB has suggested.

val lr = (new MlLogisticRegression)
  .setFitIntercept(true)
  .setMaxIter(35)
val model = lr.fit(sqlContext.createDataFrame(training))
val scoreAndLabels = model.transform(sqlContext.createDataFrame(test))
  .select("probability", "label")
  .map { case Row(probability: Vector, label: Double) => (probability(1), label) }

Without doing much tuning, the above generates

Weights: [0.0013971323020715888,0.8559779783186241,-0.5052275562089914] Intercept: -3.3076806966913006 Area under ROC: 0.7033511043412033

I also tried it on a much bigger dataset I have and its results are close to what I get from statsmodel. Now eagerly waiting for the 1.4 release. Thanks, Xin

On Wed, May 20, 2015 at 9:37 PM, Chris Gore cdg...@cdgore.com wrote: I tried running this data set as described with my own implementation of L2 regularized logistic regression using LBFGS to compare: https://github.com/cdgore/fitbox

Intercept: -0.886745823033 Weights (['gre', 'gpa', 'rank']): [ 0.28862268 0.19402388 -0.36637964] Area under ROC: 0.724056603774

The difference could be from the feature preprocessing as mentioned. I normalized the features around 0:

binary_train_normalized = (binary_train - binary_train.mean()) / binary_train.std()
binary_test_normalized = (binary_test - binary_train.mean()) / binary_train.std()

On a data set this small, the difference in models could also be the result of how the training/test sets were split. Have you tried running k-folds cross validation on a larger data set? Chris

On May 20, 2015, at 6:15 PM, DB Tsai d...@netflix.com.INVALID wrote: Hi Xin, If you take a look at the model you trained, the intercept from Spark is significantly smaller than StatsModel's, and the intercept represents a prior on categories in LOR, which causes the low accuracy in the Spark implementation. In LogisticRegressionWithLBFGS, the intercept is regularized due to the implementation of Updater, and the intercept should not be regularized. In the new pipeline APIs, a LOR with elasticNet is implemented, and the intercept is properly handled. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala As you can see in the tests, https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala the result is exactly the same as R now. BTW, in both versions, the feature scalings are done before training, and we train the model in the scaled space but transform the model weights back to the original space. The only difference is that the mllib version, LogisticRegressionWithLBFGS, regularizes the intercept, while in the ml version the intercept is excluded from regularization. As a result, if lambda is zero, the models should be the same.

On Wed, May 20, 2015 at 3:42 PM, Xin Liu liuxin...@gmail.com wrote: Hi, I have tried a few models in Mllib to train a LogisticRegression model. However, I consistently get much better results using other libraries such as statsmodel (which gives similar results as R) in terms of AUC.
For illustration purposes, I used a small data set (I have tried much bigger data) http://www.ats.ucla.edu/stat/data/binary.csv from http://www.ats.ucla.edu/stat/r/dae/logit.htm Here is the snippet of my usage of LogisticRegressionWithLBFGS:

val algorithm = new LogisticRegressionWithLBFGS
algorithm.setIntercept(true)
algorithm.optimizer
  .setNumIterations(100)
  .setRegParam(0.01)
  .setConvergenceTol(1e-5)
val model = algorithm.run(training)
model.clearThreshold()
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

I did a (0.6, 0.4) split for training/test. The response is admit and the features are GRE score, GPA, and college Rank.

Spark: Weights (GRE, GPA, Rank): [0.0011576276331509304,0.048544858567336854,-0.394202150286076] Intercept: -0.6488972641282202 Area under ROC: 0.6294070512820512

StatsModel: Weights [0.0018, 0.7220, -0.3148] Intercept: -3.5913 Area under ROC: 0.69

The weights from statsmodel seem more reasonable if you consider that for a one unit increase in gpa, the log odds of
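A quick way to check DB's explanation within mllib itself is to rerun the snippet above with regularization turned off: with lambda at zero the intercept penalty disappears, so the two implementations should produce essentially the same model. A sketch, assuming the same training RDD:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

val algorithm = new LogisticRegressionWithLBFGS
algorithm.setIntercept(true)
algorithm.optimizer
  .setNumIterations(100)
  .setRegParam(0.0) // no regularization, so the intercept is no longer penalized
  .setConvergenceTol(1e-5)
val model = algorithm.run(training)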
Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?
In Spark 1.4, Logistic Regression with elasticNet is implemented in the ML pipeline framework. Model selection can be achieved through a high lambda, resulting in lots of zeros in the coefficients. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com

On Fri, May 22, 2015 at 1:19 AM, SparknewUser melanie.galloi...@gmail.com wrote: I am new to MLlib and to Spark. (I use Scala.) I'm trying to understand how LogisticRegressionWithLBFGS and LogisticRegressionWithSGD work. I usually use R to do logistic regressions but now I do it on Spark to be able to analyze Big Data. The model only returns weights and an intercept. My problem is that I have no information about which variables are significant and which variables I should delete to improve my model. I only have the confusion matrix and the AUC to evaluate the performance. Is there any way to get information about the variables I put in my model? How can I try different variable combinations? Do I have to modify the dataset of origin (e.g. delete one or several columns)? How are the weights calculated: is there a correlation calculation with the variable of interest?
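Until 1.4 lands, one way to get sparsity out of the existing mllib API is to swap the L1 updater into the optimizer, which drives weak coefficients to exactly zero; this is a hedged sketch rather than the elastic-net implementation DB mentions, and assumes training is an RDD[LabeledPoint]:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.optimization.L1Updater

val lr = new LogisticRegressionWithLBFGS
lr.optimizer
  .setUpdater(new L1Updater) // L1 penalty zeroes out weak coefficients
  .setRegParam(0.1)          // raise for sparser models, lower to keep more variables
val model = lr.run(training)

// Variables whose weight is exactly 0.0 have effectively been deselected
println(model.weights)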
spark on Windows 2008 failed to save RDD to windows shared folder
I use a Spark standalone cluster on Windows 2008. I keep getting the following error when trying to save an RDD to a Windows shared folder:

rdd.saveAsObjectFile("file:///T:/lab4-win02/IndexRoot01/tobacco-07/myrdd.obj")

15/05/22 16:49:05 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 12) java.io.IOException: Mkdirs failed to create file:/T:/lab4-win02/IndexRoot01/tobacco-07/tmp/docs-150522204904805.op/_temporary/0/_temporary/attempt_201505221649_0012_m_00_12 at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906) at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1071) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:270) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:527) at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)

The T: drive is mapped to a Windows shared folder, e.g. T: -> \\10.196.119.230\myshare. The id running Spark does have write permission to this folder. It works most of the time but fails sometimes. Can anybody tell me what the problem is here? Please advise. Thanks.
Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)
Hi All, I have a cluster of four nodes (three workers and one master, with one core each) which consumes data from Kinesis at 15 second intervals using two streams (i.e. receivers). The job simply grabs the latest batch and pushes it to MongoDB. I believe that the problem is that all tasks are executed on a single worker node and never distributed to the others. This is true even after I set the number of concurrentJobs to 3. Overall, I would really like to increase throughput (i.e. more than 500 records / second) and understand why all executors are not being utilized.

Here are some parameters I have set:
- spark.streaming.blockInterval 200
- spark.locality.wait 500
- spark.streaming.concurrentJobs 3

This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { i =>
    i.foreach(record => connection.insert(record))
  }
}

def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput? Thanks, Mike.
Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)
I guess each receiver occupies an executor. So there was only one executor available for processing the job.

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have a cluster of four nodes (three workers and one master, with one core each) which consumes data from Kinesis at 15 second intervals using two streams (i.e. receivers). The job simply grabs the latest batch and pushes it to MongoDB. I believe that the problem is that all tasks are executed on a single worker node and never distributed to the others. This is true even after I set the number of concurrentJobs to 3. Overall, I would really like to increase throughput (i.e. more than 500 records / second) and understand why all executors are not being utilized.

Here are some parameters I have set:
- spark.streaming.blockInterval 200
- spark.locality.wait 500
- spark.streaming.concurrentJobs 3

This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { i =>
    i.foreach(record => connection.insert(record))
  }
}

def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput? Thanks, Mike.
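With one core per worker and two receivers, the receivers alone pin two of the three worker cores, which supports this guess. A common mitigation from the streaming docs is to union the receiver streams and repartition before the write, so the insert work spreads over whatever executors remain; a sketch, with stream1 and stream2 standing in for the two Kinesis DStreams:

// Combine the two receiver streams into one DStream
val unified = ssc.union(Seq(stream1, stream2))

// Redistribute the received blocks across the cluster before writing
unified.repartition(3).foreachRDD { (rdd, time) =>
  write(rdd, time) // the write() defined in the original post
}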
Re: spark on Windows 2008 failed to save RDD to windows shared folder
The stack trace is related to HDFS. Can you tell us which Hadoop release you are using? Is this a secure cluster? Thanks

On Fri, May 22, 2015 at 1:55 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I use a Spark standalone cluster on Windows 2008. I keep getting the following error when trying to save an RDD to a Windows shared folder:

rdd.saveAsObjectFile("file:///T:/lab4-win02/IndexRoot01/tobacco-07/myrdd.obj")

15/05/22 16:49:05 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 12) java.io.IOException: Mkdirs failed to create file:/T:/lab4-win02/IndexRoot01/tobacco-07/tmp/docs-150522204904805.op/_temporary/0/_temporary/attempt_201505221649_0012_m_00_12 at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906) at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1071) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:270) at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:527) at org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)

The T: drive is mapped to a Windows shared folder, e.g. T: -> \\10.196.119.230\myshare. The id running Spark does have write permission to this folder. It works most of the time but fails sometimes. Can anybody tell me what the problem is here? Please advise. Thanks.
SparkSQL failing while writing into S3 for 'insert into table'
Hello, I am using Spark 1.3 and Hive 0.13.1 in AWS. From Spark SQL, when running a Hive query to export the query result into AWS S3, it fails with the following message:

== org.apache.hadoop.hive.ql.metadata.HiveException: checkPaths: s3://test-dev/tmp/hive-hadoop/hive_2015-05-23_00-33-06_943_4594473380941885173-1/-ext-1 has nested directory s3://test-dev/tmp/hive-hadoop/hive_2015-05-23_00-33-06_943_4594473380941885173-1/-ext-1/_temporary at org.apache.hadoop.hive.ql.metadata.Hive.checkPaths(Hive.java:2157) at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2298) at org.apache.hadoop.hive.ql.metadata.Table.copyFiles(Table.java:686) at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1469) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:230) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:124) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:249) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088) ==

The queries tested are:

spark-sql> create external table s3_dwserver_sql_t1 (q string) location 's3://test-dev/s3_dwserver_sql_t1'
spark-sql> insert into table s3_dwserver_sql_t1 select q from api_search where pdate='2015-05-12' limit 100;

It seems it generates the query results into a tmp dir first, and then tries to rename it into the final folder, but fails while renaming. I appreciate any advice. Thanks, Okehee
Dynamic Allocation with Spark Streaming
Hi, 1. Dynamic allocation is currently only supported with YARN, correct? 2. In Spark Streaming, is it possible to change the number of executors while an application is running? If so, can the allocation be controlled by the application, instead of using an already defined automatic policy? That is, I want to be able to get more executors or decommission executors on demand. Is there some way to achieve this? Thanks.
Re: Dynamic Allocation with Spark Streaming
Sorry, but I can't see from TD's comments how to allocate executors on demand. It seems to me that he's talking about resources within an executor, mapping shards to cores. I want to be able to decommission executors/workers/machines.

On Sat, May 23, 2015 at 3:31 AM, Ted Yu yuzhih...@gmail.com wrote: For #1, the answer is yes. For #2, see TD's comments on SPARK-7661. Cheers

On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, 1. Dynamic allocation is currently only supported with YARN, correct? 2. In Spark Streaming, is it possible to change the number of executors while an application is running? If so, can the allocation be controlled by the application, instead of using an already defined automatic policy? That is, I want to be able to get more executors or decommission executors on demand. Is there some way to achieve this? Thanks.
Re: Dynamic Allocation with Spark Streaming
Or should I shut down the streaming context gracefully and then start it again with a different number of executors?

On Sat, May 23, 2015 at 4:00 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Sorry, but I can't see from TD's comments how to allocate executors on demand. It seems to me that he's talking about resources within an executor, mapping shards to cores. I want to be able to decommission executors/workers/machines.

On Sat, May 23, 2015 at 3:31 AM, Ted Yu yuzhih...@gmail.com wrote: For #1, the answer is yes. For #2, see TD's comments on SPARK-7661. Cheers

On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, 1. Dynamic allocation is currently only supported with YARN, correct? 2. In Spark Streaming, is it possible to change the number of executors while an application is running? If so, can the allocation be controlled by the application, instead of using an already defined automatic policy? That is, I want to be able to get more executors or decommission executors on demand. Is there some way to achieve this? Thanks.
Re: Dynamic Allocation with Spark Streaming
For #1, the answer is yes. For #2, see TD's comments on SPARK-7661. Cheers

On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, 1. Dynamic allocation is currently only supported with YARN, correct? 2. In Spark Streaming, is it possible to change the number of executors while an application is running? If so, can the allocation be controlled by the application, instead of using an already defined automatic policy? That is, I want to be able to get more executors or decommission executors on demand. Is there some way to achieve this? Thanks.
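For the on-demand control asked about in #2, SparkContext does expose developer-API hooks for requesting and killing executors from within a running application; at this point they are only honored by coarse-grained cluster managers such as YARN, so treat this as a sketch:

// Developer API on SparkContext (since 1.2); returns false if the
// cluster manager does not support the operation
sc.requestExecutors(2)          // ask for two additional executors
sc.killExecutors(Seq("1", "2")) // decommission executors by their IDs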
Re: spark.executor.extraClassPath - Values not picked up by executors
Todd, I don't have any answers for you...other than the file is actually named spark-defaults.conf (not sure if you made a typo in the email or misnamed the file...). Do any other options from that file get read? I also wanted to ask if you built the spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar from trunk or if they published a 1.3 drop somewhere -- I'm just starting out with Cassandra and discovered https://datastax-oss.atlassian.net/browse/SPARKC-98 is still open...

On Fri, May 22, 2015 at 6:15 PM, Todd Nist tsind...@gmail.com wrote: I'm using the spark-cassandra-connector from DataStax in a spark streaming job launched from my own driver. It is connecting to a standalone cluster on my local box which has two workers running. This is Spark 1.3.1 and spark-cassandra-connector-1.3.0-SNAPSHOT. I have added the following entry to my $SPARK_HOME/conf/spark-default.conf:

spark.executor.extraClassPath /projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar

When I start the master with $SPARK_HOME/sbin/start-master.sh, it comes up just fine. As do the two workers with the following commands:

Worker 1, port 8081: radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://radtech.io:7077 --webui-port 8081 --cores 2
Worker 2, port 8082: radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://radtech.io:7077 --webui-port 8082 --cores 2

When I execute the Driver connecting to the master: sbt app/run -Dspark.master=spark://radtech.io:7077

It starts up, but when the executors are launched they do not include the entry from spark.executor.extraClassPath:

15/05/22 17:35:26 INFO Worker: Asked to launch executor app-20150522173526-/0 for KillrWeatherApp$
15/05/22 17:35:26 INFO ExecutorRunner: Launch command: java -cp /usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar -Dspark.driver.port=55932 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@192.168.1.3:55932/user/CoarseGrainedScheduler --executor-id 0 --hostname 192.168.1.3 --cores 2 --app-id app-20150522173526- --worker-url akka.tcp://sparkWorker@192.168.1.3:55923/user/Worker

which will then cause the executor to fail with a ClassNotFoundException, which I would expect:

[WARN] [2015-05-22 17:38:18,035] [org.apache.spark.scheduler.TaskSetManager]: Lost task 0.0 in stage 2.0 (TID 23, 192.168.1.3): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

I also notice that some of the entries on the executor classpath are duplicated? This is a newly installed spark-1.3.1-bin-hadoop2.6 standalone
SparkSQL query plan to Stage wise breakdown
Hi, Is there an easy way to see how a SparkSQL query plan maps to different stages of the generated Spark job? The WebUI is entirely in terms of RDD stages and I'm having a hard time mapping it back to my query. Pramod
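There is no direct stage-to-operator mapping in the 1.3 UI, but printing the query plans at least shows which physical operators exist, which can then be matched against the RDD names in the stage view; a sketch, with an illustrative query:

val df = sqlContext.sql("SELECT col1, count(*) FROM t GROUP BY col1")
df.explain(true)           // parsed, analyzed, optimized and physical plans
println(df.queryExecution) // the same information as one object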
Re: Dynamic Allocation with Spark Streaming
That should do. Cheers

On Fri, May 22, 2015 at 8:28 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Or should I shut down the streaming context gracefully and then start it again with a different number of executors?

On Sat, May 23, 2015 at 4:00 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Sorry, but I can't see from TD's comments how to allocate executors on demand. It seems to me that he's talking about resources within an executor, mapping shards to cores. I want to be able to decommission executors/workers/machines.

On Sat, May 23, 2015 at 3:31 AM, Ted Yu yuzhih...@gmail.com wrote: For #1, the answer is yes. For #2, see TD's comments on SPARK-7661. Cheers

On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, 1. Dynamic allocation is currently only supported with YARN, correct? 2. In Spark Streaming, is it possible to change the number of executors while an application is running? If so, can the allocation be controlled by the application, instead of using an already defined automatic policy? That is, I want to be able to get more executors or decommission executors on demand. Is there some way to achieve this? Thanks.
Re: Performance degradation between spark 0.9.3 and 1.3.1
This may be because of snappy-java: https://issues.apache.org/jira/browse/SPARK-5081

On May 23, 2015, at 1:23 AM, Josh Rosen rosenvi...@gmail.com wrote: I don't think that 0.9.3 has been released, so I'm assuming that you're running on branch-0.9. There have been over 4000 commits between 0.9.3 and 1.3.1, so I'm afraid that this question doesn't have a concise answer: https://github.com/apache/spark/compare/branch-0.9...v1.3.1 To narrow down the potential causes, have you tried comparing 0.9.3 to, say, 1.0.2 or branch-1.0, or some other version that's closer to 0.9?

On Fri, May 22, 2015 at 9:43 AM, Shay Seng s...@urbanengines.com wrote: Hi. I have a job that takes ~50min with Spark 0.9.3 and ~1.8hrs on Spark 1.3.1 on the same cluster. The only code difference between the two code bases is to fix the Seq -> Iter changes that happened in the Spark 1.x series. Are there any other changes in the defaults from spark 0.9.3 -> 1.3.1 that would cause such a large degradation in performance? Changes in partitioning algorithms, scheduling etc? shay
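If snappy-java is the suspect, one cheap experiment is to pin the block compression codec back to the pre-1.x default and re-run the job; a sketch (the setting name is the standard one, and to my recollection the default codec moved from LZF to Snappy in the 1.x line):

val conf = new SparkConf()
  // Built-in choices include "snappy" (the 1.x default), "lzf" and "lz4"
  .set("spark.io.compression.codec", "lzf")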
Re: Bigints in pyspark
Could you show the schema and confirm that they are LongType? df.printSchema()

On Mon, Apr 27, 2015 at 5:44 AM, jamborta jambo...@gmail.com wrote: hi all, I have just come across a problem where I have a table that has a few bigint columns; it seems if I read that table into a dataframe and then collect it in pyspark, the bigints are stored as integers in python. (The problem is, if I write it back to another table, I detect the hive type programmatically from the python type, so it turns those columns into integers.) Is it intended this way or is it a bug? thanks,
Re: Storing spark processed output to Database asynchronously.
Something does not make sense. Receivers (currently) do not get blocked (unless a rate limit has been set) due to processing load. The receiver will continue to receive data and store it in memory until it is processed. So I am still not sure how the data loss is happening, unless you are sending data at a faster rate than the receiver can handle (that is, more than the max rate at which the receiver can save data in memory and replicate it to other nodes). In general, if you are particular about data loss, then UDP is not really a good choice in the first place. If you can use TCP, try it. It would at least eliminate the possibility that I mentioned above. Ultimately, if you try sending data faster than the receiver can handle (independent of whether processing can handle it), then you will lose data if you are using UDP. You have to use TCP to naturally control the sending rate to match the receiving rate at the receiver, without dropping data.

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote: This is just a friendly ping, just to remind you of my query. Also, is there a possible explanation/example of the usage of AsyncRDDActions in Java?

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote: I am receiving data at UDP port 8060 and doing processing on it using Spark, and storing the output in Neo4j. But the data I'm receiving and the data that is getting stored don't match, probably because the Neo4j API takes too long to push the data into the database. Meanwhile, Spark is unable to receive data, probably because the process is blocked.

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote: Can you elaborate on how the data loss is occurring?

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote: That is completely alright, as the system will make sure the work gets done. My major concern is the data drop. Will using async stop data loss?

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote: If you cannot push data as fast as you are generating it, then async isn't going to help either. The work is just going to keep piling up as many many async jobs, even though your batch processing times will be low, as that processing time does not reflect how much overall work is pending in the system.
On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote: Hi, From my understanding of Spark Streaming, I created a spark entry point, for continuous UDP data, using:

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
JavaReceiverInputDStream<String> lines = jssc.receiverStream(new CustomReceiver(8060));

Now, when I process this input stream using:

JavaDStream hash = lines.flatMap(my-code)
JavaPairDStream tuple = hash.mapToPair(my-code)
JavaPairDStream output = tuple.reduceByKey(my-code)

output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1) throws Exception {
    new AsyncRDDActions(arg0.rdd(), null);
    arg0.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
      @Override
      public void call(Iterator<Tuple2<String, ArrayList<String>>> arg0) throws Exception {
        GraphDatabaseService graphDb = new GraphDatabaseFactory()
            .newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
            .setConfig("remote_shell_enabled", "true")
            .newGraphDatabase();
        try (Transaction tx = graphDb.beginTx()) {
          while (arg0.hasNext()) {
            Tuple2<String, ArrayList<String>> tuple = arg0.next();
            Node HMac = Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
            boolean oldHMac = false;
            if (HMac != null) {
              System.out.println("Already in Database: " + tuple._1);
              oldHMac = true;
            }
Re: DataFrame groupBy vs RDD groupBy
DataFrames have a lot more information about the data, so there is a whole class of optimizations that are possible there that we cannot do in RDDs. This is why we are focusing a lot of effort on this part of the project. In Spark 1.4 you can accomplish what you want using the new window function feature. This can be done with SQL as you described or directly on a DataFrame:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
df.select('x, 'y, rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show

+---+---+------+
|  x|  y|number|
+---+---+------+
|  a|  1|     1|
|  b|  1|     2|
|  c|  2|     1|
|  d|  2|     2|
+---+---+------+

On Fri, May 22, 2015 at 3:35 AM, gtanguy g.tanguy.claravi...@gmail.com wrote: Hello everybody, I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some parts of my code using groupBy became really slow.

1/ Why is the groupBy on an RDD so slow in comparison to the groupBy on a DataFrame?

// DataFrame: runs in a few seconds
val result = table.groupBy("col1").count

// RDD: takes hours with a lot of spilling in-memory
val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.map(l => (l._1, l._2.size)).count()

2/ My goal is to group by a key, then to order each group over a column and finally to add the row number within each group. I had this code running before changing to Spark 1.3 and it worked fine, but since I changed to DataFrames it is really slow.

val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.flatMap { l =>
  l._2.toList
    .sortBy { u =>
      val rs = RowSchema(u, schemaOriginel)
      val col1 = rs.getValueByName("col1")
      val col2 = rs.getValueByName("col2")
      (col1, col2)
    }
    .zipWithIndex
}

I think the SQL equivalent of what I try to do is: SELECT a, ROW_NUMBER() OVER (PARTITION BY a) AS num FROM table. I don't think I can do this with a GroupedData (the result of df.groupBy). Any ideas on how I can speed this up?
Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?
If you want to select specific variable combinations by hand, then you will need to modify the dataset before passing it to the ML algorithm. The DataFrame API should make that easy to do. If you want to have an ML algorithm select variables automatically, then I would recommend using L1 regularization for now, and possibly elastic net after 1.4 is released, per DB's suggestion. If you want detailed model statistics similar to what R provides, I've created a JIRA for discussing how we should add that functionality to MLlib. Those types of stats will be added incrementally, but feedback would be great for prioritization: https://issues.apache.org/jira/browse/SPARK-7674 To answer your question: "How are the weights calculated: is there a correlation calculation with the variable of interest?" -- Weights are calculated as with all logistic regression algorithms, by using convex optimization to minimize a regularized log loss. Good luck! Joseph

On Fri, May 22, 2015 at 1:07 PM, DB Tsai dbt...@dbtsai.com wrote: In Spark 1.4, Logistic Regression with elasticNet is implemented in the ML pipeline framework. Model selection can be achieved through a high lambda, resulting in lots of zeros in the coefficients. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com

On Fri, May 22, 2015 at 1:19 AM, SparknewUser melanie.galloi...@gmail.com wrote: I am new to MLlib and to Spark. (I use Scala.) I'm trying to understand how LogisticRegressionWithLBFGS and LogisticRegressionWithSGD work. I usually use R to do logistic regressions but now I do it on Spark to be able to analyze Big Data. The model only returns weights and an intercept. My problem is that I have no information about which variables are significant and which variables I should delete to improve my model. I only have the confusion matrix and the AUC to evaluate the performance. Is there any way to get information about the variables I put in my model? How can I try different variable combinations? Do I have to modify the dataset of origin (e.g. delete one or several columns)? How are the weights calculated: is there a correlation calculation with the variable of interest?
Application on standalone cluster never changes state to be stopped
Hi, Environment: Spark standalone cluster running with a master and a worker on a small Vagrant VM. The Jetty Webapp on the same node calls the spark-submit script to start the job. From the contents of stdout I can see that it's running successfully. However, the spark-submit process never seems to complete (even after 2 minutes) and the state in the Web UI remains RUNNING. The Application main calls SparkContext.stop and exits with zero. What are the criteria for when an Application is considered finished? Thanks in advance! Edward
Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)
A receiver occupies a CPU core; an executor is simply a JVM instance and as such it can be granted any number of cores and RAM. So check how many cores you have per executor. Sent from Samsung Mobile

-------- Original message --------
From: Mike Trienis mike.trie...@orcsol.com
Date: 2015/05/22 21:51 (GMT+00:00)
To: user@spark.apache.org
Subject: Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

I guess each receiver occupies an executor. So there was only one executor available for processing the job.

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I have a cluster of four nodes (three workers and one master, with one core each) which consumes data from Kinesis at 15 second intervals using two streams (i.e. receivers). The job simply grabs the latest batch and pushes it to MongoDB. I believe that the problem is that all tasks are executed on a single worker node and never distributed to the others. This is true even after I set the number of concurrentJobs to 3. Overall, I would really like to increase throughput (i.e. more than 500 records / second) and understand why all executors are not being utilized.

Here are some parameters I have set:
- spark.streaming.blockInterval 200
- spark.locality.wait 500
- spark.streaming.concurrentJobs 3

This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { i =>
    i.foreach(record => connection.insert(record))
  }
}

def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput? Thanks, Mike.
Re: FetchFailedException and MetadataFetchFailedException
on the worker/container that fails, the file not found is the first error -- the output below is from the yarn log. There were some python worker crashes for another job/stage earlier (see the warning at 18:36) but I expect those to be unrelated to this file not found error. == LogType:stderr Log Upload Time:15-May-2015 18:50:05 LogLength:5706 Log Contents: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting to kill Python Worker 15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0 (TID 995) java.io.FileNotFoundException: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:212) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) 15/05/15 18:50:04 ERROR DiskBlockManager: Exception while deleting local spark dir: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489 java.io.IOException: Failed to delete:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489 at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:933) at org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:165) at org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:162) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.storage.DiskBlockManager.org$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:162) at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:156) at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1208) at org.apache.spark.SparkEnv.stop(SparkEnv.scala:88) at org.apache.spark.executor.Executor.stop(Executor.scala:146) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:105) at
RE: Spark Streaming and Drools
OR you can run Drools in a Central Server Mode, i.e. as a common/shared service, but that would slow down your Spark Streaming job due to the remote network call which will have to be made for every single message.

From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Friday, May 22, 2015 11:22 AM
To: 'Evo Eftimov'; 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

The only "tricky" bit would be when you want to manage/update the Rule Base in your Drools Engines already running as Singletons in Executor JVMs on Worker Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already loaded in Drools is not a problem.

From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Friday, May 22, 2015 11:20 AM
To: 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

I am not aware of existing examples, but you can always "ask" Google. Basically, from the Spark Streaming perspective, Drools is a third-party software library; you would invoke it in the same way as any other third-party software library from the Tasks (maps, filters etc) within your DAG job.

From: Antonio Giambanco [mailto:antogia...@gmail.com]
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

Thanks a lot Evo, do you know where I can find some examples? Have a great one A G

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: You can deploy and invoke Drools as a Singleton on every Spark Worker Node / Executor / Worker JVM. You can invoke it from e.g. map, filter etc and use the result from the Rule to decide how to transform/filter an event/message.

From: Antonio Giambanco [mailto:antogia...@gmail.com]
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

Hi All, I'm deploying an architecture that uses Flume for sending log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this process I would like to do some event processing. For example: the log appender writes information about all transactions in my trading platforms; if a platform user sells more than he buys during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
RE: Spark Streaming and Drools
I am not aware of existing examples, but you can always "ask" Google. Basically, from the Spark Streaming perspective, Drools is a third-party software library; you would invoke it in the same way as any other third-party software library from the Tasks (maps, filters etc) within your DAG job.

From: Antonio Giambanco [mailto:antogia...@gmail.com]
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

Thanks a lot Evo, do you know where I can find some examples? Have a great one A G

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com: You can deploy and invoke Drools as a Singleton on every Spark Worker Node / Executor / Worker JVM. You can invoke it from e.g. map, filter etc and use the result from the Rule to decide how to transform/filter an event/message.

From: Antonio Giambanco [mailto:antogia...@gmail.com]
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

Hi All, I'm deploying an architecture that uses Flume for sending log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this process I would like to do some event processing. For example: the log appender writes information about all transactions in my trading platforms; if a platform user sells more than he buys during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
Trying to connect to many topics with several DirectConnect
Hi, I'm trying to connect to two topics of Kafka with Spark with DirectStream but I get an error. I don't know if there are any limitations to doing this, because when I just access one topic everything works fine.

val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
val setTopic1 = Set("topic1")
val setTopic2 = Set("topic2")
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)

The error that I get is:

15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
15/05/22 13:12:40 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)

Are there any limitations to doing this?
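Whatever is causing the NPE, a single direct stream can subscribe to both topics at once, which avoids the second createDirectStream call entirely; a sketch using the same kafkaParams:

val topics = Set("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// If the two topics need different handling, inspect the message key,
// or split per partition via the HasOffsetRanges information of each RDD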
RE: Spark Streaming and Drools
You can deploy and invoke Drools as a singleton on every Spark worker node / executor / worker JVM. You can invoke it from e.g. map, filter, etc. and use the result from the rule to decide how to transform/filter an event/message.

From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools

Hi All, I'm deploying an architecture that uses Flume for sending log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this process I would like to do some event processing... For example: a log appender writes information about all transactions in my trading platforms; if a platform user sells more than they buy during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
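[Editor's note: a minimal sketch of the per-executor singleton pattern described above, assuming the Drools 6 KIE API with rules packaged on the executor classpath; the DStream name `events` and the fact type it carries are placeholders, not from the original thread.]

import org.kie.api.KieServices
import org.kie.api.runtime.StatelessKieSession

// Initialized at most once per executor JVM, which gives the singleton behavior.
object DroolsHolder {
  lazy val kieContainer = KieServices.Factory.get().getKieClasspathContainer
  def newSession(): StatelessKieSession = kieContainer.newStatelessKieSession()
}

// Invoke the rules from a task, one stateless session per partition:
events.mapPartitions { iter =>
  val session = DroolsHolder.newSession()
  iter.map { event =>
    session.execute(event) // fires the rules loaded from the classpath kmodule
    event
  }
}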
Re: Spark Streaming and Drools
Hi, some time back I played with distributed rule processing by integrating Drools with HBase coprocessors, invoking rules on any incoming data: https://github.com/dibbhatt/hbase-rule-engine You can get some idea of how to use Drools rules if you look at this RegionObserver coprocessor: https://github.com/dibbhatt/hbase-rule-engine/blob/master/src/main/java/hbase/rule/HBaseDroolObserver.java The idea is basically to create a stateless rule engine from the .drl file and fire the rules on incoming data. Even though the code invokes rules on an HBase Put object, you can get the idea and modify it for Spark. Dibyendu

On Fri, May 22, 2015 at 3:49 PM, Evo Eftimov evo.efti...@isecc.com wrote:

I am not aware of existing examples, but you can always “ask” Google. Basically, from a Spark Streaming perspective, Drools is a third-party software library; you would invoke it in the same way as any other third-party software library from the tasks (maps, filters, etc.) within your DAG job.

From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 11:07 AM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Spark Streaming and Drools

Thanks a lot Evo, do you know where I can find some examples? Have a great one A G

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a singleton on every Spark worker node / executor / worker JVM. You can invoke it from e.g. map, filter, etc. and use the result from the rule to decide how to transform/filter an event/message.

From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools

Hi All, I'm deploying an architecture that uses Flume for sending log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this process I would like to do some event processing... For example: a log appender writes information about all transactions in my trading platforms; if a platform user sells more than they buy during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
Re: Re: Re: How to use spark to access HBase with Security enabled
Can you share the exception(s) you encountered? Thanks

On May 22, 2015, at 12:33 AM, donhoff_h 165612...@qq.com wrote:

Hi, my modified code is listed below; I just added the SecurityUtil API. I don't know which property keys I should use, so I made up two of my own property keys to point to the keytab and principal.

object TestHBaseRead2 {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create()
    hbConf.set("dhao.keytab.file", "/etc/spark/keytab/spark.user.keytab")
    hbConf.set("dhao.user.principal", "sp...@bgdt.dev.hrb")
    SecurityUtil.login(hbConf, "dhao.keytab.file", "dhao.user.principal")
    val conn = ConnectionFactory.createConnection(hbConf)
    val tbl = conn.getTable(TableName.valueOf("spark_t01"))
    try {
      val get = new Get(Bytes.toBytes("row01"))
      val res = tbl.get(get)
      println("result:" + res.toString)
    } finally {
      tbl.close()
      conn.close()
    }
    val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
    val v = rdd.sum()
    println("Value=" + v)
    sc.stop()
  }
}

------------------ Original Message ------------------ From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 3:25 PM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: Re: How to use spark to access HBase with Security enabled

Can you post the modified code? Thanks

On May 21, 2015, at 11:11 PM, donhoff_h 165612...@qq.com wrote:

Hi, thanks very much for the reply. I have tried the SecurityUtil. I can see from the log that this statement executed successfully, but I still cannot pass the authentication of HBase. With more experiments, I found an interesting new scenario: if I run the program in yarn-client mode, the driver can pass the authentication but the executors cannot; if I run the program in yarn-cluster mode, neither the driver nor the executors can pass the authentication. Can anybody give me some clue with this info? Many thanks!

------------------ Original Message ------------------ From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 5:29 AM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: How to use spark to access HBase with Security enabled

Are the worker nodes colocated with HBase region servers? Were you running as the hbase superuser? You may need to log in, using code similar to the following:

if (isSecurityEnabled()) {
  SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
}

SecurityUtil is a Hadoop class. Cheers

On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote:

Hi, many thanks for the help. My Spark version is 1.3.0 too and I run it on YARN. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly, and it can also authenticate with ZooKeeper successfully. But I have hit a new problem: my program still cannot pass the authentication of HBase. Did you or anybody else ever meet this kind of situation? I used a keytab file to provide the principal. Since it can pass the authentication of ZooKeeper, I am sure the keytab file is OK, but it just cannot pass the authentication of HBase. The exception is listed below; could you or anybody else help me? Still many many thanks!

Exception*** 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in. 
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started. 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism. 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using Login Context section 'Client' 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to bgdt02.dev.hrb/130.1.9.98:2181, initiating session 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu May 21 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires: Fri May 22 16:03:18 CST 2015 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 11:43:32 CST 2015 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, negotiated timeout = 4 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called multiple times. Overwriting connection
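[Editor's note: one workaround that is sometimes suggested for the executor-side authentication failures described above is to perform the Kerberos login inside the tasks themselves, so each executor JVM logs in from the keytab before touching HBase. A sketch only, reusing the keytab path and principal from this thread and assuming the keytab file is available at that path on every node (e.g. shipped via --files):]

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

rdd.foreachPartition { iter =>
  val hbConf = HBaseConfiguration.create()
  // Log in from the keytab in this executor JVM before creating the HBase connection.
  UserGroupInformation.setConfiguration(hbConf)
  UserGroupInformation.loginUserFromKeytab("sp...@bgdt.dev.hrb", "/etc/spark/keytab/spark.user.keytab")
  val conn = ConnectionFactory.createConnection(hbConf)
  try {
    // ... HBase reads/writes for this partition ...
  } finally {
    conn.close()
  }
}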
Re: Issues with constants in Spark HiveQL queries
Hi I was using the wrong version of the spark-hive jar. I downloaded the right version of the jar from the cloudera repo and it works now. Thanks, Skanda On Fri, May 22, 2015 at 2:36 PM, Skanda skanda.ganapa...@gmail.com wrote: Hi All, I'm facing the same problem with Spark 1.3.0 from cloudera cdh 5.4.x. Any luck solving the issue? Exception: Exception in thread main org.apache.spark.sql.AnalysisException: Unsupported language features in query: select * from everest_marts_test.hive_ql_test where daily_partition=20150101 TOK_QUERY 1, 0,18, 14 TOK_FROM 1, 4,8, 14 TOK_TABREF 1, 6,8, 14 TOK_TABNAME 1, 6,8, 14 everest_marts_test 1, 6,6, 14 hive_ql_test 1, 8,8, 33 TOK_INSERT 0, -1,18, 0 TOK_DESTINATION 0, -1,-1, 0 TOK_DIR 0, -1,-1, 0 TOK_TMP_FILE 0, -1,-1, 0 TOK_SELECT 0, 0,2, 0 TOK_SELEXPR 0, 2,2, 0 TOK_ALLCOLREF 0, 2,2, 0 TOK_WHERE 1, 10,18, 68 TOK_FUNCTION 1, 12,18, 68 in 1, 14,14, 68 TOK_TABLE_OR_COL 1, 12,12, 52 daily_partition 1, 12,12, 52 20150101 1, 16,18, 72 scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20150101 : 20150101 1, 16,18, 72 + org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261) ; at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41) at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138) at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138) at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96) at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at
RE: Spark Streaming and Drools
The only “tricky” bit would be when you want to manage/update the Rule Base in your Drools engines already running as singletons in executor JVMs on worker nodes. The invocation of Drools from Spark Streaming to evaluate a rule already loaded in Drools is not a problem.

From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Friday, May 22, 2015 11:20 AM To: 'Antonio Giambanco' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and Drools

I am not aware of existing examples, but you can always “ask” Google. Basically, from a Spark Streaming perspective, Drools is a third-party software library; you would invoke it in the same way as any other third-party software library from the tasks (maps, filters, etc.) within your DAG job.

From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 11:07 AM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Spark Streaming and Drools

Thanks a lot Evo, do you know where I can find some examples? Have a great one A G

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a singleton on every Spark worker node / executor / worker JVM. You can invoke it from e.g. map, filter, etc. and use the result from the rule to decide how to transform/filter an event/message.

From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools

Hi All, I'm deploying an architecture that uses Flume for sending log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this process I would like to do some event processing... For example: a log appender writes information about all transactions in my trading platforms; if a platform user sells more than they buy during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
Re: Partitioning of Dataframes
This is added to 1.4.0 https://github.com/apache/spark/pull/5762

On 5/22/15, 8:48 AM, Karlson ksonsp...@siberie.de wrote:

Hi, wouldn't df.rdd.partitionBy() return a new RDD that I would then need to make into a DataFrame again? Maybe like this: df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird to me, though, and I'm not sure if the DF will be aware of its partitioning.

On 2015-05-22 12:55, ayan guha wrote:

A DataFrame is an abstraction over an RDD, so you should be able to do df.rdd.partitionBy. However, as far as I know, equijoins already optimize partitioning. You may want to look at the explain plans more carefully and materialise interim joins.

On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:

Hi, is there any way to control how DataFrames are partitioned? I'm doing lots of joins and am seeing very large shuffle reads and writes in the Spark UI. With PairRDDs you can control how the data is partitioned across nodes with partitionBy. There is no such method on DataFrames, however. Can I somehow partition the underlying RDD manually? I am currently using the Python API. Thanks!
Parallel parameter tuning: distributed execution of MLlib algorithms
Hi, I am currently experimenting with linear regression (SGD) (Spark + MLlib, ver. 1.2). At this point I need to fine-tune the hyper-parameters. I do this (for now) by an exhaustive grid search over the step size and the number of iterations. Currently I am on a dual core that acts as the master (local mode for now, but I will be adding Spark workers later). In order to maximize throughput I need to execute each run of the linear regression algorithm in parallel. According to the documentation it seems that parallel jobs may be scheduled if they are executed in separate threads [1]. So this brings me to my first question: does this mean I am CPU bound by the Spark master? In other words, is the maximum number of jobs = the maximum number of threads of the OS? I searched the mailing list but did not find anything regarding MLlib itself. I even peeked into the new MLlib API that uses pipelines and has support for parameter tuning. However, it looks like each job (instance of the learning algorithm) is executed in sequence. Can anyone confirm this? This brings me to my second question: is there any example that shows how one can execute MLlib algorithms as parallel jobs? Finally, is there any general technique I can use to execute an algorithm in a distributed manner using Spark? More specifically, I would like to have several MLlib algorithms run in parallel. Can anyone show me an example of sorts to do this? TIA. Hugo F. [1] https://spark.apache.org/docs/1.2.0/job-scheduling.html
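[Editor's note: to illustrate the threaded-submission approach from [1], here is a minimal sketch, not from the original thread, that trains one model per grid point, each submitted from its own thread via Scala futures; Spark can then schedule the resulting jobs concurrently, bounded by available cores and the execution context's thread pool. The grid values are arbitrary and `data` is a placeholder.]

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD

def gridSearch(data: RDD[LabeledPoint]) = {
  data.cache() // every job reuses the same training set
  val grid = for (step <- Seq(0.01, 0.1, 1.0); iters <- Seq(50, 100)) yield (step, iters)
  // Each Future submits its own Spark job from a separate driver thread.
  val futures = grid.map { case (step, iters) =>
    Future { ((step, iters), LinearRegressionWithSGD.train(data, iters, step)) }
  }
  Await.result(Future.sequence(futures), Duration.Inf)
}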
Re: Partitioning of Dataframes
Alright, that doesn't seem to have made it into the Python API yet.

On 2015-05-22 15:12, Silvio Fiorito wrote:

This is added to 1.4.0 https://github.com/apache/spark/pull/5762

On 5/22/15, 8:48 AM, Karlson ksonsp...@siberie.de wrote:

Hi, wouldn't df.rdd.partitionBy() return a new RDD that I would then need to make into a DataFrame again? Maybe like this: df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird to me, though, and I'm not sure if the DF will be aware of its partitioning.

On 2015-05-22 12:55, ayan guha wrote:

A DataFrame is an abstraction over an RDD, so you should be able to do df.rdd.partitionBy. However, as far as I know, equijoins already optimize partitioning. You may want to look at the explain plans more carefully and materialise interim joins.

On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:

Hi, is there any way to control how DataFrames are partitioned? I'm doing lots of joins and am seeing very large shuffle reads and writes in the Spark UI. With PairRDDs you can control how the data is partitioned across nodes with partitionBy. There is no such method on DataFrames, however. Can I somehow partition the underlying RDD manually? I am currently using the Python API. Thanks!
Re: Partitioning of Dataframes
Hi, wouldn't df.rdd.partitionBy() return a new RDD that I would then need to make into a DataFrame again? Maybe like this: df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird to me, though, and I'm not sure if the DF will be aware of its partitioning.

On 2015-05-22 12:55, ayan guha wrote:

A DataFrame is an abstraction over an RDD, so you should be able to do df.rdd.partitionBy. However, as far as I know, equijoins already optimize partitioning. You may want to look at the explain plans more carefully and materialise interim joins.

On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:

Hi, is there any way to control how DataFrames are partitioned? I'm doing lots of joins and am seeing very large shuffle reads and writes in the Spark UI. With PairRDDs you can control how the data is partitioned across nodes with partitionBy. There is no such method on DataFrames, however. Can I somehow partition the underlying RDD manually? I am currently using the Python API. Thanks!
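[Editor's note: for Spark 1.3, the RDD round-trip discussed in this thread looks roughly as follows in Scala. A sketch only: the choice of the first column as the partitioning key and the partition count of 200 are arbitrary, and `sqlContext` is assumed in scope.]

import org.apache.spark.HashPartitioner

// Drop to the underlying RDD[Row], partition by a key column, rebuild the DataFrame.
val keyed = df.rdd.map(row => (row.get(0), row))
val partitioned = keyed.partitionBy(new HashPartitioner(200)).values
val df2 = sqlContext.createDataFrame(partitioned, df.schema)
// Caveat: the rebuilt DataFrame does not carry the partitioning information
// into the query planner, so joins may still shuffle.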
LDA prediction on new document
Dani, “folding in” I believe refers to setting up your Gibbs sampler (or other model) with the learned word and document topic proportions as computed by Spark. You might look at https://lists.cs.princeton.edu/pipermail/topic-models/2014-May/002763.html where Jones suggests summing across columns of the term matrix for each of the doc terms to get the topic proportions. I have not worked with Spark LDA, but if you can pull the theta and phi matrices out of the Spark model, you should be able to start with the approximation as inference. Have you tried Vowpal Wabbit or gensim? Cheers

On Friday, May 22, 2015, Dani Qiu zongmin@gmail.com wrote:

thanks, Ken, but I am planning to use Spark LDA in production. I cannot wait for a future release. At least, provide some workaround solution. PS: SPARK-5567 https://issues.apache.org/jira/browse/SPARK-5567 mentions “This will require inference but should be able to use the same code, with a few modifications to keep the inferred topics fixed.” Can somebody elaborate on this? Folding-in in EM? Or can I simply sum the topic distributions of the terms in the new document?

On Fri, May 22, 2015 at 2:23 PM, Ken Geis geis@gmail.com wrote:

Dani, this appears to be addressed in SPARK-5567 https://issues.apache.org/jira/browse/SPARK-5567, scheduled for Spark 1.5.0. Ken

On May 21, 2015, at 11:12 PM, user-digest-h...@spark.apache.org wrote:

From: Dani Qiu zongmin@gmail.com Subject: LDA prediction on new document Date: May 21, 2015 at 8:48:40 PM PDT To: user@spark.apache.org

Hi, guys, I'm pretty new to LDA. I notice Spark 1.3.0 MLlib provides an EM-based LDA implementation. It returns both topics and topic distributions. My question is: how can I use these parameters to predict on a new document? And I notice there is an online LDA implementation in the Spark master branch that only returns topics; how can I use it to do prediction on new documents (and trained documents)? thanks

-- - Charles
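[Editor's note: the column-summing approximation mentioned above can be written down directly; this is a hedged reading of the suggestion, not Spark's API. With $\phi$ the topic-term matrix learned by LDA and $n_{d,w}$ the count of term $w$ in the new document $d$, the unnormalized topic weight is]

$$\hat{\theta}_{d,k} \;\propto\; \sum_{w \in d} n_{d,w}\,\phi_{k,w}$$

[followed by normalizing over $k$ so the weights sum to one. Proper inference (folding-in) would instead run a few Gibbs or variational steps on the new document with $\phi$ held fixed.]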
Re: Trying to connect to many topics with several DirectConnect
I just verified that the following code works on 1.3.0:

val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic2)
stream1.print()
stream2.print()

So something else is probably going on in your case. See if simply printing the two streams works for you, then compare what's different in your actual job.

On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz konstt2...@gmail.com wrote:

Hi, I'm trying to connect to two Kafka topics with Spark using DirectStream, but I get an error. I don't know if there is any limitation on doing this, because when I access just one topic everything works fine.

val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
val setTopic1 = Set("topic1")
val setTopic2 = Set("topic2")
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)

The error that I get is:

15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
15/05/22 13:12:40 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)

Is there any limitation on doing this?
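[Editor's note: if the two streams do not need separate handling, a single direct stream can also consume both topics at once, since the topics argument is a set. A sketch:]

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic1", "topic2"))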
partitioning after extracting from a hive table?
I have a table in a Hive database partitioned by date. I notice that when I query this table using HiveContext, the created data frame has a specific number of partitions. Does this partitioning correspond to my original table partitioning in Hive? Thanks -- Cesar Flores
Re: How to use spark to access HBase with Security enabled
You might also enable debug in hadoop-env.sh:

# Extra Java runtime options. Empty by default.
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true"

and check that the principals are the same on the NameNode and DataNode; you can confirm the same on all nodes in hdfs-site.xml. You can also ensure all nodes in the cluster are kerberized in core-site.xml (no auth by default):

<property>
  <name>hadoop.security.authentication</name>
  <value>kerberos</value>
  <description>Set the authentication for the cluster. Valid values are: simple or kerberos.</description>
</property>

https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html

Best Regards Frank

On May 22, 2015, at 4:25 AM, Ted Yu yuzhih...@gmail.com wrote:

Can you share the exception(s) you encountered? Thanks

On May 22, 2015, at 12:33 AM, donhoff_h 165612...@qq.com wrote:

Hi, my modified code is listed below; I just added the SecurityUtil API. I don't know which property keys I should use, so I made up two of my own property keys to point to the keytab and principal.

object TestHBaseRead2 {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create()
    hbConf.set("dhao.keytab.file", "/etc/spark/keytab/spark.user.keytab")
    hbConf.set("dhao.user.principal", "sp...@bgdt.dev.hrb")
    SecurityUtil.login(hbConf, "dhao.keytab.file", "dhao.user.principal")
    val conn = ConnectionFactory.createConnection(hbConf)
    val tbl = conn.getTable(TableName.valueOf("spark_t01"))
    try {
      val get = new Get(Bytes.toBytes("row01"))
      val res = tbl.get(get)
      println("result:" + res.toString)
    } finally {
      tbl.close()
      conn.close()
    }
    val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
    val v = rdd.sum()
    println("Value=" + v)
    sc.stop()
  }
}

------------------ Original Message ------------------ From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 3:25 PM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: Re: How to use spark to access HBase with Security enabled

Can you post the modified code? Thanks

On May 21, 2015, at 11:11 PM, donhoff_h 165612...@qq.com wrote:

Hi, thanks very much for the reply. I have tried the SecurityUtil. I can see from the log that this statement executed successfully, but I still cannot pass the authentication of HBase. With more experiments, I found an interesting new scenario: if I run the program in yarn-client mode, the driver can pass the authentication but the executors cannot; if I run the program in yarn-cluster mode, neither the driver nor the executors can pass the authentication. Can anybody give me some clue with this info? Many thanks!

------------------ Original Message ------------------ From: yuzhihong yuzhih...@gmail.com Sent: Friday, May 22, 2015, 5:29 AM To: donhoff_h 165612...@qq.com Cc: Bill Q bill.q@gmail.com; user user@spark.apache.org Subject: Re: How to use spark to access HBase with Security enabled

Are the worker nodes colocated with HBase region servers? Were you running as the hbase superuser? You may need to log in, using code similar to the following:

if (isSecurityEnabled()) {
  SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
}

SecurityUtil is a Hadoop class. Cheers

On Thu, May 21, 2015 at 1:58 AM, donhoff_h 165612...@qq.com wrote:

Hi, many thanks for the help. My Spark version is 1.3.0 too and I run it on YARN. According to your advice I have changed the configuration. Now my program can read the hbase-site.xml correctly, and it can also authenticate with ZooKeeper successfully. 
But I have hit a new problem: my program still cannot pass the authentication of HBase. Did you or anybody else ever meet this kind of situation? I used a keytab file to provide the principal. Since it can pass the authentication of ZooKeeper, I am sure the keytab file is OK, but it just cannot pass the authentication of HBase. The exception is listed below; could you or anybody else help me? Still many many thanks!

Exception*** 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 sessionTimeout=9 watcher=hconnection-0x4e142a710x0, quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, baseZNode=/hbase 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in. 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started. 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as SASL mechanism. 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket
Re: Partitioning of Dataframes
Looking at python/pyspark/sql/dataframe.py:

@since(1.4)
def coalesce(self, numPartitions):

@since(1.3)
def repartition(self, numPartitions):

Would the above methods serve the purpose? Cheers

On Fri, May 22, 2015 at 6:57 AM, Karlson ksonsp...@siberie.de wrote:

Alright, that doesn't seem to have made it into the Python API yet.

On 2015-05-22 15:12, Silvio Fiorito wrote:

This is added to 1.4.0 https://github.com/apache/spark/pull/5762

On 5/22/15, 8:48 AM, Karlson ksonsp...@siberie.de wrote:

Hi, wouldn't df.rdd.partitionBy() return a new RDD that I would then need to make into a DataFrame again? Maybe like this: df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird to me, though, and I'm not sure if the DF will be aware of its partitioning.

On 2015-05-22 12:55, ayan guha wrote:

A DataFrame is an abstraction over an RDD, so you should be able to do df.rdd.partitionBy. However, as far as I know, equijoins already optimize partitioning. You may want to look at the explain plans more carefully and materialise interim joins.

On 22 May 2015 19:03, Karlson ksonsp...@siberie.de wrote:

Hi, is there any way to control how DataFrames are partitioned? I'm doing lots of joins and am seeing very large shuffle reads and writes in the Spark UI. With PairRDDs you can control how the data is partitioned across nodes with partitionBy. There is no such method on DataFrames, however. Can I somehow partition the underlying RDD manually? I am currently using the Python API. Thanks!
Performance degradation between spark 0.9.3 and 1.3.1
Hi. I have a job that takes ~50min with Spark 0.9.3 and ~1.8hrs on Spark 1.3.1 on the same cluster. The only code difference between the two code bases is to fix the Seq -> Iter changes that happened in the Spark 1.x series. Are there any other changes in the defaults from Spark 0.9.3 -> 1.3.1 that would cause such a large degradation in performance? Changes in partitioning algorithms, scheduling etc? shay
Help reading Spark UI tea leaves..
Hi. I have an RDD that I use repeatedly through many iterations of an algorithm. To prevent recomputation, I persist the RDD (and incidentally I also persist and checkpoint its parents):

val consCostConstraintMap = consCost.join(constraintMap).map {
  case (cid, (costs, (mid1, _, mid2, _, _))) => (cid, (costs, mid1, mid2))
}
consCostConstraintMap.setName("consCostConstraintMap")
consCostConstraintMap.persist(MEMORY_AND_DISK_SER)

... later on in an iterative loop:

val update = updatedTrips.join(consCostConstraintMap).flatMap {
  ...
}.treeReduce()

- I can see from the UI that consCostConstraintMap is in storage:

RDD Name: consCostConstraintMap | Storage Level: Memory Serialized 1x Replicated | Cached Partitions: 600 | Fraction Cached: 100% | Size in Memory: 15.2 GB | Size in Tachyon: 0.0 B | Size on Disk: 0.0 B

- In the Jobs list, I see the following pattern, where each treeReduce line corresponds to one iteration of the loop:

Job 13 | treeReduce at reconstruct.scala:243 | 2015/05/22 16:27:11 | 2.9 min | Stages: 16/16 (194 skipped) | Tasks: 9024/9024 (109225 skipped)
Job 12 | treeReduce at reconstruct.scala:243 | 2015/05/22 16:24:16 | 2.9 min | Stages: 16/16 (148 skipped) | Tasks: 9024/9024 (82725 skipped)
Job 11 | treeReduce at reconstruct.scala:243 | 2015/05/22 16:21:21 | 2.9 min | Stages: 16/16 (103 skipped) | Tasks: 9024/9024 (56280 skipped)
Job 10 | treeReduce at reconstruct.scala:243 | 2015/05/22 16:18:28 | 2.9 min | Stages: 16/16 (69 skipped) | Tasks: 9024/9024 (36980 skipped)

-- If I push into one job I see Completed Stages: 16, Skipped Stages: 148. Among the completed stages:

Stage 525 | treeReduce at reconstruct.scala:243 | 2015/05/22 16:27:09 | 42 ms | Tasks: 24/24 | Shuffle Read: 21.7 KB
... (other stages elided) ...
Stage 519 | map at reconstruct.scala:153 | 2015/05/22 16:24:16 | 1.2 min | Tasks: 600/600 | Shuffle Read: 14.8 GB | Shuffle Write: 8.4 GB

The last line, map at reconstruct.scala:153, corresponds to

val consCostConstraintMap = consCost.join(constraintMap).map {

which I expected to have been cached. Is there some way I can find out what it is spending 1.2 mins doing? I presume reading and writing GB of data. But why? Everything should be in memory. Any clues on where I should start? tks
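[Editor's note: as a first diagnostic step (a suggestion, not from the original thread), the RDD lineage printout marks which parts of the lineage are served from the cache, which can confirm whether the join output is actually being reused:]

// Look for "CachedPartitions: 600" markers in the lineage; anything above a
// cached RDD should show up as skipped stages rather than being recomputed.
println(consCostConstraintMap.toDebugString)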
Re: Spark Streaming and Drools
Thanks a lot Evo, do you know where I can find some examples? Have a great one A G

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a singleton on every Spark worker node / executor / worker JVM. You can invoke it from e.g. map, filter, etc. and use the result from the rule to decide how to transform/filter an event/message.

From: Antonio Giambanco [mailto:antogia...@gmail.com] Sent: Friday, May 22, 2015 9:43 AM To: user@spark.apache.org Subject: Spark Streaming and Drools

Hi All, I'm deploying an architecture that uses Flume for sending log information to a sink. Spark Streaming reads from this sink (pull strategy) and processes all this information; during this process I would like to do some event processing... For example: a log appender writes information about all transactions in my trading platforms; if a platform user sells more than they buy during a week, I need to receive an alert on an event dashboard. How can I realize this? Is it possible with Drools? Thanks so much
Re: Why is RDD to PairRDDFunctions only via implicits?
I'm not sure if it is possible to overload the map function twice, once for just KV pairs, and another for K and V separately.

On Fri, May 22, 2015 at 10:26 AM, Justin Pihony justin.pih...@gmail.com wrote:

This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved the RDD API, but it could be even more discoverable if made available via the API directly. I assume this was originally an omission that now needs to be kept for backwards compatibility, but would any of the repo owners be open to making this more discoverable to the point of API docs and tab completion (while keeping both binary and source compatibility)?

class PairRDD extends RDD {
  // pair methods
}

RDD {
  def map[K: ClassTag, V: ClassTag](f: T => (K, V)): PairRDD[K, V]
}

As long as the implicits remain, then compatibility remains, but now it is explicit in the docs how to get a PairRDD, and it shows up in tab completion. Thoughts? Justin Pihony
Re: Why is RDD to PairRDDFunctions only via implicits?
The (crude) proof of concept seems to work:

class RDD[V](value: List[V]) {
  def doStuff = println("I'm doing stuff")
}

object RDD {
  implicit def toPair[V](x: RDD[V]) = new PairRDD(List((1, 2)))
}

class PairRDD[K, V](value: List[(K, V)]) extends RDD[(K, V)](value) {
  def doPairs = println("I'm using pairs")
}

class Context {
  def parallelize[K, V](x: List[(K, V)]) = new PairRDD(x)
  def parallelize[V](x: List[V]) = new RDD(x)
}

On Fri, May 22, 2015 at 2:44 PM, Reynold Xin r...@databricks.com wrote:

I'm not sure if it is possible to overload the map function twice, once for just KV pairs, and another for K and V separately.

On Fri, May 22, 2015 at 10:26 AM, Justin Pihony justin.pih...@gmail.com wrote:

This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved the RDD API, but it could be even more discoverable if made available via the API directly. I assume this was originally an omission that now needs to be kept for backwards compatibility, but would any of the repo owners be open to making this more discoverable to the point of API docs and tab completion (while keeping both binary and source compatibility)?

class PairRDD extends RDD {
  // pair methods
}

RDD {
  def map[K: ClassTag, V: ClassTag](f: T => (K, V)): PairRDD[K, V]
}

As long as the implicits remain, then compatibility remains, but now it is explicit in the docs how to get a PairRDD, and it shows up in tab completion. Thoughts? Justin Pihony
How to share a (spring) singleton service with Spark?
I have a small Spark launcher app able to instantiate a service via a Spring XML application context and then broadcast it in order to make it available on remote nodes. I suppose that when a Spring service is instantiated, all dependencies are instantiated and injected at the same time, so broadcasting it should broadcast everything to the remote nodes. My DAOs do not access a remote database; instead they use inner collections which are loaded at startup from XML files (in the DAO constructors). This works fine when I launch my app via spark-submit in local mode, but when I use yarn-cluster mode, I receive an exception (after all jobs have been launched) saying my inner collections inside the DAOs are empty. All my objects are Serializable and my inner collections are mostly maps (HashMap). I've tried declaring the collections as static, but that has no effect on the broadcasting... Could someone tell me what is happening here? Is there a maximum depth for broadcasting?
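[Editor's note: a common alternative to broadcasting the live object graph is to rebuild the Spring context lazily on each executor, so the DAOs load their XML-backed collections locally instead of relying on Java serialization of every transitive field. A sketch under assumed names: MyService, its process method, and application-context.xml are placeholders, and the XML files are assumed to be on the executor classpath.]

import org.springframework.context.support.ClassPathXmlApplicationContext

// Initialized at most once per executor JVM, on first use.
object ServiceHolder {
  lazy val service: MyService = {
    val ctx = new ClassPathXmlApplicationContext("application-context.xml")
    ctx.getBean(classOf[MyService])
  }
}

rdd.mapPartitions { iter =>
  val svc = ServiceHolder.service
  iter.map(svc.process)
}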
Why is RDD to PairRDDFunctions only via implicits?
This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved the RDD API, but it could be even more discoverable if made available via the API directly. I assume this was originally an omission that now needs to be kept for backwards compatibility, but would any of the repo owners be open to making this more discoverable to the point of API docs and tab completion (while keeping both binary and source compatibility)?

class PairRDD extends RDD {
  // pair methods
}

RDD {
  def map[K: ClassTag, V: ClassTag](f: T => (K, V)): PairRDD[K, V]
}

As long as the implicits remain, then compatibility remains, but now it is explicit in the docs how to get a PairRDD, and it shows up in tab completion. Thoughts? Justin Pihony
Re: FetchFailedException and MetadataFetchFailedException
hmm, sorry, I think that disproves my theory. Nothing else is immediately coming to mind. It's possible there is more info in the logs from the driver; couldn't hurt to send those (though I don't have high hopes of finding anything that way). Any chance this could be from too many open files or something? Normally there is a different error msg, but I figure it's worth asking anyway. The error you reported here was slightly different from your original post. This error is from writing the shuffle map output, while the original error you reported was a fetch failure, which comes from reading the shuffle data on the reduce side in the next stage. Does the map stage actually finish, even though the tasks are throwing these errors while writing the map output? Or do you sometimes get failures on the shuffle write side, and sometimes on the shuffle read side? (Not that I think you are doing anything wrong, but it may help narrow down the root cause and possibly file a bug.) thanks

On Fri, May 22, 2015 at 4:40 AM, Rok Roskar rokros...@gmail.com wrote:

on the worker/container that fails, the file-not-found is the first error -- the output below is from the yarn log. There were some Python worker crashes for another job/stage earlier (see the warning at 18:36), but I expect those to be unrelated to this file-not-found error.

== LogType:stderr Log Upload Time:15-May-2015 18:50:05 LogLength:5706 Log Contents: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting to kill Python Worker 15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0 (TID 995) java.io.FileNotFoundException: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3 -44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:212) at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) 15/05/15 18:50:04 ERROR DiskBlockManager: Exception while deleting local spark dir: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489 java.io.IOException: Failed to delete: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489 at
Re: partitioning after extracting from a hive table?
I guess not. Spark partitions correspond to the number of input splits. On 23 May 2015 00:02, Cesar Flores ces...@gmail.com wrote: I have a table in a Hive database partitioned by date. I notice that when I query this table using HiveContext, the created data frame has a specific number of partitions. Does this partitioning correspond to my original table partitioning in Hive? Thanks -- Cesar Flores
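[Editor's note: a quick way to check this empirically (a sketch with a hypothetical table name) is to compare the number of Hive partitions with the partition count of the resulting RDD, which reflects input splits rather than Hive's date partitions:]

val df = hiveContext.sql("SELECT * FROM mydb.mytable")
// Number of Spark partitions = number of underlying input splits,
// not the number of Hive table partitions.
println(df.rdd.partitions.length)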
Re: Performance degradation between spark 0.9.3 and 1.3.1
I don't think that 0.9.3 has been released, so I'm assuming that you're running on branch-0.9. There have been over 4000 commits between 0.9.3 and 1.3.1, so I'm afraid that this question doesn't have a concise answer: https://github.com/apache/spark/compare/branch-0.9...v1.3.1 To narrow down the potential causes, have you tried comparing 0.9.3 to, say, 1.0.2 or branch-1.0, or some other version that's closer to 0.9? On Fri, May 22, 2015 at 9:43 AM, Shay Seng s...@urbanengines.com wrote: Hi. I have a job that takes ~50min with Spark 0.9.3 and ~1.8hrs on Spark 1.3.1 on the same cluster. The only code difference between the two code bases is to fix the Seq -> Iter changes that happened in the Spark 1.x series. Are there any other changes in the defaults from Spark 0.9.3 -> 1.3.1 that would cause such a large degradation in performance? Changes in partitioning algorithms, scheduling etc? shay
Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)
Thank you guys for the prompt help. I ended up building Spark master and verified what DB has suggested.

val lr = (new MlLogisticRegression)
  .setFitIntercept(true)
  .setMaxIter(35)
val model = lr.fit(sqlContext.createDataFrame(training))
val scoreAndLabels = model.transform(sqlContext.createDataFrame(test))
  .select("probability", "label")
  .map { case Row(probability: Vector, label: Double) => (probability(1), label) }

Without doing much tuning, the above generates:

Weights: [0.0013971323020715888,0.8559779783186241,-0.5052275562089914]
Intercept: -3.3076806966913006
Area under ROC: 0.7033511043412033

I also tried it on a much bigger dataset I have, and its results are close to what I get from statsmodel. Now eagerly waiting for the 1.4 release. Thanks, Xin

On Wed, May 20, 2015 at 9:37 PM, Chris Gore cdg...@cdgore.com wrote:

I tried running this data set as described with my own implementation of L2-regularized logistic regression using LBFGS to compare: https://github.com/cdgore/fitbox

Intercept: -0.886745823033
Weights (['gre', 'gpa', 'rank']): [ 0.28862268 0.19402388 -0.36637964]
Area under ROC: 0.724056603774

The difference could be from the feature preprocessing as mentioned. I normalized the features around 0:

binary_train_normalized = (binary_train - binary_train.mean()) / binary_train.std()
binary_test_normalized = (binary_test - binary_train.mean()) / binary_train.std()

On a data set this small, the difference in models could also be the result of how the training/test sets were split. Have you tried running k-folds cross validation on a larger data set? Chris

On May 20, 2015, at 6:15 PM, DB Tsai d...@netflix.com.INVALID wrote:

Hi Xin, if you take a look at the model you trained, the intercept from Spark is significantly smaller than StatsModel's, and the intercept represents a prior on the categories in LOR, which causes the low accuracy in the Spark implementation. In LogisticRegressionWithLBFGS, the intercept is regularized due to the implementation of Updater, and the intercept should not be regularized. In the new pipeline APIs, a LOR with elastic net is implemented, and the intercept is properly handled. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala As you can see from the tests, https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala the result is exactly the same as R now. BTW, in both versions, the feature scalings are done before training, and we train the model in the scaled space but transform the model weights back to the original space. The only difference is that in the mllib version, LogisticRegressionWithLBFGS regularizes the intercept, while in the ml version the intercept is excluded from regularization. As a result, if lambda is zero, the models should be the same.

On Wed, May 20, 2015 at 3:42 PM, Xin Liu liuxin...@gmail.com wrote:

Hi, I have tried a few models in MLlib to train a LogisticRegression model. However, I consistently get much better results using other libraries such as statsmodel (which gives similar results as R) in terms of AUC. For illustration purposes, I used a small data set (I have tried much bigger data) http://www.ats.ucla.edu/stat/data/binary.csv from http://www.ats.ucla.edu/stat/r/dae/logit.htm Here is the snippet of my usage of LogisticRegressionWithLBFGS. 
val algorithm = new LogisticRegressionWithLBFGS
algorithm.setIntercept(true)
algorithm.optimizer
  .setNumIterations(100)
  .setRegParam(0.01)
  .setConvergenceTol(1e-5)
val model = algorithm.run(training)
model.clearThreshold()
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

I did a (0.6, 0.4) split for training/test. The response is admit and the features are GRE score, GPA, and college rank.

Spark: Weights (GRE, GPA, Rank): [0.0011576276331509304,0.048544858567336854,-0.394202150286076] Intercept: -0.6488972641282202 Area under ROC: 0.6294070512820512

StatsModel: Weights [0.0018, 0.7220, -0.3148] Intercept: -3.5913 Area under ROC: 0.69

The weights from statsmodel seem more reasonable if you consider that, for a one-unit increase in GPA, the log odds of being admitted to graduate school increase by 0.72 in statsmodel versus 0.04 in Spark. I have seen much bigger differences with other data. So my question is: has anyone compared the results with other libraries, and is anything wrong with my code invoking LogisticRegressionWithLBFGS? As the real data I am processing is pretty big, I really want to get this working in Spark. Please let me know if you have similar experience and how you resolved it. Thanks, Xin
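[Editor's note: the distinction DB Tsai describes can be written as the difference between two objectives; this is a sketch of the math, with λ the regularization parameter, not a quotation of either implementation. The mllib LogisticRegressionWithLBFGS of that era effectively penalized the intercept b:]

$$\min_{w,\,b}\;\frac{1}{n}\sum_{i=1}^{n}\log\!\left(1+e^{-y_i(w^{\top}x_i+b)}\right)+\frac{\lambda}{2}\left(\lVert w\rVert^2 + b^2\right)$$

[while the ml pipeline implementation excludes the intercept from the penalty:]

$$\min_{w,\,b}\;\frac{1}{n}\sum_{i=1}^{n}\log\!\left(1+e^{-y_i(w^{\top}x_i+b)}\right)+\frac{\lambda}{2}\,\lVert w\rVert^2$$

[Penalizing b shrinks the intercept toward zero, which matches the much smaller intercept (-0.65 vs. -3.59) reported above.]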