Re: Performance tuning for local mode on one host

2016-07-25 Thread on

There are 4 cores on my system.

Running spark with setMaster("local[2]") results in:
  PID USER  PR  NI    VIRT    RES   SHR S  %CPU %MEM   TIME+ COMMAND
    7 root  20   0 4748836 563400 29064 S  24.6  7.0 1:16.54 /usr/jdk1.8.0_101/bin/java -cp /conf/:/usr/spark-2.0.0-preview-bin-hadoop2.6/jars/* -Xmx1g org.apache.spark.de+
  114 root  20   0  114208  31956  7028 S  15.7  0.4 0:16.35 python -m pyspark.daemon
  117 root  20   0  114404  32116  7028 S  15.7  0.4 0:17.28 python -m pyspark.daemon
   41 root  20   0  443548  60920 10416 S   0.0  0.8 0:10.84 python /test.py
  111 root  20   0  101272  31740  9356 S   0.0  0.4 0:00.29 python -m pyspark.daemon

The processing time is over 3 seconds when running the code below. There must
be a lot of overhead somewhere, as the code does nearly nothing, i.e., no
expensive calculations, just a socket stream receiving one message per second.

How to reduce this overhead?


On 25.07.2016 20:19, on wrote:
> OK, sorry, I am running in local mode.
> Just a very small setup...
>
> (changed the subject)
>
> On 25.07.2016 20:01, Mich Talebzadeh wrote:
>> Hi,
>>
>> From your reference I can see that you are running in local mode with
>> two cores. But that is not standalone.
>>
>> Can you please clarify whether you start the master and slave processes?
>> Those are for standalone mode.
>>
>> sbin/start-master.sh
>> sbin/start-slaves.sh
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>  
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
>>
>>  
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which
>> may arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary
>> damages arising from such loss, damage or destruction.
>>
>>  
>>
>>
>> On 25 July 2016 at 18:21, on wrote:
>>
>> Dear all,
>>
>> I am running spark on one host ("local[2]") doing calculations
>> like this
>> on a socket stream.
>> mainStream = socketStream.filter(lambda msg: msg['header'].startswith('test')).map(lambda x: (x['host'], x))
>> s1 = mainStream.updateStateByKey(updateFirst).map(lambda x: (1, x))
>> s2 = mainStream.updateStateByKey(updateSecond, initialRDD=initialMachineStates).map(lambda x: (2, x))
>> out.join(bla2).foreachRDD(no_out)
>>
>> I evaluated that each calculation alone has a processing time of about 400 ms,
>> but the processing time of the code above is over 3 seconds on average.
>>
>> I know there are a lot of unknown parameters, but does anybody have
>> hints on how to tune this code / system? I have already changed a lot of
>> parameters, such as #executors, #cores and so on.
>>
>> Thanks in advance and best regards,
>> on
>>
>>


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: read parquetfile in spark-sql error

2016-07-25 Thread cj
Thank you. I see this SQL in the Spark doc: 
http://spark.apache.org/docs/1.6.1/sql-programming-guide.html













-- Original message --
From: "Takeshi Yamamuro"
Date: Tue, Jul 26, 2016, 6:15 AM
To: "cj" <124411...@qq.com>
Cc: "user"
Subject: Re: read parquetfile in spark-sql error



Hi,

It seems your query is not consistent with the HQL syntax;
you'd be better off re-checking the definitions: 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable


// maropu


On Mon, Jul 25, 2016 at 11:36 PM, Kabeer Ahmed  wrote:
  I hope the below sample helps you:  
 
 val parquetDF = hiveContext.read.parquet("hdfs://.parquet") 
 parquetDF.registerTempTable("parquetTable")
 sql("SELECT * FROM parquetTable").collect().foreach(println)
 
 
 
 Kabeer.
 Sent from
 
  Nylas N1, the extensible, open source mail client.
 
  On Jul 25 2016, at 12:09 pm, cj <124411...@qq.com> wrote: 
  hi,all:
 
 
   I use spark1.6.1 as my work env.
   
   when I saved the following content as test1.sql file :
   
 CREATE TEMPORARY TABLE parquetTable
 USING org.apache.spark.sql.parquet
 OPTIONS (
   path "examples/src/main/resources/people.parquet"
 )

 SELECT * FROM parquetTable
 
 and use bin/spark-sql to run it 
(/home/bae/dataplatform/spark-1.6.1/bin/spark-sql  --properties-file 
./spark-dataplatform.conf -f test1.sql ),I encountered a grammar error.
 
 
 
 
  SET hive.support.sql11.reserved.keywords=false
 SET spark.sql.hive.version=1.2.1
 SET spark.sql.hive.version=1.2.1
 NoViableAltException(280@[192:1: tableName : (db= identifier DOT tab= 
identifier -> ^( TOK_TABNAME $db $tab) |tab= identifier -> ^( TOK_TABNAME $tab) 
);])
 at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
 at org.antlr.runtime.DFA.predict(DFA.java:116)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.tableName(HiveParser_FromClauseParser.java:4747)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.tableName(HiveParser.java:45918)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.createTableStatement(HiveParser.java:5029)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2640)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1650)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1109)
 at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202)
 at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
 at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276)
 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
 at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
 at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
 at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
 at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
 at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
 at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
 at 

Re: spark2.0 how to use sparksession and StreamingContext same time

2016-07-25 Thread kevin
thanks a lot Terry

2016-07-26 12:03 GMT+08:00 Terry Hoo :

> Kevin,
>
> Try to create the StreamingContext as following:
>
> val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
>
>
>
> On Tue, Jul 26, 2016 at 11:25 AM, kevin  wrote:
>
>> hi,all:
>> I want to read data from kafka and regist as a table then join a jdbc
>> table.
>> My sample like this :
>>
>> val spark = SparkSession
>>   .builder
>>   .config(sparkConf)
>>   .getOrCreate()
>>
>> val jdbcDF = spark.read.format("jdbc").options(Map("url" ->
>> "jdbc:mysql://master1:3306/demo", "driver" -> "com.mysql.jdbc.Driver",
>> "dbtable" -> "i_user", "user" -> "root", "password" -> "passok")).load()
>> jdbcDF.cache().createOrReplaceTempView("black_book")
>>   val df = spark.sql("select * from black_book")
>>   df.show()
>>
>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>> ssc.checkpoint("checkpoint")
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>> topicMap).map(_._2)
>> val words = lines.flatMap(_.split(" "))
>>
>> *I got error :*
>>
>> 16/07/26 11:18:07 WARN AbstractHandler: No Server set for
>> org.spark_project.jetty.server.handler.ErrorHandler@6f0ca692
>> ++++
>> |  id|username|password|
>> ++++
>> |e6faca36-8766-4dc...|   a|   a|
>> |699285a3-a108-457...|   admin| 123|
>> |e734752d-ac98-483...|test|test|
>> |c0245226-128d-487...|   test2|   test2|
>> |4f1bbdb2-89d1-4cc...| 119| 911|
>> |16a9a360-13ee-4b5...|1215|1215|
>> |bf7d6a0d-2949-4c3...|   demo3|   demo3|
>> |de30747c-c466-404...| why| why|
>> |644741c9-8fd7-4a5...|   scala|   p|
>> |cda1e44d-af4b-461...| 123| 231|
>> |6e409ed9-c09b-4e7...| 798|  23|
>> ++++
>>
>> Exception in thread "main" org.apache.spark.SparkException: Only one
>> SparkContext may be running in this JVM (see SPARK-2243). To ignore this
>> error, set spark.driver.allowMultipleContexts = true. The currently running
>> SparkContext was created at:
>>
>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:749)
>> main.POC$.main(POC.scala:43)
>> main.POC.main(POC.scala)
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:498)
>>
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> at
>> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2211)
>> at
>> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2207)
>> at scala.Option.foreach(Option.scala:257)
>> at
>> org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2207)
>> at
>> org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2277)
>> at org.apache.spark.SparkContext.(SparkContext.scala:91)
>> at
>> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:837)
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:84)
>> at main.POC$.main(POC.scala:50)
>> at main.POC.main(POC.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>


Re: read parquetfile in spark-sql error

2016-07-25 Thread cj
Thank you, but I hope to read the parquet file as a table in spark-sql, not
in a Java (or Scala) program.


-- Original message --
From: "Kabeer Ahmed"
Date: Mon, Jul 25, 2016 10:36 PM
To: "cj" <124411...@qq.com>
Cc: "user"; "lian.cs.zju"
Subject: Re: read parquetfile in spark-sql error



 I hope the below sample helps you:  
 
 val parquetDF = hiveContext.read.parquet("hdfs://.parquet") 
 parquetDF.registerTempTable("parquetTable")
 sql("SELECT * FROM parquetTable").collect().foreach(println)
 
 
 
 Kabeer.
 Sent from
 
  Nylas N1, the extensible, open source mail client.
 
  On Jul 25 2016, at 12:09 pm, cj <124411...@qq.com> wrote: 
  hi,all:
 
 
   I use spark1.6.1 as my work env.
   
   when I saved the following content as test1.sql file :
   
 CREATE TEMPORARY TABLE parquetTable
 USING org.apache.spark.sql.parquet
 OPTIONS (
   path "examples/src/main/resources/people.parquet"
 )

 SELECT * FROM parquetTable
 
 and use bin/spark-sql to run it 
(/home/bae/dataplatform/spark-1.6.1/bin/spark-sql  --properties-file 
./spark-dataplatform.conf -f test1.sql ),I encountered a grammar error.
 
 
 
 
  SET hive.support.sql11.reserved.keywords=false
 SET spark.sql.hive.version=1.2.1
 SET spark.sql.hive.version=1.2.1
 NoViableAltException(280@[192:1: tableName : (db= identifier DOT tab= 
identifier -> ^( TOK_TABNAME $db $tab) |tab= identifier -> ^( TOK_TABNAME $tab) 
);])
 at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
 at org.antlr.runtime.DFA.predict(DFA.java:116)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.tableName(HiveParser_FromClauseParser.java:4747)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.tableName(HiveParser.java:45918)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.createTableStatement(HiveParser.java:5029)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2640)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1650)
 at 
org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1109)
 at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202)
 at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
 at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276)
 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
 at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
 at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
 at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
 at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
 at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
 at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
 at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:290)
 at 
org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:237)
 at 
org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:236)
 at 

Re: spark2.0 how to use sparksession and StreamingContext same time

2016-07-25 Thread Terry Hoo
Kevin,

Try to create the StreamingContext as follows:

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
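
For reference, a minimal end-to-end sketch of that change, assuming Spark 2.0:
the SparkContext already created by the SparkSession is reused instead of
passing sparkConf to the StreamingContext (which would try to start a second
SparkContext in the same JVM).

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val spark = SparkSession.builder.config(sparkConf).getOrCreate()

// Reuse the existing SparkContext rather than building a new one from sparkConf.
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
ssc.checkpoint("checkpoint")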



On Tue, Jul 26, 2016 at 11:25 AM, kevin  wrote:

> hi,all:
> I want to read data from kafka and regist as a table then join a jdbc
> table.
> My sample like this :
>
> val spark = SparkSession
>   .builder
>   .config(sparkConf)
>   .getOrCreate()
>
> val jdbcDF = spark.read.format("jdbc").options(Map("url" ->
> "jdbc:mysql://master1:3306/demo", "driver" -> "com.mysql.jdbc.Driver",
> "dbtable" -> "i_user", "user" -> "root", "password" -> "passok")).load()
> jdbcDF.cache().createOrReplaceTempView("black_book")
>   val df = spark.sql("select * from black_book")
>   df.show()
>
> val ssc = new StreamingContext(sparkConf, Seconds(2))
> ssc.checkpoint("checkpoint")
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicMap).map(_._2)
> val words = lines.flatMap(_.split(" "))
>
> *I got error :*
>
> 16/07/26 11:18:07 WARN AbstractHandler: No Server set for
> org.spark_project.jetty.server.handler.ErrorHandler@6f0ca692
> ++++
> |  id|username|password|
> ++++
> |e6faca36-8766-4dc...|   a|   a|
> |699285a3-a108-457...|   admin| 123|
> |e734752d-ac98-483...|test|test|
> |c0245226-128d-487...|   test2|   test2|
> |4f1bbdb2-89d1-4cc...| 119| 911|
> |16a9a360-13ee-4b5...|1215|1215|
> |bf7d6a0d-2949-4c3...|   demo3|   demo3|
> |de30747c-c466-404...| why| why|
> |644741c9-8fd7-4a5...|   scala|   p|
> |cda1e44d-af4b-461...| 123| 231|
> |6e409ed9-c09b-4e7...| 798|  23|
> ++++
>
> Exception in thread "main" org.apache.spark.SparkException: Only one
> SparkContext may be running in this JVM (see SPARK-2243). To ignore this
> error, set spark.driver.allowMultipleContexts = true. The currently running
> SparkContext was created at:
>
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:749)
> main.POC$.main(POC.scala:43)
> main.POC.main(POC.scala)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:498)
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> at
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2211)
> at
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2207)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2207)
> at
> org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2277)
> at org.apache.spark.SparkContext.(SparkContext.scala:91)
> at
> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:837)
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:84)
> at main.POC$.main(POC.scala:50)
> at main.POC.main(POC.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>


Re: ORC v/s Parquet for Spark 2.0

2016-07-25 Thread janardhan shetty
Thanks Timur for the explanation.
What if the data is log data which is delimited (CSV or TSV), doesn't have
too many nestings, and is already in flat file formats?

On Mon, Jul 25, 2016 at 7:38 PM, Timur Shenkao  wrote:

> 1) The opinions on StackOverflow are correct, not biased.
> 2) Cloudera promoted Parquet, Hortonworks - ORC + Tez. When it became
> obvious that just file format is not enough and Impala sucks, then Cloudera
> announced https://vision.cloudera.com/one-platform/ and focused on Spark
> 3) There is a race between ORC & Parquet: after some perfect release ORC
> becomes better & faster, then, several months later, Parquet may outperform.
> 4) If you use "flat" tables --> ORC is better. If you have highly nested
> data with arrays inside of dictionaries (for instance, json that isn't
> flattened) then may be one should choose Parquet
> 5) AFAIK, Parquet has its metadata at the end of the file (correct me if
> something has changed) . It means that Parquet file must be completely read
> & put into RAM. If there is no enough RAM or file somehow is corrupted -->
> problems arise
>
> On Tue, Jul 26, 2016 at 5:09 AM, janardhan shetty 
> wrote:
>
>> Just wondering advantages and disadvantages to convert data into ORC or
>> Parquet.
>>
>> In the documentation of Spark there are numerous examples of Parquet
>> format.
>>
>> Any strong reasons to chose Parquet over ORC file format ?
>>
>> Also : current data compression is bzip2
>>
>>
>> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
>> This seems like biased.
>>
>
>
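
For reference, a minimal sketch of writing the same DataFrame in either format
through the generic writer API (paths are illustrative; ORC support in the
1.x/2.0 line assumes a session built with Hive support):

df.write.format("parquet").save("/data/events_parquet")
df.write.format("orc").save("/data/events_orc")

// Reading back, the format is named the same way.
val parquetDF = spark.read.format("parquet").load("/data/events_parquet")
val orcDF     = spark.read.format("orc").load("/data/events_orc")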


spark2.0 how to use sparksession and StreamingContext same time

2016-07-25 Thread kevin
hi, all:
I want to read data from Kafka and register it as a table, then join a JDBC table.
My sample is like this:

val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

val jdbcDF = spark.read.format("jdbc").options(Map("url" ->
"jdbc:mysql://master1:3306/demo", "driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "i_user", "user" -> "root", "password" -> "passok")).load()
jdbcDF.cache().createOrReplaceTempView("black_book")
  val df = spark.sql("select * from black_book")
  df.show()

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))

*I got error :*

16/07/26 11:18:07 WARN AbstractHandler: No Server set for
org.spark_project.jetty.server.handler.ErrorHandler@6f0ca692
++++
|  id|username|password|
++++
|e6faca36-8766-4dc...|   a|   a|
|699285a3-a108-457...|   admin| 123|
|e734752d-ac98-483...|test|test|
|c0245226-128d-487...|   test2|   test2|
|4f1bbdb2-89d1-4cc...| 119| 911|
|16a9a360-13ee-4b5...|1215|1215|
|bf7d6a0d-2949-4c3...|   demo3|   demo3|
|de30747c-c466-404...| why| why|
|644741c9-8fd7-4a5...|   scala|   p|
|cda1e44d-af4b-461...| 123| 231|
|6e409ed9-c09b-4e7...| 798|  23|
++++

Exception in thread "main" org.apache.spark.SparkException: Only one
SparkContext may be running in this JVM (see SPARK-2243). To ignore this
error, set spark.driver.allowMultipleContexts = true. The currently running
SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:749)
main.POC$.main(POC.scala:43)
main.POC.main(POC.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2211)
at
org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2207)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2207)
at
org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2277)
at org.apache.spark.SparkContext.(SparkContext.scala:91)
at
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:837)
at
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:84)
at main.POC$.main(POC.scala:50)
at main.POC.main(POC.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Re: Maintaining order of pair rdd

2016-07-25 Thread janardhan shetty
groupBy is a shuffle operation, and the index is already lost in this process
if I am not wrong; I also don't see a *sortWith* operation on RDD.

Any suggestions or help ?
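
A minimal sketch of one way to preserve the original order, assuming an
RDD[(String, Int)] like the one printed below: tag each element with its index
before grouping, then sort each group's values by that index.

import org.apache.spark.rdd.RDD

// rdd: RDD[(String, Int)], e.g. (ID2,18159), (ID1,18159), (ID3,18159), ...
def groupPreservingOrder(rdd: RDD[(String, Int)]): RDD[(String, Array[Int])] =
  rdd.zipWithIndex()                                // ((key, value), originalIndex)
    .map { case ((k, v), idx) => (k, (idx, v)) }    // key -> (originalIndex, value)
    .groupByKey()
    .mapValues(_.toArray.sortBy(_._1).map(_._2))    // sort by original index, then drop it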

On Mon, Jul 25, 2016 at 12:58 AM, Marco Mistroni 
wrote:

> Hi
>  after you do a groupBy you should use a sortWith.
> Basically, a groupBy reduces your structure to (anyone correct me if I'm
> wrong) an RDD[(key, val)], which you can see as a tuple, so you could use
> sortWith (or sortBy, cannot remember which one) (tpl => tpl._1)
> hth
>
> On Mon, Jul 25, 2016 at 1:21 AM, janardhan shetty 
> wrote:
>
>> Thanks Marco. This solved the order problem. I had another question which
>> is a precursor to this.
>>
>> As you can see below ID2,ID1 and ID3 are in order and I need to maintain
>> this index order as well. But when we do groupByKey 
>> operation(*rdd.distinct.groupByKey().mapValues(v
>> => v.toArray*))
>> everything is *jumbled*.
>> Is there any way we can maintain this order as well ?
>>
>> scala> RDD.foreach(println)
>> (ID2,18159)
>> (ID1,18159)
>> (ID3,18159)
>>
>> (ID2,18159)
>> (ID1,18159)
>> (ID3,18159)
>>
>> (ID2,36318)
>> (ID1,36318)
>> (ID3,36318)
>>
>> (ID2,54477)
>> (ID1,54477)
>> (ID3,54477)
>>
>> *Jumbled version : *
>> Array(
>> (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*,
>> 145272, 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683,
>> 58866, 162076, 45431, 100136)),
>> (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022, 39244,
>> 100136, 58866, 72636, 145272, 817, 89366, * 54477*, *36318*, 308703,
>> 160992, 45431, 162076)),
>> (ID2,Array(308703, * 54477*, 89366, 39244, 150022, 72636, 817, 58866,
>> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
>> 45431, *36318*, 162076))
>> )
>>
>> *Expected output:*
>> Array(
>> (ID1,Array(*18159*,*36318*, *54477,...*)),
>> (ID3,Array(*18159*,*36318*, *54477, ...*)),
>> (ID2,Array(*18159*,*36318*, *54477, ...*))
>> )
>>
>> As you can see after *groupbyKey* operation is complete item 18519 is in
>> index 0 for ID1, index 2 for ID3 and index 16 for ID2 where as expected is
>> index 0
>>
>>
>> On Sun, Jul 24, 2016 at 12:43 PM, Marco Mistroni 
>> wrote:
>>
>>> Hello
>>>  Uhm you have an array containing 3 tuples?
>>> If all the arrays have same length, you can just zip all of them,
>>> creatings a list of tuples
>>> then you can scan the list 5 by 5...?
>>>
>>> so something like
>>>
>>> (Array(0)._2, Array(1)._2, Array(2)._2).zipped.toList
>>>
>>> this will give you a list of tuples of 3 elements containing each items
>>> from ID1, ID2 and ID3  ... sample below
>>> res: List((18159,100079,308703), (308703, 19622, 54477), (72636,18159,
>>> 89366)..)
>>>
>>> then you can use a recursive function to compare each element such as
>>>
>>> def iterate(lst: List[(Int, Int, Int)]): Unit = {
>>>   if (lst.isEmpty) {
>>> // return your comparison
>>>   } else {
>>> val splits = lst.splitAt(5)
>>> // do something about it using splits._1
>>> iterate(splits._2)
>>>   }
>>> }
>>>
>>> will this help? or am i still missing something?
>>>
>>> kr
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 24 Jul 2016 5:52 pm, "janardhan shetty" 
>>> wrote:
>>>
 Array(
 (ID1,Array(18159, 308703, 72636, 64544, 39244, 107937, 54477, 145272,
 100079, 36318, 160992, 817, 89366, 150022, 19622, 44683, 58866, 162076,
 45431, 100136)),
 (ID3,Array(100079, 19622, 18159, 212064, 107937, 44683, 150022, 39244,
 100136, 58866, 72636, 145272, 817, 89366, 54477, 36318, 308703, 160992,
 45431, 162076)),
 (ID2,Array(308703, 54477, 89366, 39244, 150022, 72636, 817, 58866,
 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, 18159, 45431,
 36318, 162076))
 )

 I need to compare the first 5 elements of ID1 with the first five elements of
 ID3, and next the first 5 elements of ID1 to ID2. Similarly, the next 5 elements
 in that order until the end of the number of elements.
 Let me know if this helps.


 On Sun, Jul 24, 2016 at 7:45 AM, Marco Mistroni 
 wrote:

> Apologies I misinterpreted could you post two use cases?
> Kr
>
> On 24 Jul 2016 3:41 pm, "janardhan shetty" 
> wrote:
>
>> Marco,
>>
>> Thanks for the response. It is indexed order and not ascending or
>> descending order.
>> On Jul 24, 2016 7:37 AM, "Marco Mistroni" 
>> wrote:
>>
>>> Use map values to transform to an rdd where values are sorted?
>>> Hth
>>>
>>> On 24 Jul 2016 6:23 am, "janardhan shetty" 
>>> wrote:
>>>
 I have a key,value pair rdd where value is an array of Ints. I need
 to maintain the order of the value in order to execute downstream
 modifications. How do we maintain the order of values?
 Ex:
 rdd = (id1,[5,2,3,15],

UDF returning generic Seq

2016-07-25 Thread Chris Beavers
Hey there,

Interested in writing a UDF that returns an ArrayType column of unknown
subtype. My understanding is that this translates, JVM-type-wise, to a Seq of a
generic templated type: Seq[Any]. I seem to be hitting the constraint at
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala:657
that
basically necessitates a fully qualified schema on the return type (i.e.
the templated Any is hitting the default exception throwing case at the end
of schemaFor).

Is there any more canonical way to have a UDF produce an ArrayType column of
unknown type? Or is my only alternative here to reduce this to BinaryType
and use whatever encoding/data structures I want under the covers there and
in subsequent UDFs?

Thanks,
Chris
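
A minimal sketch of the constraint, assuming Spark 2.x's functions.udf: a
concrete element type gives ScalaReflection a schema (ArrayType(IntegerType)),
whereas a generic Seq[Any] falls through to the exception-throwing default case
mentioned above.

import org.apache.spark.sql.functions.udf

// Works: the element type is concrete, so schemaFor can derive ArrayType(IntegerType).
val doubled = udf((xs: Seq[Int]) => xs.map(_ * 2))

// Fails when the UDF is defined: Any has no Catalyst schema.
// val generic = udf((xs: Seq[Any]) => xs)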


Re: spark context stop vs close

2016-07-25 Thread Mail.com
Okay. Yes it is JavaSparkContext. Thanks.
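
A minimal sketch of what Sean describes below, assuming the Java API is driven
from Scala: close() comes from java.io.Closeable and simply delegates to
stop(), so only one of the two needs to be called.

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
try {
  // ... run jobs ...
} finally {
  jsc.close()  // equivalent to jsc.stop(); call one or the other, not both
}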

> On Jul 24, 2016, at 1:31 PM, Sean Owen  wrote:
> 
> I think this is about JavaSparkContext which implements the standard
> Closeable interface for convenience. Both do exactly the same thing.
> 
>> On Sun, Jul 24, 2016 at 6:27 PM, Jacek Laskowski  wrote:
>> Hi,
>> 
>> I can only find stop. Where did you find close?
>> 
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>> 
>> 
>>> On Sat, Jul 23, 2016 at 3:11 PM, Mail.com  wrote:
>>> Hi All,
>>> 
>>> Where should we use spark context stop vs close? Should we stop the context
>>> first and then close?
>>> 
>>> Are there general guidelines around this? When I stop and later try to close,
>>> I get an "RPC already closed" error.
>>> 
>>> Thanks,
>>> Pradeep
>>> 
>>> 
>>> 
>>> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



ORC v/s Parquet for Spark 2.0

2016-07-25 Thread janardhan shetty
Just wondering about the advantages and disadvantages of converting data into
ORC or Parquet.

In the documentation of Spark there are numerous examples of Parquet
format.

Any strong reasons to choose Parquet over the ORC file format?

Also: the current data compression is bzip2.

http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
This seems biased.


Re: Odp.: spark2.0 can't run SqlNetworkWordCount

2016-07-25 Thread kevin
Thanks a lot. After changing to Scala 2.11, it works.
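
For reference, a minimal build.sbt sketch of that change, assuming sbt is used
to build the example jar (the pre-built Spark 2.0.0 packages are compiled
against Scala 2.11):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql"       % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided"
)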

2016-07-25 17:40 GMT+08:00 Tomasz Gawęda :

> Hi,
>
> Please change the Scala version to 2.11. As far as I know, Spark packages are
> now built with Scala 2.11, and I've got the other (2.10) version.
>
>
>
> --
> *Od:* kevin 
> *Wysłane:* 25 lipca 2016 11:33
> *Do:* user.spark; dev.spark
> *Temat:* spark2.0 can't run SqlNetworkWordCount
>
> hi, all:
> I downloaded the Spark 2.0 pre-built package. I can run the SqlNetworkWordCount test using:
> bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount
> master1 
>
> but when I use the Spark 2.0 example source code SqlNetworkWordCount.scala and
> build it into a jar with dependencies (JDK 1.8 and Scala 2.10),
> when I use spark-submit to run it, I get an error:
>
> 16/07/25 17:28:30 INFO scheduler.JobScheduler: Starting job streaming job
> 146943891 ms.0 from job set of time 146943891 ms
> Exception in thread "streaming-job-executor-2"
> java.lang.NoSuchMethodError:
> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
> at
> main.SqlNetworkWordCount$$anonfun$main$1.apply(SqlNetworkWordCount.scala:67)
> at
> main.SqlNetworkWordCount$$anonfun$main$1.apply(SqlNetworkWordCount.scala:61)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> at scala.util.Try$.apply(Try.scala:192)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>


Spark Web UI port 4040 not working

2016-07-25 Thread Jestin Ma
Hello, when running spark jobs, I can access the master UI (port 8080 one)
no problem. However, I'm confused as to how to access the web UI to see
jobs/tasks/stages/etc.

I can access the master UI at http://:8080. But port 4040
gives me a -connection cannot be reached-.

Is the web UI http:// with a port of 4040?

I'm running my Spark job on a cluster machine and submitting it to a master
node part of the cluster. I heard of ssh tunneling; is that relevant here?

Thank you!
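
For reference, a typical ssh local-forward sketch for reaching a driver UI that
is only visible inside the cluster (hostnames are placeholders, and the 4040 UI
only exists while the application is running):

ssh -L 4040:driver-host:4040 user@cluster-gateway
# then browse to http://localhost:4040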


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread kevin
Thank you. I can't find a spark-streaming-kafka_2.10 jar for Spark 2 in Maven
Central, so I tried version 1.6.2; it did not work, as it needs the class
org.apache.spark.Logging, which can't be found in Spark 2. So I built a
spark-streaming-kafka_2.10 jar for Spark 2 from the source code; it works now.
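
For reference, the renamed artifacts Cody mentions below are published to Maven
Central and can be pulled directly instead of building from source. A minimal
sbt sketch, assuming Scala 2.11 and brokers on the 0.8 line (use the -0-10
artifact for 0.10+ brokers):

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"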

2016-07-26 2:12 GMT+08:00 Cody Koeninger :

> For 2.0, the kafka dstream support is in two separate subprojects
> depending on which version of Kafka you are using
>
> spark-streaming-kafka-0-10
> or
> spark-streaming-kafka-0-8
>
> corresponding to brokers that are version 0.10+ or 0.8+
>
> On Mon, Jul 25, 2016 at 12:29 PM, Reynold Xin  wrote:
> > The presentation at Spark Summit SF was probably referring to Structured
> > Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the
> same
> > production stability level as Spark 1.6. There is also Kafka 0.10
> support in
> > dstream.
> >
> > On July 25, 2016 at 10:26:49 AM, Andy Davidson
> > (a...@santacruzintegration.com) wrote:
> >
> > Hi Kevin
> >
> > Just a heads up: at the recent Spark Summit in S.F. there was a presentation
> > about streaming in 2.0. They said that streaming was not going to be
> > production ready in 2.0.
> >
> > I am not sure if the older 1.6.x version will be supported. My project
> will
> > not be able to upgrade with streaming support. We also use kafka
> >
> > Andy
> >
> > From: Marco Mistroni 
> > Date: Monday, July 25, 2016 at 2:33 AM
> > To: kevin 
> > Cc: "user @spark" , "dev.spark"
> > 
> > Subject: Re: where I can find spark-streaming-kafka for spark2.0
> >
> > Hi Kevin
> >   you should not need to rebuild everything.
> > Instead, i believe you should launch spark-submit by specifying the kafka
> > jar file in your --packages... i had to follow same when integrating
> spark
> > streaming with flume
> >
> >   have you checked this link ?
> > https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> >
> >
> > hth
> >
> >
> >
> > On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
> >>
> >> I have compile it from source code
> >>
> >> 2016-07-25 12:05 GMT+08:00 kevin :
> >>>
> >>> hi,all :
> >>> I try to run example
> org.apache.spark.examples.streaming.KafkaWordCount ,
> >>> I got error :
> >>> Exception in thread "main" java.lang.NoClassDefFoundError:
> >>> org/apache/spark/streaming/kafka/KafkaUtils$
> >>> at
> >>>
> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
> >>> at
> >>>
> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>> at
> >>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>> at
> >>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> at java.lang.reflect.Method.invoke(Method.java:498)
> >>> at
> >>>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> >>> at
> >>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> >>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> >>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> >>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >>> Caused by: java.lang.ClassNotFoundException:
> >>> org.apache.spark.streaming.kafka.KafkaUtils$
> >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>> ... 11 more
> >>>
> >>> so where I can find spark-streaming-kafka for spark2.0
> >>
> >>
> >
>


Re: Potential Change in Kafka's Partition Assignment Semantics when Subscription Changes

2016-07-25 Thread Cody Koeninger
This seems really low risk to me.  In order to be impacted, it'd have
to be someone who was using the kafka integration in spark 2.0, which
isn't even officially released yet.

On Mon, Jul 25, 2016 at 7:23 PM, Vahid S Hashemian
 wrote:
> Sorry, meant to ask if any Apache Spark user would be affected.
>
> --Vahid
>
>
>
> From:Vahid S Hashemian/Silicon Valley/IBM@IBMUS
> To:user@spark.apache.org, d...@spark.apache.org
> Date:07/25/2016 05:21 PM
> Subject:Potential Change in Kafka's Partition Assignment Semantics
> when Subscription Changes
> 
>
>
>
> Hello,
>
> We have started a KIP under the Kafka project that proposes a fix for an
> inconsistency in how partition assignments are currently handled in Kafka
> when the consumer changes subscription. Note that this applies to new
> consumer only.
> The KIP can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-70%3A+Revise+Partition+Assignment+Semantics+on+New+Consumer%27s+Subscription+Change
>
> The compatibility section
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-70%3A+Revise+Partition+Assignment+Semantics+on+New+Consumer%27s+Subscription+Change#KIP-70:RevisePartitionAssignmentSemanticsonNewConsumer'sSubscriptionChange-Compatibility,Deprecation,andMigrationPlan)
> describes impacted users.
>
> We would like to know if any Apache Storm user would be affected by this
> change. Thanks.
>
> Regards,
> --Vahid
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Yash Sharma
Correction -
dataDF.write.partitionBy(“year”, “month”,
“date”).mode(SaveMode.Append).text(“s3://data/test2/events/”)

On Tue, Jul 26, 2016 at 10:59 AM, Yash Sharma  wrote:

> Based on the behavior of spark [1], Overwrite mode will delete all your
> data when you try to overwrite a particular partition.
>
> What I did-
> - Use S3 api to delete all partitions
> - Use spark df to write in Append mode [2]
>
>
> 1.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-deletes-all-existing-partitions-in-SaveMode-Overwrite-Expected-behavior-td18219.html
>
> 2. dataDF.write.partitionBy(“year”, “month”,
> “date”).mode(SaveMode.Overwrite).text(“s3://data/test2/events/”)
>
> On Tue, Jul 26, 2016 at 9:37 AM, Pedro Rodriguez 
> wrote:
>
>> Probably should have been more specific with the code we are using, which
>> is something like
>>
>> val df = 
>> df.write.mode("append or overwrite
>> here").partitionBy("date").saveAsTable("my_table")
>>
>> Unless there is something like what I described on the native API, I will
>> probably take the approach of having a S3 API call to wipe out that
>> partition before the job starts, but it would be nice to not have to
>> incorporate another step in the job.
>>
>> Pedro
>>
>> On Mon, Jul 25, 2016 at 5:23 PM, RK Aduri 
>> wrote:
>>
>>> You can have a temporary file to capture the data that you would like to
>>> overwrite. And swap that with existing partition that you would want to
>>> wipe the data away. Swapping can be done by simple rename of the partition
>>> and just repair the table to pick up the new partition.
>>>
>>> Am not sure if that addresses your scenario.
>>>
>>> On Jul 25, 2016, at 4:18 PM, Pedro Rodriguez 
>>> wrote:
>>>
>>> What would be the best way to accomplish the following behavior:
>>>
>>> 1. There is a table which is partitioned by date
>>> 2. Spark job runs on a particular date, we would like it to wipe out all
>>> data for that date. This is to make the job idempotent and lets us rerun a
>>> job if it failed without fear of duplicated data
>>> 3. Preserve data for all other dates
>>>
>>> I am guessing that overwrite would not work here or if it does its not
>>> guaranteed to stay that way, but am not sure. If thats the case, is there a
>>> good/robust way to get this behavior?
>>>
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>>> Github: github.com/EntilZha | LinkedIn:
>>> https://www.linkedin.com/in/pedrorodriguezscience
>>>
>>>
>>>
>>
>>
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


Re: Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Yash Sharma
Based on the behavior of spark [1], Overwrite mode will delete all your
data when you try to overwrite a particular partition.

What I did-
- Use S3 api to delete all partitions
- Use spark df to write in Append mode [2]


1.
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-deletes-all-existing-partitions-in-SaveMode-Overwrite-Expected-behavior-td18219.html

2. dataDF.write.partitionBy(“year”, “month”,
“date”).mode(SaveMode.Overwrite).text(“s3://data/test2/events/”)
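
A minimal sketch of those two steps, assuming the Hadoop FileSystem API can
reach the bucket; bucket, prefix and partition values are illustrative:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val target = "s3://data/test2/events"
val partitionToWipe = new Path(s"$target/year=2016/month=07/date=26")

// 1. Delete only the partition that is about to be recomputed.
val fs = FileSystem.get(new URI(target), spark.sparkContext.hadoopConfiguration)
if (fs.exists(partitionToWipe)) fs.delete(partitionToWipe, true)

// 2. Re-write that partition's data in Append mode, leaving all other dates untouched.
dataDF.write.partitionBy("year", "month", "date").mode(SaveMode.Append).text(target)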

On Tue, Jul 26, 2016 at 9:37 AM, Pedro Rodriguez 
wrote:

> Probably should have been more specific with the code we are using, which
> is something like
>
> val df = 
> df.write.mode("append or overwrite
> here").partitionBy("date").saveAsTable("my_table")
>
> Unless there is something like what I described on the native API, I will
> probably take the approach of having a S3 API call to wipe out that
> partition before the job starts, but it would be nice to not have to
> incorporate another step in the job.
>
> Pedro
>
> On Mon, Jul 25, 2016 at 5:23 PM, RK Aduri  wrote:
>
>> You can have a temporary file to capture the data that you would like to
>> overwrite. And swap that with existing partition that you would want to
>> wipe the data away. Swapping can be done by simple rename of the partition
>> and just repair the table to pick up the new partition.
>>
>> Am not sure if that addresses your scenario.
>>
>> On Jul 25, 2016, at 4:18 PM, Pedro Rodriguez 
>> wrote:
>>
>> What would be the best way to accomplish the following behavior:
>>
>> 1. There is a table which is partitioned by date
>> 2. Spark job runs on a particular date, we would like it to wipe out all
>> data for that date. This is to make the job idempotent and lets us rerun a
>> job if it failed without fear of duplicated data
>> 3. Preserve data for all other dates
>>
>> I am guessing that overwrite would not work here or if it does its not
>> guaranteed to stay that way, but am not sure. If thats the case, is there a
>> good/robust way to get this behavior?
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>>
>
>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


Re: Potential Change in Kafka's Partition Assignment Semantics when Subscription Changes

2016-07-25 Thread Vahid S Hashemian
Sorry, meant to ask if any Apache Spark user would be affected.

--Vahid



From:   Vahid S Hashemian/Silicon Valley/IBM@IBMUS
To: user@spark.apache.org, d...@spark.apache.org
Date:   07/25/2016 05:21 PM
Subject:Potential Change in Kafka's Partition Assignment Semantics 
when Subscription Changes



Hello,

We have started a KIP under the Kafka project that proposes a fix for an 
inconsistency in how partition assignments are currently handled in Kafka 
when the consumer changes subscription. Note that this applies to new 
consumer only.
The KIP can be found here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-70%3A+Revise+Partition+Assignment+Semantics+on+New+Consumer%27s+Subscription+Change


The compatibility section (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-70%3A+Revise+Partition+Assignment+Semantics+on+New+Consumer%27s+Subscription+Change#KIP-70:RevisePartitionAssignmentSemanticsonNewConsumer'sSubscriptionChange-Compatibility,Deprecation,andMigrationPlan
) describes impacted users.

We would like to know if any Apache Storm user would be affected by this 
change. Thanks.
 
Regards,
--Vahid





Potential Change in Kafka's Partition Assignment Semantics when Subscription Changes

2016-07-25 Thread Vahid S Hashemian
Hello,

We have started a KIP under the Kafka project that proposes a fix for an 
inconsistency in how partition assignments are currently handled in Kafka 
when the consumer changes subscription. Note that this applies to new 
consumer only.
The KIP can be found here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-70%3A+Revise+Partition+Assignment+Semantics+on+New+Consumer%27s+Subscription+Change

The compatibility section (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-70%3A+Revise+Partition+Assignment+Semantics+on+New+Consumer%27s+Subscription+Change#KIP-70:RevisePartitionAssignmentSemanticsonNewConsumer'sSubscriptionChange-Compatibility,Deprecation,andMigrationPlan
) describes impacted users.

We would like to know if any Apache Storm user would be affected by this 
change. Thanks.
 
Regards,
--Vahid



Num of executors and cores

2016-07-25 Thread Mail.com
Hi All,

I have a directory which has 12 files. I want to read each file whole, so I am
reading the directory as wholeTextFiles(dirpath, numPartitions).

I run spark-submit as  --num-executors 12 --executor-cores 1
and numPartitions 12.

However, when I run the job I see that the stage which reads the directory has
only 8 tasks, so some tasks read more than one file and take twice the time.

What can I do so that the files are read by 12 tasks, i.e., one file per task?

Thanks,
Pradeep
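
A minimal sketch of one workaround, assuming the goal is one file per task for
the downstream work: the numPartitions argument to wholeTextFiles is only a
suggested minimum, so an explicit repartition after the read forces the later
stages to 12 tasks even if the read stage itself uses fewer.

val files = sc.wholeTextFiles("hdfs:///path/to/dir", 12)  // RDD[(path, contents)]
val spread = files.repartition(12)                        // downstream stages run with 12 tasks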

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Pedro Rodriguez
Probably should have been more specific with the code we are using, which
is something like

val df = 
df.write.mode("append or overwrite
here").partitionBy("date").saveAsTable("my_table")

Unless there is something like what I described on the native API, I will
probably take the approach of having a S3 API call to wipe out that
partition before the job starts, but it would be nice to not have to
incorporate another step in the job.

Pedro

On Mon, Jul 25, 2016 at 5:23 PM, RK Aduri  wrote:

> You can have a temporary file to capture the data that you would like to
> overwrite. And swap that with existing partition that you would want to
> wipe the data away. Swapping can be done by simple rename of the partition
> and just repair the table to pick up the new partition.
>
> Am not sure if that addresses your scenario.
>
> On Jul 25, 2016, at 4:18 PM, Pedro Rodriguez 
> wrote:
>
> What would be the best way to accomplish the following behavior:
>
> 1. There is a table which is partitioned by date
> 2. Spark job runs on a particular date, we would like it to wipe out all
> data for that date. This is to make the job idempotent and lets us rerun a
> job if it failed without fear of duplicated data
> 3. Preserve data for all other dates
>
> I am guessing that overwrite would not work here or if it does its not
> guaranteed to stay that way, but am not sure. If thats the case, is there a
> good/robust way to get this behavior?
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>
>




-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651, took 19.556700 s Killed

2016-07-25 Thread Ascot Moss
Hi,

spark: 1.6.1
java: java 1.8_u40
I tried the random forest training phase; the same code works well with 20
trees (lower accuracy, about 68%). When trying the training phase with
more trees, I set it to 200 trees, and it returned:

"DAGScheduler: Job 20 finished: collectAsMap at DecisionTree.scala:651,
took 19.556700 s Killed". There is no WARN or ERROR on the console; the
task just stops in the end.

Any idea how to resolve it? Should the timeout parameter be set longer?

regards


(below is the log from console)

16/07/26 00:02:47 INFO DAGScheduler: looking for newly runnable stages

16/07/26 00:02:47 INFO DAGScheduler: running: Set()

16/07/26 00:02:47 INFO DAGScheduler: waiting: Set(ResultStage 32)

16/07/26 00:02:47 INFO DAGScheduler: failed: Set()

16/07/26 00:02:47 INFO DAGScheduler: Submitting ResultStage 32
(MapPartitionsRDD[75] at map at DecisionTree.scala:642), which has no
missing parents

16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48 stored as values in
memory (estimated size 2.2 MB, free 18.2 MB)

16/07/26 00:02:47 INFO MemoryStore: Block broadcast_48_piece0 stored as
bytes in memory (estimated size 436.9 KB, free 18.7 MB)

16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
memory on x.x.x.x:35450 (size: 436.9 KB, free: 45.8 GB)

16/07/26 00:02:47 INFO SparkContext: Created broadcast 48 from broadcast at
DAGScheduler.scala:1006

16/07/26 00:02:47 INFO DAGScheduler: Submitting 4 missing tasks from
ResultStage 32 (MapPartitionsRDD[75] at map at DecisionTree.scala:642)

16/07/26 00:02:47 INFO TaskSchedulerImpl: Adding task set 32.0 with 4 tasks

16/07/26 00:02:47 INFO TaskSetManager: Starting task 0.0 in stage 32.0 (TID
185, x.x.x.x, partition 0,NODE_LOCAL, 1956 bytes)

16/07/26 00:02:47 INFO TaskSetManager: Starting task 1.0 in stage 32.0 (TID
186, x.x.x.x, partition 1,NODE_LOCAL, 1956 bytes)

16/07/26 00:02:47 INFO TaskSetManager: Starting task 2.0 in stage 32.0 (TID
187, x.x.x.x, partition 2,NODE_LOCAL, 1956 bytes)

16/07/26 00:02:47 INFO TaskSetManager: Starting task 3.0 in stage 32.0 (TID
188, x.x.x.x, partition 3,NODE_LOCAL, 1956 bytes)

16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
memory on x.x.x.x:58784 (size: 436.9 KB, free: 5.1 GB)

16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
output locations for shuffle 12 to x.x.x.x:44434

16/07/26 00:02:47 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 12 is 180 bytes

16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
memory on x.x.x.x:46186 (size: 436.9 KB, free: 2.2 GB)

16/07/26 00:02:47 INFO BlockManagerInfo: Added broadcast_48_piece0 in
memory on x.x.x.x:50132 (size: 436.9 KB, free: 5.0 GB)

16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
output locations for shuffle 12 to x.x.x.x:47272

16/07/26 00:02:47 INFO MapOutputTrackerMasterEndpoint: Asked to send map
output locations for shuffle 12 to x.x.x.x:46802

16/07/26 00:02:49 INFO TaskSetManager: Finished task 2.0 in stage 32.0 (TID
187) in 2265 ms on x.x.x.x (1/4)

16/07/26 00:02:49 INFO TaskSetManager: Finished task 1.0 in stage 32.0 (TID
186) in 2266 ms on x.x.x.x (2/4)

16/07/26 00:02:50 INFO TaskSetManager: Finished task 0.0 in stage 32.0 (TID
185) in 2794 ms on x.x.x.x (3/4)

16/07/26 00:02:50 INFO TaskSetManager: Finished task 3.0 in stage 32.0 (TID
188) in 3738 ms on x.x.x.x (4/4)

16/07/26 00:02:50 INFO TaskSchedulerImpl: Removed TaskSet 32.0, whose tasks
have all completed, from pool

16/07/26 00:02:50 INFO DAGScheduler: ResultStage 32 (collectAsMap at
DecisionTree.scala:651) finished in 3.738 s

16/07/26 00:02:50 INFO DAGScheduler: Job 19 finished: collectAsMap at
DecisionTree.scala:651, took 19.493917 s

16/07/26 00:02:51 INFO MemoryStore: Block broadcast_49 stored as values in
memory (estimated size 1053.9 KB, free 19.7 MB)

16/07/26 00:02:52 INFO MemoryStore: Block broadcast_49_piece0 stored as
bytes in memory (estimated size 626.7 KB, free 20.3 MB)

16/07/26 00:02:52 INFO BlockManagerInfo: Added broadcast_49_piece0 in
memory on x.x.x.x:35450 (size: 626.7 KB, free: 45.8 GB)

16/07/26 00:02:52 INFO SparkContext: Created broadcast 49 from broadcast at
DecisionTree.scala:601

16/07/26 00:02:52 INFO SparkContext: Starting job: collectAsMap at
DecisionTree.scala:651

16/07/26 00:02:52 INFO DAGScheduler: Registering RDD 76 (mapPartitions at
DecisionTree.scala:622)

16/07/26 00:02:52 INFO DAGScheduler: Got job 20 (collectAsMap at
DecisionTree.scala:651) with 4 output partitions

16/07/26 00:02:52 INFO DAGScheduler: Final stage: ResultStage 34
(collectAsMap at DecisionTree.scala:651)

16/07/26 00:02:52 INFO DAGScheduler: Parents of final stage:
List(ShuffleMapStage 33)

16/07/26 00:02:52 INFO DAGScheduler: Missing parents: List(ShuffleMapStage
33)

16/07/26 00:02:52 INFO DAGScheduler: Submitting ShuffleMapStage 33
(MapPartitionsRDD[76] at mapPartitions at DecisionTree.scala:622), which
has no missing parents

16/07/26 00:02:52 INFO 
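
A bare "Killed" with no Spark-side WARN or ERROR usually means the driver JVM
was terminated from outside Spark (often by the kernel OOM killer) rather than
by a timeout; the collectAsMap at DecisionTree.scala:651 gathers per-node
statistics on the driver, and these grow with the number and depth of trees.
A minimal sketch of the training call with the knobs that matter (values are
illustrative, not taken from this thread), assuming trainingData is an
RDD[LabeledPoint]:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def train(trainingData: RDD[LabeledPoint]) =
  RandomForest.trainClassifier(
    trainingData,
    numClasses = 2,                        // assumption: binary classification
    categoricalFeaturesInfo = Map[Int, Int](),
    numTrees = 200,
    featureSubsetStrategy = "auto",
    impurity = "gini",
    maxDepth = 10,                         // deeper trees => more driver-side stats
    maxBins = 32,
    seed = 12345)

If the driver is indeed being killed by the OS, giving it more memory at submit
time (--driver-memory) and, if needed, raising spark.driver.maxResultSize are
the usual first steps.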

Re: Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread RK Aduri
You can write the data that you would like to overwrite to a temporary
location, and then swap it with the existing partition whose data you want to
wipe away. The swap can be done by a simple rename of the partition directory,
followed by repairing the table so it picks up the new partition.

I am not sure whether that addresses your scenario.
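
A rough sketch of that swap, assuming an external Hive table named events,
partitioned by dt and stored on HDFS (the table name, paths, the newData
DataFrame and the date value are all hypothetical), with sqlContext being a
HiveContext:

import org.apache.hadoop.fs.{FileSystem, Path}

val dt = "2016-07-25"
val fs = FileSystem.get(sc.hadoopConfiguration)

// 1) Write the replacement data for that date to a temporary location.
newData.write.mode("overwrite").parquet(s"/tmp/events_staging/dt=$dt")

// 2) Swap the directories: remove the old partition and rename the new one in.
fs.delete(new Path(s"/data/events/dt=$dt"), true)
fs.rename(new Path(s"/tmp/events_staging/dt=$dt"), new Path(s"/data/events/dt=$dt"))

// 3) Repair the metastore so the table picks up the partition again.
sqlContext.sql(s"ALTER TABLE events DROP IF EXISTS PARTITION (dt='$dt')")
sqlContext.sql(s"ALTER TABLE events ADD PARTITION (dt='$dt') LOCATION '/data/events/dt=$dt'")
// (MSCK REPAIR TABLE events is an alternative to the explicit ADD PARTITION.)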

> On Jul 25, 2016, at 4:18 PM, Pedro Rodriguez  wrote:
> 
> What would be the best way to accomplish the following behavior:
> 
> 1. There is a table which is partitioned by date
> 2. Spark job runs on a particular date, we would like it to wipe out all data 
> for that date. This is to make the job idempotent and lets us rerun a job if 
> it failed without fear of duplicated data
> 3. Preserve data for all other dates
> 
> I am guessing that overwrite would not work here, or if it does, it's not 
> guaranteed to stay that way, but I am not sure. If that's the case, is there a 
> good/robust way to get this behavior?
> 
> -- 
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
> 
> ski.rodrig...@gmail.com  | pedrorodriguez.io 
>  | 909-353-4423
> Github: github.com/EntilZha  | LinkedIn: 
> https://www.linkedin.com/in/pedrorodriguezscience 
> 
> 


-- 
Collective[i] dramatically improves sales and marketing performance using 
technology, applications and a revolutionary network designed to provide 
next generation analytics and decision-support directly to business users. 
Our goal is to maximize human potential and minimize mistakes. In most 
cases, the results are astounding. We cannot, however, stop emails from 
sometimes being sent to the wrong person. If you are not the intended 
recipient, please notify us by replying to this email's sender and deleting 
it (and any attachments) permanently from your system. If you are, please 
respect the confidentiality of this communication's contents.


Spark SQL overwrite/append for partitioned tables

2016-07-25 Thread Pedro Rodriguez
What would be the best way to accomplish the following behavior:

1. There is a table which is partitioned by date
2. Spark job runs on a particular date, we would like it to wipe out all
data for that date. This is to make the job idempotent and lets us rerun a
job if it failed without fear of duplicated data
3. Preserve data for all other dates

I am guessing that overwrite would not work here, or if it does, it's not
guaranteed to stay that way, but I am not sure. If that's the case, is there a
good/robust way to get this behavior?
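
One commonly used way to get this (a sketch only; the base path, partition
column and dailyDF are hypothetical) is to keep the table as date-partitioned
files and have the job overwrite just that date's sub-directory, which leaves
every other date untouched and makes reruns idempotent:

val runDate = "2016-07-25"
val basePath = "hdfs:///warehouse/my_table"        // or an S3 path

dailyDF                                            // the data computed for runDate
  .write
  .mode("overwrite")                               // replaces only this sub-directory
  .parquet(s"$basePath/date=$runDate")

// Readers still see the whole table; the date column is discovered from the
// directory names (partition discovery):
val allDates = sqlContext.read.parquet(basePath)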

-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread ayan guha
STS works on YARN, as a yarn-client application.

One issue: STS does not support HA, though there has been some discussion about
making it HA, similar to Hive Server. So what we did is run STS on multiple
nodes and put them behind a load balancer.
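
For reference, starting STS against YARN looks roughly like the following
(the port and resource numbers are illustrative only):

${SPARK_HOME}/sbin/start-thriftserver.sh \
  --master yarn \
  --deploy-mode client \
  --hiveconf hive.server2.thrift.port=10001 \
  --num-executors 2 \
  --executor-cores 1 \
  --executor-memory 1G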

On Tue, Jul 26, 2016 at 8:06 AM, Mich Talebzadeh 
wrote:

> Correction.
>
> STS uses the same UI to display details about all processes running
> against it which is helpful but gets crowded
>
> :)
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 July 2016 at 22:26, Mich Talebzadeh 
> wrote:
>
>> We also should remember that STS is a pretty useful tool. With JDBC you
>> can use beeline, Zeppelin, Squirrel and other tools against it.
>>
>> One thing I would like to change is the UI port that the Thrift server
>> listens on; you can change it at startup using spark.ui.port. The port is
>> fixed at Thrift server startup, and the UI can only display one SQL query at
>> a time, which is not very useful.
>>
>> As one can run multiple clients against STS, it is a
>> limitation that one cannot change the UI port at runtime.
>>
>> Cheers
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 25 July 2016 at 22:04, Jacek Laskowski  wrote:
>>
>>> On Mon, Jul 25, 2016 at 10:57 PM, Mich Talebzadeh
>>>  wrote:
>>>
>>> > Yarn promises the best resource management I believe. Having said that
>>> I have not used Mesos myself.
>>>
>>> I'm glad you've mentioned it.
>>>
>>> I think Cloudera (and Hortonworks?) guys are doing a great job with
>>> bringing all the features of YARN to Spark and I think Spark on YARN
>>> shines features-wise.
>>>
>>> I'm not in a position to compare YARN vs Mesos for their resource
>>> management, but Spark on Mesos is certainly lagging behind Spark on
>>> YARN regarding the features Spark uses off the scheduler backends --
>>> security, data locality, queues, etc. (or I might be simply biased
>>> after having spent months with Spark on YARN mostly?).
>>>
>>> Jacek
>>>
>>
>>
>


-- 
Best Regards,
Ayan Guha


How to partition a SparkDataFrame using all distinct column values in sparkR

2016-07-25 Thread Neil Chang
Hi,
  This is a question regarding SparkR in spark 2.0.

I have a SparkDataFrame and I want to partition it using one column's
values. Each value corresponds to a partition: all rows that have the same
column value shall go to the same partition, no more, no less.

   It seems the function repartition() doesn't do this. I have 394 unique
values, but it just partitions my DataFrame into 200 partitions. If I set
numPartitions to 394, some mismatch happens.

Is it possible to do what I described in SparkR?
GroupBy doesn't work with a UDF at all.

Or can we split the DataFrame into a list of small ones first? If so, what
can I use?

Thanks,
Neil
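
For what it's worth, repartitioning by a column (shown below in Scala for
illustration; the DataFrame df and key column k are made-up names) uses hash
partitioning. It guarantees that all rows with the same value end up in the
same partition, but not one value per partition: with numPartitions = 394,
several values can hash into the same partition while other partitions stay
empty, which would explain the "mismatch".

import org.apache.spark.sql.functions.col

val byKey = df.repartition(394, col("k"))   // hash-partitions on k

// Each partition may hold zero, one or several distinct k values:
byKey.rdd
  .mapPartitions(it => Iterator(it.map(_.getAs[Any]("k")).toSet.size))
  .collect()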


Re: read parquetfile in spark-sql error

2016-07-25 Thread Takeshi Yamamuro
Hi,

Seems your query was not consistent with the HQL syntax.
You'd be better off re-checking the definitions:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable

// maropu

On Mon, Jul 25, 2016 at 11:36 PM, Kabeer Ahmed 
wrote:

> I hope the below sample helps you:
>
> val parquetDF = hiveContext.read.parquet("hdfs://.parquet")
> parquetDF.registerTempTable("parquetTable")
> sql("SELECT * FROM parquetTable").collect().foreach(println)
>
> Kabeer.
> Sent from Nylas N1, the extensible, open source mail client.
> On Jul 25 2016, at 12:09 pm, cj <124411...@qq.com> wrote:
>
>> hi,all:
>>
>>   I use spark1.6.1 as my work env.
>>
>>   when I saved the following content as test1.sql file :
>>
>>
>> CREATE TEMPORARY TABLE parquetTable
>>
>> USING org.apache.spark.sql.parquet
>> OPTIONS (
>>   path "examples/src/main/resources/people.parquet")
>> SELECT * FROM parquetTable
>>
>>
>> and use bin/spark-sql to run it
>> (/home/bae/dataplatform/spark-1.6.1/bin/spark-sql  --properties-file
>> ./spark-dataplatform.conf -f test1.sql ),I encountered a grammar error.
>>
>>
>> SET hive.support.sql11.reserved.keywords=false
>> SET spark.sql.hive.version=1.2.1
>> SET spark.sql.hive.version=1.2.1
>> NoViableAltException(280@[192:1: tableName : (db= identifier DOT tab=
>> identifier -> ^( TOK_TABNAME $db $tab) |tab= identifier -> ^( TOK_TABNAME
>> $tab) );])
>> at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
>> at org.antlr.runtime.DFA.predict(DFA.java:116)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.tableName(HiveParser_FromClauseParser.java:4747)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.tableName(HiveParser.java:45918)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.createTableStatement(HiveParser.java:5029)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2640)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1650)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1109)
>> at
>> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202)
>> at
>> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
>> at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276)
>> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
>> at
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
>> at
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>> at
>> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>> at
>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
>> at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
>> at
>> org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
>> at
>> org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:290)
>> at
>> 

Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Mich Talebzadeh
Correction.

STS uses the same UI to display details about all processes running against
it which is helpful but gets crowded

:)

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 22:26, Mich Talebzadeh  wrote:

> We also should remember that STS is a pretty useful tool. With JDBC you
> can use beeline, Zeppelin, Squirrel and other tools against it.
>
> One thing I would like to change is the UI port that the Thrift server
> listens on; you can change it at startup using spark.ui.port. The port is
> fixed at Thrift server startup, and the UI can only display one SQL query at
> a time, which is not very useful.
>
> As one can run multiple clients against STS, it is a
> limitation that one cannot change the UI port at runtime.
>
> Cheers
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 July 2016 at 22:04, Jacek Laskowski  wrote:
>
>> On Mon, Jul 25, 2016 at 10:57 PM, Mich Talebzadeh
>>  wrote:
>>
>> > Yarn promises the best resource management I believe. Having said that
>> I have not used Mesos myself.
>>
>> I'm glad you've mentioned it.
>>
>> I think Cloudera (and Hortonworks?) guys are doing a great job with
>> bringing all the features of YARN to Spark and I think Spark on YARN
>> shines features-wise.
>>
>> I'm not in a position to compare YARN vs Mesos for their resource
>> management, but Spark on Mesos is certainly lagging behind Spark on
>> YARN regarding the features Spark uses off the scheduler backends --
>> security, data locality, queues, etc. (or I might be simply biased
>> after having spent months with Spark on YARN mostly?).
>>
>> Jacek
>>
>
>


SPARK UDF related issue

2016-07-25 Thread Carlo . Allocca
Hi All, 

I am using SPARK 2.0 and I have got the following issue: 

I am able to run steps 1-5 (see below) but not step 6, which uses a UDF.
Steps 1-5 take a few seconds, whereas step 6 looks like it never ends.

Is there anything wrong? How should I address it?

Any suggestion/feedback would be very much appreciated.

Many Thanks in advance for your help. 

Best regards, 
Carlo


 

== Code 


=== STEP 1
SparkSession spark = SparkSession
.builder()
.master("local[2]")
.appName("DatasetForCaseNew")
.config("spark.executor.memory", "3g")
.getOrCreate();


=== STEP 2
this.spark.udf().register("computeBindingValue", new
UDF1<String, Integer>() {
@Override
public Integer call(String newBindingValue) throws Exception {  
  
if(newBindingValue.contains("Paperback")) return 1;
return 2;
}
}, DataTypes.IntegerType);

=== STEP 3
Dataset<Row> cmptDS_TMP = cmptDS
.select(window(cmptDS.col("created"), "1 hour").as("PRD_TimeWindow#1"),
cmptDS.col("asin").as("PRD_asin#1"),
cmptDS.col("sale_rank").as("PRD_global_sale_rank")
);

=== STEP 4
Dataset<Row> resultProd = prdDS
.select(
prdDS.col("asin").alias("PRD_asin#300"),
prdDS.col("rppprice").alias("PRD_rppprice"),
prdDS.col("binding").alias("PRD_binding")

).distinct().sort("PRD_asin#300");

=== STEP 5
Dataset<Row> cmptDS_TMP_join_resultProd = cmptDS_TMP
.join(resultProd, 
cmptDS_TMP.col("PRD_asin#1").equalTo(resultProd.col("PRD_asin#300")), "inner"); 
  
cmptDS_TMP_join_resultProd.show();

=== STEP 6
   Dataset<Row> prodWithBindingValue =
cmptDS_TMP_join_resultProd.withColumn("PRD_bindingValue",
callUDF("computeBindingValue", 
cmptDS_TMP_join_resultProd.col("PRD_binding")));
prodWithBindingValue.show();



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Bzip2 to Parquet format

2016-07-25 Thread Takeshi Yamamuro
Hi,

This is the expected behaviour.
The default compression for parquet is `snappy`.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L215
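
If a different codec (or none) is wanted, it can be changed before writing; a
small sketch for 2.0, with the output path hypothetical:

spark.conf.set("spark.sql.parquet.compression.codec", "gzip")  // or "uncompressed", "snappy", "lzo"
inputDF.write.format("parquet").save("result.parquet")

// Equivalent per-write option:
inputDF.write.option("compression", "gzip").parquet("result.parquet")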

// maropu

On Tue, Jul 26, 2016 at 6:33 AM, janardhan shetty 
wrote:

> Andrew,
>
> 2.0
>
> I tried
> val inputR = sc.textFile(file)
> val inputS = inputR.map(x => x.split("`"))
> val inputDF = inputS.toDF()
>
> inputDF.write.format("parquet").save("result.parquet")
>
> Result part files end with *.snappy.parquet. Is that expected?
>
> On Sun, Jul 24, 2016 at 8:00 PM, Andrew Ehrlich 
> wrote:
>
>> You can load the text with sc.textFile() to an RDD[String], then use
>> .map() to convert it into an RDD[Row]. At this point you are ready to
>> apply a schema. Use sqlContext.createDataFrame(rddOfRow, structType)
>>
>> Here is an example on how to define the StructType (schema) that you
>> will combine with the RDD[Row] to create a DataFrame.
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType
>>
>> Once you have the DataFrame, save it to parquet with
>> dataframe.save(“/path”) to create a parquet file.
>>
>> Reference for SQLContext / createDataFrame:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext
>>
>>
>>
>> On Jul 24, 2016, at 5:34 PM, janardhan shetty 
>> wrote:
>>
>> We have data in Bz2 compression format. Any links in Spark to convert
>> into Parquet and also performance benchmarks and uses study materials ?
>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Bzip2 to Parquet format

2016-07-25 Thread janardhan shetty
Andrew,

2.0

I tried
val inputR = sc.textFile(file)
val inputS = inputR.map(x => x.split("`"))
val inputDF = inputS.toDF()

inputDF.write.format("parquet").save("result.parquet")

Result part files end with *.snappy.parquet. Is that expected?

On Sun, Jul 24, 2016 at 8:00 PM, Andrew Ehrlich  wrote:

> You can load the text with sc.textFile() to an RDD[String], then use
> .map() to convert it into an RDD[Row]. At this point you are ready to
> apply a schema. Use sqlContext.createDataFrame(rddOfRow, structType)
>
> Here is an example on how to define the StructType (schema) that you will
> combine with the RDD[Row] to create a DataFrame.
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType
>
> Once you have the DataFrame, save it to parquet with
> dataframe.save(“/path”) to create a parquet file.
>
> Reference for SQLContext / createDataFrame:
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext
>
>
>
> On Jul 24, 2016, at 5:34 PM, janardhan shetty 
> wrote:
>
> We have data in Bz2 compression format. Any links in Spark to convert into
> Parquet and also performance benchmarks and uses study materials ?
>
>
>


Re: Pls assist: Creating Spak EC2 cluster using spark_ec2.py script and a custom AMI

2016-07-25 Thread Mayank Ahuja
Hi Marco,

From the AMI name shared, it seems to be an HVM image. The 'm1' instance family
does not support HVM (only PV is supported). Either you can use a PV equivalent
of this image or you can use the 'm3' family (easiest transition from m1 to m3,
if possible).

Details:
http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/virtualization_types.html
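
Under that constraint, the launch would look roughly like this (all flags
except the instance types copied from the original command; the choice of
m3.large is only an example):

./spark-ec2 --key-pair=ec2AccessKey --identity-file ec2AccessKey.pem \
  --region=us-west-2 --ami ami-d732f0b7 \
  --instance-type=m3.large --master-instance-type=m3.large \
  launch my-spark-cluster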

Thanks
Mayank

On Mon, Jul 25, 2016 at 7:37 AM, Marco Mistroni  wrote:

> HI all
>  i was wondering if anyone can help with this
> I Have created a spark cluster before using spark_ec2.py script from Spark
> 1.6.1
> that by default uses a very old AMI... so i decided to try to launch the
> script with a more up to date
> AMI.
> the one i have used is ami-d732f0b7, which refers to Ubuntu Server 14.04
> LTS (HVM), SSD Volume Type
>
>
>
> I have lauched the script as follows
>
> ./spark-ec2  --key-pair=ec2AccessKey --identity-file ec2AccessKey.pem
> --region=us-west-2 --ami ami-d732f0b7 launch my-spark-cluster
>
> but i am gettign this exception:
>
> Non-Windows instances with a virtualization type of 'hvm' are currently
> not supported for this instance type.
>
> which seems a bizarre exception to me as, in spark_ec2.py, the instance type
> m1.large (the one used to create the spark master and nodes) is associated with
> virtualization=pvm
>
> "m1.large":"pvm"
>
>
> has anyone found similar issue? any suggestion on how can i use a custom
> AMI when creating a spark cluster?
>
> kind regards
>  marco
>
>
>


Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Mich Talebzadeh
We also should remember that STS is a pretty useful tool. With JDBC you can
use beeline, Zeppelin, Squirrel and other tools against it.

One thing I would like to change is the UI port that the Thrift server listens
on; you can change it at startup using spark.ui.port. The port is fixed at
Thrift server startup, and the UI can only display one SQL query at a time,
which is not very useful.

As one can run multiple clients against STS, it is a
limitation that one cannot change the UI port at runtime.
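
For reference, the UI port is passed at startup like any other Spark conf; the
port number below is arbitrary and the remaining flags are as in the usual
start-thriftserver.sh invocation:

${SPARK_HOME}/sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=10055 \
  --conf spark.ui.port=4041 \
  ...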

Cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 22:04, Jacek Laskowski  wrote:

> On Mon, Jul 25, 2016 at 10:57 PM, Mich Talebzadeh
>  wrote:
>
> > Yarn promises the best resource management I believe. Having said that I
> have not used Mesos myself.
>
> I'm glad you've mentioned it.
>
> I think Cloudera (and Hortonworks?) guys are doing a great job with
> bringing all the features of YARN to Spark and I think Spark on YARN
> shines features-wise.
>
> I'm not in a position to compare YARN vs Mesos for their resource
> management, but Spark on Mesos is certainly lagging behind Spark on
> YARN regarding the features Spark uses off the scheduler backends --
> security, data locality, queues, etc. (or I might be simply biased
> after having spent months with Spark on YARN mostly?).
>
> Jacek
>


Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Jacek Laskowski
On Mon, Jul 25, 2016 at 10:57 PM, Mich Talebzadeh
 wrote:

> Yarn promises the best resource management I believe. Having said that I have 
> not used Mesos myself.

I'm glad you've mentioned it.

I think Cloudera (and Hortonworks?) guys are doing a great job with
bringing all the features of YARN to Spark and I think Spark on YARN
shines features-wise.

I'm not in a position to compare YARN vs Mesos for their resource
management, but Spark on Mesos is certainly lagging behind Spark on
YARN regarding the features Spark uses off the scheduler backends --
security, data locality, queues, etc. (or I might be simply biased
after having spent months with Spark on YARN mostly?).

Jacek

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.0

2016-07-25 Thread Jacek Laskowski
Hi Bryan,

Excellent questions about the upcoming 2.0! Took me a while to find
the answer about structured streaming.

Seen 
http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
? That may be relevant to your question 2.
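
A rough sketch of that style of windowed count with 2.0's Structured Streaming
(the streaming DataFrame events, its timestamp column and the console sink are
assumptions, not taken from the question):

import org.apache.spark.sql.functions.{window, col}

val counts = events
  .groupBy(window(col("timestamp"), "2 minutes"))   // tumbling 2-minute windows
  .count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()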

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Jul 25, 2016 at 8:23 PM, Bryan Jeffrey  wrote:
> All,
>
> I had three questions:
>
> (1) Is there a timeline for stable Spark 2.0 release?  I know the 'preview'
> build is out there, but was curious what the timeline was for full release.
> Jira seems to indicate that there should be a release 7/27.
>
> (2)  For 'continuous' datasets there has been a lot of discussion. One item
> that came up in tickets was the idea that 'count()' and other functions do
> not apply to continuous datasets:
> https://github.com/apache/spark/pull/12080.  In this case what is the
> intended procedure to calculate a streaming statistic based on an interval
> (e.g. count the number of records in a 2 minute window every 2 minutes)?
>
> (3) In previous releases (1.6.1) the call to DStream / RDD repartition w/ a
> number of partitions set to zero silently deletes data.  I have looked in
> Jira for a similar issue, but I do not see one.  I would like to address
> this (and would likely be willing to go fix it myself).  Should I just
> create a ticket?
>
> Thank you,
>
> Bryan Jeffrey
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Mich Talebzadeh
Hi,

Actually I started STS in local mode and that works.

I have not tested yarn modes for STS but certainly it appears that one can
run these in any mode one wishes.

local mode has its limitations (everything runs in one JVM, showing up as a
single entry in jps, and it does not take advantage of scaling out), but one
can run STS in local mode on the same host on different ports, without the
centralised resource management that standalone offers (and certainly there are
some issues with that, as I have seen). In local mode we are just scaling up.

Let us see how it goes. Yarn promises the best resource management I
believe. Having said that I have not used Mesos myself.

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 21:37, Jacek Laskowski  wrote:

> Hi,
>
> That's interesting...What holds STS back from working on the other
> scheduler backends, e.g. YARN or Mesos? I haven't spent much time with
> it, but thought it's a mere Spark application.
>
> The property is spark.deploy.spreadOut = Whether the standalone
> cluster manager should spread applications out across nodes or try to
> consolidate them onto as few nodes as possible. Spreading out is
> usually better for data locality in HDFS, but consolidating is more
> efficient for compute-intensive workloads.
>
> See https://spark.apache.org/docs/latest/spark-standalone.html
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Mon, Jul 25, 2016 at 9:24 PM, Mich Talebzadeh
>  wrote:
> > Thanks. As I understand STS only works in Standalone mode :(
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > Disclaimer: Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> The
> > author will in no case be liable for any monetary damages arising from
> such
> > loss, damage or destruction.
> >
> >
> >
> >
> > On 25 July 2016 at 19:34, Jacek Laskowski  wrote:
> >>
> >> Hi,
> >>
> >> My vague understanding of Spark Standalone is that it will take up all
> >> available workers for a Spark application (despite the cmd options).
> There
> >> was a property to disable it. Can't remember it now though.
> >>
> >> Ps. Yet another reason for YARN ;-)
> >>
> >> Jacek
> >>
> >>
> >> On 25 Jul 2016 6:17 p.m., "Mich Talebzadeh" 
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>>
> >>> I am doing some tests
> >>>
> >>> I have started Spark in Standalone mode.
> >>>
> >>> For simplicity I am using one node only with 8 works and I have 12
> cores
> >>>
> >>> In spark-env.sh I set this
> >>>
> >>> # Options for the daemons used in the standalone deploy mode
> >>> export SPARK_WORKER_CORES=1 ##, total number of cores to be used by
> >>> executors by each worker
> >>> export SPARK_WORKER_MEMORY=1g ##, to set how much total memory workers
> >>> have to give executors (e.g. 1000m, 2g)
> >>> the worker
> >>> export SPARK_WORKER_INSTANCES=8 ##, to set the number of worker
> processes
> >>> per node
> >>>
> >>> So it is pretty straight forward with 8 works and each worker assigned
> >>> one core
> >>>
> >>> jps|grep Worker
> >>> 15297 Worker
> >>> 14794 Worker
> >>> 15374 Worker
> >>> 14998 Worker
> >>> 15198 Worker
> >>> 15465 Worker
> >>> 14897 Worker
> >>> 15099 Worker
> >>>
> >>> I start Spark Thrift Server with the following parameters (using
> >>> standalone mode)
> >>>
> >>> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> >>> --master spark://50.140.197.217:7077 \
> >>> --hiveconf hive.server2.thrift.port=10055 \
> >>> --driver-memory 1G \
> >>> --num-executors 1 \
> >>> --executor-cores 1 \
> >>> --executor-memory 1G \
> >>> --conf "spark.scheduler.mode=FIFO" \
> >>>
> >>> With one executor allocated 1 core
> >>>
> >>> However, I can see both in the OS and UI that it starts with 8
> executors,
> >>> the same number of workers on this node!
> >>>
> >>> jps|egrep 'SparkSubmit|CoarseGrainedExecutorBackend'|sort
> >>> 32711 SparkSubmit
> >>> 369 

Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Jacek Laskowski
Hi,

That's interesting...What holds STS back from working on the other
scheduler backends, e.g. YARN or Mesos? I haven't spent much time with
it, but thought it's a mere Spark application.

The property is spark.deploy.spreadOut = Whether the standalone
cluster manager should spread applications out across nodes or try to
consolidate them onto as few nodes as possible. Spreading out is
usually better for data locality in HDFS, but consolidating is more
efficient for compute-intensive workloads.

See https://spark.apache.org/docs/latest/spark-standalone.html
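
Also worth noting: --num-executors is a YARN option, so the standalone master
ignores it and gives the application every core it can (spread across all
workers, hence one executor per worker). The per-application cap in standalone
mode is spark.cores.max / --total-executor-cores, e.g. (other flags as in the
original start-thriftserver.sh command):

${SPARK_HOME}/sbin/start-thriftserver.sh \
  --master spark://50.140.197.217:7077 \
  --total-executor-cores 1 \
  --executor-memory 1G \
  ...
# equivalently: --conf spark.cores.max=1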

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Jul 25, 2016 at 9:24 PM, Mich Talebzadeh
 wrote:
> Thanks. As I understand STS only works in Standalone mode :(
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
>
>
>
> On 25 July 2016 at 19:34, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> My vague understanding of Spark Standalone is that it will take up all
>> available workers for a Spark application (despite the cmd options). There
>> was a property to disable it. Can't remember it now though.
>>
>> Ps. Yet another reason for YARN ;-)
>>
>> Jacek
>>
>>
>> On 25 Jul 2016 6:17 p.m., "Mich Talebzadeh" 
>> wrote:
>>>
>>> Hi,
>>>
>>>
>>> I am doing some tests
>>>
>>> I have started Spark in Standalone mode.
>>>
>>> For simplicity I am using one node only with 8 works and I have 12 cores
>>>
>>> In spark-env.sh I set this
>>>
>>> # Options for the daemons used in the standalone deploy mode
>>> export SPARK_WORKER_CORES=1 ##, total number of cores to be used by
>>> executors by each worker
>>> export SPARK_WORKER_MEMORY=1g ##, to set how much total memory workers
>>> have to give executors (e.g. 1000m, 2g)
>>> the worker
>>> export SPARK_WORKER_INSTANCES=8 ##, to set the number of worker processes
>>> per node
>>>
>>> So it is pretty straight forward with 8 works and each worker assigned
>>> one core
>>>
>>> jps|grep Worker
>>> 15297 Worker
>>> 14794 Worker
>>> 15374 Worker
>>> 14998 Worker
>>> 15198 Worker
>>> 15465 Worker
>>> 14897 Worker
>>> 15099 Worker
>>>
>>> I start Spark Thrift Server with the following parameters (using
>>> standalone mode)
>>>
>>> ${SPARK_HOME}/sbin/start-thriftserver.sh \
>>> --master spark://50.140.197.217:7077 \
>>> --hiveconf hive.server2.thrift.port=10055 \
>>> --driver-memory 1G \
>>> --num-executors 1 \
>>> --executor-cores 1 \
>>> --executor-memory 1G \
>>> --conf "spark.scheduler.mode=FIFO" \
>>>
>>> With one executor allocated 1 core
>>>
>>> However, I can see both in the OS and UI that it starts with 8 executors,
>>> the same number of workers on this node!
>>>
>>> jps|egrep 'SparkSubmit|CoarseGrainedExecutorBackend'|sort
>>> 32711 SparkSubmit
>>> 369 CoarseGrainedExecutorBackend
>>> 370 CoarseGrainedExecutorBackend
>>> 371 CoarseGrainedExecutorBackend
>>> 376 CoarseGrainedExecutorBackend
>>> 387 CoarseGrainedExecutorBackend
>>> 395 CoarseGrainedExecutorBackend
>>> 419 CoarseGrainedExecutorBackend
>>> 420 CoarseGrainedExecutorBackend
>>>
>>>
>>> I fail to see why this is happening. Nothing else is running Spark wise.
>>> The cause?
>>>
>>>  How can I stop STS going and using all available workers?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any
>>> loss, damage or destruction of data or any other property which may arise
>>> from relying on this email's technical content is explicitly disclaimed. The
>>> author will in no case be liable for any monetary damages arising from such
>>> loss, damage or destruction.
>>>
>>>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 1.6.2 version displayed as 1.6.1

2016-07-25 Thread Krishna Sankar
This intrigued me as well.

   - Just to be sure, I downloaded the 1.6.2 code and recompiled.
   - spark-shell and pyspark both show 1.6.2 as expected.

Cheers

On Mon, Jul 25, 2016 at 1:45 AM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> Another possible explanation is that by accident you are still running
> Spark 1.6.1. Which download are you using? This is what I see:
>
> $ ~/spark-1.6.2-bin-hadoop2.6/bin/spark-shell
> log4j:WARN No appenders could be found for logger
> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> Using Spark's repl log4j profile:
> org/apache/spark/log4j-defaults-repl.properties
> To adjust logging level use sc.setLogLevel("INFO")
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.6.2
>   /_/
>
>
> On Mon, Jul 25, 2016 at 7:45 AM, Sean Owen  wrote:
>
>> Are you certain? looks like it was correct in the release:
>>
>>
>> https://github.com/apache/spark/blob/v1.6.2/core/src/main/scala/org/apache/spark/package.scala
>>
>>
>>
>> On Mon, Jul 25, 2016 at 12:33 AM, Ascot Moss 
>> wrote:
>> > Hi,
>> >
>> > I am trying to upgrade spark from 1.6.1 to 1.6.2, from 1.6.2
>> spark-shell, I
>> > found the version is still displayed 1.6.1
>> >
>> > Is this a minor typo/bug?
>> >
>> > Regards
>> >
>> >
>> >
>> > ###
>> >
>> > Welcome to
>> >
>> >     __
>> >
>> >  / __/__  ___ _/ /__
>> >
>> > _\ \/ _ \/ _ `/ __/  '_/
>> >
>> >/___/ .__/\_,_/_/ /_/\_\   version 1.6.1
>> >
>> >   /_/
>> >
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Spark 2.0

2016-07-25 Thread Pedro Rodriguez
Spark 2.0 vote for RC5 passed last Friday night so it will probably be
released early this week if I had to guess.

On Mon, Jul 25, 2016 at 12:23 PM, Bryan Jeffrey 
wrote:

> All,
>
> I had three questions:
>
> (1) Is there a timeline for stable Spark 2.0 release?  I know the
> 'preview' build is out there, but was curious what the timeline was for
> full release. Jira seems to indicate that there should be a release 7/27.
>
> (2)  For 'continuous' datasets there has been a lot of discussion. One
> item that came up in tickets was the idea that 'count()' and other
> functions do not apply to continuous datasets:
> https://github.com/apache/spark/pull/12080.  In this case what is the
> intended procedure to calculate a streaming statistic based on an interval
> (e.g. count the number of records in a 2 minute window every 2 minutes)?
>
> (3) In previous releases (1.6.1) the call to DStream / RDD repartition w/
> a number of partitions set to zero silently deletes data.  I have looked in
> Jira for a similar issue, but I do not see one.  I would like to address
> this (and would likely be willing to go fix it myself).  Should I just
> create a ticket?
>
> Thank you,
>
> Bryan Jeffrey
>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Mich Talebzadeh
Thanks. As I understand STS only works in Standalone mode :(

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 19:34, Jacek Laskowski  wrote:

> Hi,
>
> My vague understanding of Spark Standalone is that it will take up all
> available workers for a Spark application (despite the cmd options). There
> was a property to disable it. Can't remember it now though.
>
> Ps. Yet another reason for YARN ;-)
>
> Jacek
>
> On 25 Jul 2016 6:17 p.m., "Mich Talebzadeh" 
> wrote:
>
>> Hi,
>>
>>
>> I am doing some tests
>>
>> I have started Spark in Standalone mode.
>>
>> For simplicity I am using one node only with 8 works and I have 12 cores
>>
>> In spark-env.sh I set this
>>
>> # Options for the daemons used in the standalone deploy mode
>> export SPARK_WORKER_CORES=1 ##, total number of cores to be used by
>> executors by each worker
>> export SPARK_WORKER_MEMORY=1g ##, to set how much total memory workers
>> have to give executors (e.g. 1000m, 2g)
>> the worker
>> export SPARK_WORKER_INSTANCES=8 ##, to set the number of worker processes
>> per node
>>
>> So it is pretty straight forward with 8 works and each worker assigned
>> one core
>>
>> jps|grep Worker
>> 15297 Worker
>> 14794 Worker
>> 15374 Worker
>> 14998 Worker
>> 15198 Worker
>> 15465 Worker
>> 14897 Worker
>> 15099 Worker
>>
>> I start Spark Thrift Server with the following parameters (using
>> standalone mode)
>>
>> ${SPARK_HOME}/sbin/start-thriftserver.sh \
>> --master spark://50.140.197.217:7077 \
>> --hiveconf hive.server2.thrift.port=10055 \
>> --driver-memory 1G \
>> --num-executors 1 \
>> --executor-cores 1 \
>> --executor-memory 1G \
>> --conf "spark.scheduler.mode=FIFO" \
>>
>> With one executor allocated 1 core
>>
>> However, I can see both in the OS and UI that it starts with 8 executors,
>> the same number of workers on this node!
>>
>> jps|egrep 'SparkSubmit|CoarseGrainedExecutorBackend'|sort
>> 32711 SparkSubmit
>> 369 CoarseGrainedExecutorBackend
>> 370 CoarseGrainedExecutorBackend
>> 371 CoarseGrainedExecutorBackend
>> 376 CoarseGrainedExecutorBackend
>> 387 CoarseGrainedExecutorBackend
>> 395 CoarseGrainedExecutorBackend
>> 419 CoarseGrainedExecutorBackend
>> 420 CoarseGrainedExecutorBackend
>>
>>
>> I fail to see why this is happening. Nothing else is running Spark wise.
>> The cause?
>>
>>  How can I stop STS going and using all available workers?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: How to connect HBase and Spark using Python?

2016-07-25 Thread Def_Os
Solved, see:
http://stackoverflow.com/questions/38470114/how-to-connect-hbase-and-spark-using-python/38575095



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-connect-HBase-and-Spark-using-Python-tp27372p27409.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Jacek Laskowski
Hi,

My vague understanding of Spark Standalone is that it will take up all
available workers for a Spark application (despite the cmd options). There
was a property to disable it. Can't remember it now though.

Ps. Yet another reason for YARN ;-)

Jacek

On 25 Jul 2016 6:17 p.m., "Mich Talebzadeh" 
wrote:

> Hi,
>
>
> I am doing some tests
>
> I have started Spark in Standalone mode.
>
> For simplicity I am using one node only with 8 works and I have 12 cores
>
> In spark-env.sh I set this
>
> # Options for the daemons used in the standalone deploy mode
> export SPARK_WORKER_CORES=1 ##, total number of cores to be used by
> executors by each worker
> export SPARK_WORKER_MEMORY=1g ##, to set how much total memory workers
> have to give executors (e.g. 1000m, 2g)
> the worker
> export SPARK_WORKER_INSTANCES=8 ##, to set the number of worker processes
> per node
>
> So it is pretty straight forward with 8 works and each worker assigned one
> core
>
> jps|grep Worker
> 15297 Worker
> 14794 Worker
> 15374 Worker
> 14998 Worker
> 15198 Worker
> 15465 Worker
> 14897 Worker
> 15099 Worker
>
> I start Spark Thrift Server with the following parameters (using
> standalone mode)
>
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master spark://50.140.197.217:7077 \
> --hiveconf hive.server2.thrift.port=10055 \
> --driver-memory 1G \
> --num-executors 1 \
> --executor-cores 1 \
> --executor-memory 1G \
> --conf "spark.scheduler.mode=FIFO" \
>
> With one executor allocated 1 core
>
> However, I can see both in the OS and UI that it starts with 8 executors,
> the same number of workers on this node!
>
> jps|egrep 'SparkSubmit|CoarseGrainedExecutorBackend'|sort
> 32711 SparkSubmit
> 369 CoarseGrainedExecutorBackend
> 370 CoarseGrainedExecutorBackend
> 371 CoarseGrainedExecutorBackend
> 376 CoarseGrainedExecutorBackend
> 387 CoarseGrainedExecutorBackend
> 395 CoarseGrainedExecutorBackend
> 419 CoarseGrainedExecutorBackend
> 420 CoarseGrainedExecutorBackend
>
>
> I fail to see why this is happening. Nothing else is running Spark wise.
> The cause?
>
>  How can I stop STS going and using all available workers?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Performance tuning for local mode on one host

2016-07-25 Thread Mich Talebzadeh
Hi On,

When you run in local mode there is only one SparkSubmit process, with a single
executor. How many cores do you have?

Each core allows the same code to run concurrently, so with local[8] you
will have 8 tasks running the same code on subsets of your data.

So do

cat /proc/cpuinfo|grep processor|wc -l


and determine how many Logical processors AKA cores you see
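
The same can be checked from inside a spark-shell session, for example:

Runtime.getRuntime.availableProcessors   // logical cores visible to the JVM
sc.master                                // e.g. "local[2]"
sc.defaultParallelism                    // task slots that local[N] gives you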


HTH




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 19:19, on  wrote:

>
> OK, sorry, I am running in local mode.
> Just a very small setup...
>
> (changed the subject)
>
> On 25.07.2016 20:01, Mich Talebzadeh wrote:
> > Hi,
> >
> > From your reference I can see that you are running in local mode with
> > two cores. But that is not standalone.
> >
> > Can you please clarify whether you start master and slaves processes.
> > Those are for standalone mode.
> >
> > sbin/start-master.sh
> > sbin/start-slaves.sh
> >
> > HTH
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> > /
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > *Disclaimer:* Use it at your own risk.Any and all responsibility for
> > any loss, damage or destruction of data or any other property which
> > may arise from relying on this email's technical content is explicitly
> > disclaimed. The author will in no case be liable for any monetary
> > damages arising from such loss, damage or destruction.
> >
> >
> >
> >
> > On 25 July 2016 at 18:21, on  > > wrote:
> >
> > Dear all,
> >
> > I am running spark on one host ("local[2]") doing calculations
> > like this
> > on a socket stream.
> > mainStream = socketStream.filter(lambda msg:
> > msg['header'].startswith('test')).map(lambda x: (x['host'], x) )
> > s1 = mainStream.updateStateByKey(updateFirst).map(lambda x: (1, x) )
> > s2 = mainStream.updateStateByKey(updateSecond,
> > initialRDD=initialMachineStates).map(lambda x: (2, x) )
> > out.join(bla2).foreachRDD(no_out)
> >
> > I evaluated each calculations allone has a processing time about
> 400ms
> > but processing time of the code above is over 3 sec on average.
> >
> > I know there are a lot of parameters unknown but does anybody has
> > hints
> > how to tune this code / system? I already changed a lot of
> parameters,
> > such as #executors, #cores and so on.
> >
> > Thanks in advance and best regards,
> > on
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > 
> >
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark 2.0

2016-07-25 Thread Bryan Jeffrey
All,

I had three questions:

(1) Is there a timeline for stable Spark 2.0 release?  I know the 'preview'
build is out there, but was curious what the timeline was for full
release. Jira seems to indicate that there should be a release 7/27.

(2)  For 'continuous' datasets there has been a lot of discussion. One item
that came up in tickets was the idea that 'count()' and other functions do
not apply to continuous datasets: https://github.com/apache/spark/pull/12080.
In this case what is the intended procedure to calculate a streaming
statistic based on an interval (e.g. count the number of records in a 2
minute window every 2 minutes)?

(3) In previous releases (1.6.1) the call to DStream / RDD repartition w/ a
number of partitions set to zero silently deletes data.  I have looked in
Jira for a similar issue, but I do not see one.  I would like to address
this (and would likely be willing to go fix it myself).  Should I just
create a ticket?

Thank you,

Bryan Jeffrey


Re: Performance tuning for local mode on one host

2016-07-25 Thread on

OK, sorry, I am running in local mode.
Just a very small setup...

(changed the subject)

On 25.07.2016 20:01, Mich Talebzadeh wrote:
> Hi,
>
> From your reference I can see that you are running in local mode with
> two cores. But that is not standalone.
>
> Can you please clarify whether you start master and slaves processes.
> Those are for standalone mode.
>
> sbin/start-master.sh
> sbin/start-slaves.sh
>
> HTH
>
> Dr Mich Talebzadeh
>
>  
>
> LinkedIn
> / 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw/
>
>  
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk.Any and all responsibility for
> any loss, damage or destruction of data or any other property which
> may arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary
> damages arising from such loss, damage or destruction.
>
>  
>
>
> On 25 July 2016 at 18:21, on  > wrote:
>
> Dear all,
>
> I am running spark on one host ("local[2]") doing calculations
> like this
> on a socket stream.
> mainStream = socketStream.filter(lambda msg:
> msg['header'].startswith('test')).map(lambda x: (x['host'], x) )
> s1 = mainStream.updateStateByKey(updateFirst).map(lambda x: (1, x) )
> s2 = mainStream.updateStateByKey(updateSecond,
> initialRDD=initialMachineStates).map(lambda x: (2, x) )
> out.join(bla2).foreachRDD(no_out)
>
> I evaluated each calculations allone has a processing time about 400ms
> but processing time of the code above is over 3 sec on average.
>
> I know there are a lot of parameters unknown but does anybody has
> hints
> how to tune this code / system? I already changed a lot of parameters,
> such as #executors, #cores and so on.
>
> Thanks in advance and best regards,
> on
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
>
>


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Cody Koeninger
For 2.0, the kafka dstream support is in two separate subprojects
depending on which version of Kafka you are using

spark-streaming-kafka-0-10
or
spark-streaming-kafka-0-8

corresponding to brokers that are version 0.10+ or 0.8+
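
In sbt terms (Scala 2.11 build assumed; use whichever 2.0.x version you built
or that is published) that corresponds to one of:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"
// or, for 0.8.x brokers:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"

The same coordinates can be passed to spark-submit via --packages, as suggested
further down in the thread.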

On Mon, Jul 25, 2016 at 12:29 PM, Reynold Xin  wrote:
> The presentation at Spark Summit SF was probably referring to Structured
> Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the same
> production stability level as Spark 1.6. There is also Kafka 0.10 support in
> dstream.
>
> On July 25, 2016 at 10:26:49 AM, Andy Davidson
> (a...@santacruzintegration.com) wrote:
>
> Hi Kevin
>
> Just a heads up: at the recent Spark Summit in S.F. there was a presentation
> about streaming in 2.0. They said that streaming was not going to be production
> ready in 2.0.
>
> I am not sure if the older 1.6.x version will be supported. My project will
> not be able to upgrade with streaming support. We also use kafka
>
> Andy
>
> From: Marco Mistroni 
> Date: Monday, July 25, 2016 at 2:33 AM
> To: kevin 
> Cc: "user @spark" , "dev.spark"
> 
> Subject: Re: where I can find spark-streaming-kafka for spark2.0
>
> Hi Kevin
>   you should not need to rebuild everything.
> Instead, i believe you should launch spark-submit by specifying the kafka
> jar file in your --packages... i had to do the same when integrating spark
> streaming with flume
>
>   have you checked this link ?
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
>
> hth
>
>
>
> On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
>>
>> I have compile it from source code
>>
>> 2016-07-25 12:05 GMT+08:00 kevin :
>>>
>>> hi,all :
>>> I try to run example org.apache.spark.examples.streaming.KafkaWordCount ,
>>> I got error :
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>> at
>>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>>> at
>>> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka.KafkaUtils$
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> ... 11 more
>>>
>>> so where I can find spark-streaming-kafka for spark2.0
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: JavaRDD.foreach (new VoidFunction<>...) always returns the last element

2016-07-25 Thread Jia Zou
Hi Sean,

Thanks for your great help! It works all right if I remove persist!!

For next step, I will transform those values before persist.
I convert to RDD and back to JavaRDD just for testing purposes.

Best Regards,
Jia

On Mon, Jul 25, 2016 at 1:01 PM, Sean Owen  wrote:

> Why are you converting to RDD and back to JavaRDD?
> The problem is storing references to Writable, which are mutated by the
> InputFormat. Somewhere you have 1000 refs to the same key. I think it may
> be the persist. You want to immediately transform these values to something
> besides a Writable.
>
> On Mon, Jul 25, 2016, 18:50 Jia Zou  wrote:
>
>>
>> My code is as following:
>>
>> System.out.println("Initialize points...");
>>
>> JavaPairRDD data =
>>
>> sc.sequenceFile(inputFile, IntWritable.
>> class, DoubleArrayWritable.class);
>>
>> RDD> rdd =
>>
>> JavaPairRDD.toRDD(data);
>>
>> JavaRDD> points
>> = JavaRDD.fromRDD(rdd, data.classTag());
>>
>> points.persist(StorageLevel.MEMORY_ONLY());
>>
>> int i;
>>
>>
>>   for (i=0; i>
>> System.out.println("iteration="+i);
>>
>> //points.foreach(new
>> ForEachMapPointToCluster(numDimensions, numClusters));
>>
>> points.foreach(new
>> VoidFunction>() {
>>
>> public void call(Tuple2> DoubleArrayWritable> tuple) {
>>
>> IntWritable key = tuple._1();
>>
>> System.out.println("key:"+key.get());
>>
>> DoubleArrayWritable array = tuple._2();
>>
>> double[] point = array.getData();
>>
>> for (int d = 0; d < 20; d ++) {
>>
>> System.out.println(d+":"+point[d]);
>>
>> }
>>
>> }
>>
>> });
>>
>> }
>>
>>
>> The output is a lot of following, only the last element in the rdd has
>> been output.
>>
>> key:999
>>
>> 0:0.9953839426689233
>>
>> 1:0.12656798341145892
>>
>> 2:0.16621114723289654
>>
>> 3:0.48628049787614236
>>
>> 4:0.476991470215116
>>
>> 5:0.5033640235789054
>>
>> 6:0.09257098597507829
>>
>> 7:0.3153088440494892
>>
>> 8:0.8807426085223242
>>
>> 9:0.2809625780570739
>>
>> 10:0.9584880094505738
>>
>> 11:0.38521222520661547
>>
>> 12:0.5114241334425228
>>
>> 13:0.9524628903835111
>>
>> 14:0.5252549496842003
>>
>> 15:0.5732037830866236
>>
>> 16:0.8632451606583632
>>
>> 17:0.39754347061499895
>>
>> 18:0.2859522809981715
>>
>> 19:0.2659002343432888
>>
>> key:999
>>
>> 0:0.9953839426689233
>>
>> 1:0.12656798341145892
>>
>> 2:0.16621114723289654
>>
>> 3:0.48628049787614236
>>
>> 4:0.476991470215116
>>
>> 5:0.5033640235789054
>>
>> 6:0.09257098597507829
>>
>> 7:0.3153088440494892
>>
>> 8:0.8807426085223242
>>
>> 9:0.2809625780570739
>>
>> 10:0.9584880094505738
>>
>> 11:0.38521222520661547
>>
>> 12:0.5114241334425228
>>
>> 13:0.9524628903835111
>>
>> 14:0.5252549496842003
>>
>> 15:0.5732037830866236
>>
>> 16:0.8632451606583632
>>
>> 17:0.39754347061499895
>>
>> 18:0.2859522809981715
>>
>> 19:0.2659002343432888
>>
>> key:999
>>
>> 0:0.9953839426689233
>>
>> 1:0.12656798341145892
>>
>> 2:0.16621114723289654
>>
>> 3:0.48628049787614236
>>
>> 4:0.476991470215116
>>
>> 5:0.5033640235789054
>>
>> 6:0.09257098597507829
>>
>> 7:0.3153088440494892
>>
>> 8:0.8807426085223242
>>
>> 9:0.2809625780570739
>>
>> 10:0.9584880094505738
>>
>> 11:0.38521222520661547
>>
>> 12:0.5114241334425228
>>
>> 13:0.9524628903835111
>>
>> 14:0.5252549496842003
>>
>> 15:0.5732037830866236
>>
>> 16:0.8632451606583632
>>
>> 17:0.39754347061499895
>>
>> 18:0.2859522809981715
>>
>> 19:0.2659002343432888
>>
>


Re: JavaRDD.foreach (new VoidFunction<>...) always returns the last element

2016-07-25 Thread Sean Owen
Why are you converting to RDD and back to JavaRDD?
The problem is storing references to Writable, which are mutated by the
InputFormat. Somewhere you have 1000 refs to the same key. I think it may
be the persist. You want to immediately transform these values to something
besides a Writable.
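
As a sketch of the transformation Sean describes (Scala; sc, inputFile and the poster's DoubleArrayWritable class are assumed from the quoted code below, and the printed slice size is illustrative), the idea is to copy each record out of the reused Writable objects before caching:

    import org.apache.hadoop.io.IntWritable
    import org.apache.spark.storage.StorageLevel

    // Map the mutable, reused Writables to plain immutable values first...
    val points = sc.sequenceFile(inputFile, classOf[IntWritable], classOf[DoubleArrayWritable])
      .map { case (k, v) => (k.get, v.getData.clone) }   // Int key and a copied Array[Double]

    // ...and only then cache; every cached record is now an independent object.
    points.persist(StorageLevel.MEMORY_ONLY)
    points.foreach { case (key, point) =>
      println(s"key:$key first=${point.take(3).mkString(",")}")
    }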

On Mon, Jul 25, 2016, 18:50 Jia Zou  wrote:

>
> My code is as following:
>
> System.out.println("Initialize points...");
>
> JavaPairRDD<IntWritable, DoubleArrayWritable> data =
>     sc.sequenceFile(inputFile, IntWritable.class, DoubleArrayWritable.class);
>
> RDD<Tuple2<IntWritable, DoubleArrayWritable>> rdd =
>     JavaPairRDD.toRDD(data);
>
> JavaRDD<Tuple2<IntWritable, DoubleArrayWritable>> points =
>     JavaRDD.fromRDD(rdd, data.classTag());
>
> points.persist(StorageLevel.MEMORY_ONLY());
>
> int i;
>
> for (i = 0; i < numIterations; i++) {   // numIterations: placeholder for the loop bound lost in the original
>     System.out.println("iteration=" + i);
>
>     //points.foreach(new ForEachMapPointToCluster(numDimensions, numClusters));
>
>     points.foreach(new VoidFunction<Tuple2<IntWritable, DoubleArrayWritable>>() {
>         public void call(Tuple2<IntWritable, DoubleArrayWritable> tuple) {
>             IntWritable key = tuple._1();
>             System.out.println("key:" + key.get());
>             DoubleArrayWritable array = tuple._2();
>             double[] point = array.getData();
>             for (int d = 0; d < 20; d++) {
>                 System.out.println(d + ":" + point[d]);
>             }
>         }
>     });
> }
>
>
> The output is a lot of following, only the last element in the rdd has
> been output.
>
> key:999
>
> 0:0.9953839426689233
>
> 1:0.12656798341145892
>
> 2:0.16621114723289654
>
> 3:0.48628049787614236
>
> 4:0.476991470215116
>
> 5:0.5033640235789054
>
> 6:0.09257098597507829
>
> 7:0.3153088440494892
>
> 8:0.8807426085223242
>
> 9:0.2809625780570739
>
> 10:0.9584880094505738
>
> 11:0.38521222520661547
>
> 12:0.5114241334425228
>
> 13:0.9524628903835111
>
> 14:0.5252549496842003
>
> 15:0.5732037830866236
>
> 16:0.8632451606583632
>
> 17:0.39754347061499895
>
> 18:0.2859522809981715
>
> 19:0.2659002343432888
>
> key:999
>
> 0:0.9953839426689233
>
> 1:0.12656798341145892
>
> 2:0.16621114723289654
>
> 3:0.48628049787614236
>
> 4:0.476991470215116
>
> 5:0.5033640235789054
>
> 6:0.09257098597507829
>
> 7:0.3153088440494892
>
> 8:0.8807426085223242
>
> 9:0.2809625780570739
>
> 10:0.9584880094505738
>
> 11:0.38521222520661547
>
> 12:0.5114241334425228
>
> 13:0.9524628903835111
>
> 14:0.5252549496842003
>
> 15:0.5732037830866236
>
> 16:0.8632451606583632
>
> 17:0.39754347061499895
>
> 18:0.2859522809981715
>
> 19:0.2659002343432888
>
> key:999
>
> 0:0.9953839426689233
>
> 1:0.12656798341145892
>
> 2:0.16621114723289654
>
> 3:0.48628049787614236
>
> 4:0.476991470215116
>
> 5:0.5033640235789054
>
> 6:0.09257098597507829
>
> 7:0.3153088440494892
>
> 8:0.8807426085223242
>
> 9:0.2809625780570739
>
> 10:0.9584880094505738
>
> 11:0.38521222520661547
>
> 12:0.5114241334425228
>
> 13:0.9524628903835111
>
> 14:0.5252549496842003
>
> 15:0.5732037830866236
>
> 16:0.8632451606583632
>
> 17:0.39754347061499895
>
> 18:0.2859522809981715
>
> 19:0.2659002343432888
>


Re: Performance tuning for standalone on one host

2016-07-25 Thread Mich Talebzadeh
Hi,

From your reference I can see that you are running in local mode with two
cores. But that is not standalone.

Can you please clarify whether you start the master and slave processes? Those
are for standalone mode.

sbin/start-master.sh
sbin/start-slaves.sh

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 18:21, on  wrote:

> Dear all,
>
> I am running spark on one host ("local[2]") doing calculations like this
> on a socket stream.
> mainStream = socketStream.filter(lambda msg:
> msg['header'].startswith('test')).map(lambda x: (x['host'], x) )
> s1 = mainStream.updateStateByKey(updateFirst).map(lambda x: (1, x) )
> s2 = mainStream.updateStateByKey(updateSecond,
> initialRDD=initialMachineStates).map(lambda x: (2, x) )
> out.join(bla2).foreachRDD(no_out)
>
> I evaluated each calculations allone has a processing time about 400ms
> but processing time of the code above is over 3 sec on average.
>
> I know there are a lot of parameters unknown but does anybody has hints
> how to tune this code / system? I already changed a lot of parameters,
> such as #executors, #cores and so on.
>
> Thanks in advance and best regards,
> on
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


JavaRDD.foreach (new VoidFunction<>...) always returns the last element

2016-07-25 Thread Jia Zou
My code is as following:

System.out.println("Initialize points...");

JavaPairRDD<IntWritable, DoubleArrayWritable> data =
    sc.sequenceFile(inputFile, IntWritable.class, DoubleArrayWritable.class);

RDD<Tuple2<IntWritable, DoubleArrayWritable>> rdd =
    JavaPairRDD.toRDD(data);

JavaRDD<Tuple2<IntWritable, DoubleArrayWritable>> points =
    JavaRDD.fromRDD(rdd, data.classTag());

points.persist(StorageLevel.MEMORY_ONLY());

int i;


  for (i=0; i

Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Andy Davidson
Hi Kevin

Just a heads up: at the recent Spark Summit in S.F. there was a presentation
about streaming in 2.0. They said that streaming was not going to be production
ready in 2.0.

I am not sure if the older 1.6.x version will be supported. My project will
not be able to upgrade with streaming support. We also use kafka

Andy

From:  Marco Mistroni 
Date:  Monday, July 25, 2016 at 2:33 AM
To:  kevin 
Cc:  "user @spark" , "dev.spark"

Subject:  Re: where I can find spark-streaming-kafka for spark2.0

> Hi Kevin
>   you should not need to rebuild everything.
> Instead, i believe you should launch spark-submit by specifying the kafka jar
> file in your --packages... i had to follow same when integrating spark
> streaming with flume
> 
>   have you checked this link ?
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> 
> 
> hth
> 
>   
> 
> On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
>> I have compile it from source code
>> 
>> 2016-07-25 12:05 GMT+08:00 kevin :
>>> hi,all :
>>> I try to run example org.apache.spark.examples.streaming.KafkaWordCount , I
>>> got error :
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>> at 
>>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scal
>>> a:57)
>>> at 
>>> 
org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala>>>
)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at 
>>> 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62>>>
)
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
>>> .java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at 
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$ru
>>> nMain(SparkSubmit.scala:724)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka.KafkaUtils$
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> ... 11 more
>>> 
>>> so where I can find spark-streaming-kafka for spark2.0
>> 
> 




Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Reynold Xin
The presentation at Spark Summit SF was probably referring to Structured
Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the same
production stability level as Spark 1.6. There is also Kafka 0.10 support
in dstream.

On July 25, 2016 at 10:26:49 AM, Andy Davidson (
a...@santacruzintegration.com) wrote:

Hi Kevin

Just a heads up: at the recent Spark Summit in S.F. there was a presentation
about streaming in 2.0. They said that streaming was not going to be
production ready in 2.0.

I am not sure if the older 1.6.x version will be supported. My project will
not be able to upgrade with streaming support. We also use kafka

Andy

From: Marco Mistroni 
Date: Monday, July 25, 2016 at 2:33 AM
To: kevin 
Cc: "user @spark" , "dev.spark" 
Subject: Re: where I can find spark-streaming-kafka for spark2.0

Hi Kevin
  you should not need to rebuild everything.
Instead, i believe you should launch spark-submit by specifying the kafka
jar file in your --packages... i had to follow same when integrating spark
streaming with flume

  have you checked this link ?
https://spark.apache.org/docs/latest/streaming-kafka-integration.html


hth



On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:

> I have compile it from source code
>
> 2016-07-25 12:05 GMT+08:00 kevin :
>
>> hi,all :
>> I try to run example org.apache.spark.examples.streaming.KafkaWordCount ,
>> I got error :
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka.KafkaUtils$
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 11 more
>>
>> so where I can find spark-streaming-kafka for spark2.0
>>
>
>


Performance tuning for standalone on one host

2016-07-25 Thread on
Dear all,

I am running spark on one host ("local[2]") doing calculations like this
on a socket stream.
mainStream = socketStream.filter(lambda msg:
msg['header'].startswith('test')).map(lambda x: (x['host'], x) )
s1 = mainStream.updateStateByKey(updateFirst).map(lambda x: (1, x) )
s2 = mainStream.updateStateByKey(updateSecond,
initialRDD=initialMachineStates).map(lambda x: (2, x) )
out.join(bla2).foreachRDD(no_out)

I evaluated that each calculation alone has a processing time of about 400 ms,
but the processing time of the code above is over 3 sec on average.

I know there are a lot of unknown parameters, but does anybody have hints
on how to tune this code / system? I already changed a lot of parameters,
such as #executors, #cores and so on.

Thanks in advance and best regards,
on

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using DirectOutputCommitter with ORC

2016-07-25 Thread Daniel Haviv
Hi,
How can the DirectOutputCommitter be utilized for writing ORC files?
I tried setting it via:

sc.getConf.set("spark.hadoop.mapred.output.committer.class","com.veracity-group.datastage.directorcwriter")

But I can still see a _temporary  directory being used when I save my
dataframe as ORC.

Thank you,
Daniel


Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread Mich Talebzadeh
Hi,


I am doing some tests

I have started Spark in Standalone mode.

For simplicity I am using one node only, with 8 workers, and I have 12 cores

In spark-env.sh I set this

# Options for the daemons used in the standalone deploy mode
export SPARK_WORKER_CORES=1      ## total number of cores to be used by executors on each worker
export SPARK_WORKER_MEMORY=1g    ## total memory each worker has available to give executors (e.g. 1000m, 2g)
export SPARK_WORKER_INSTANCES=8  ## number of worker processes per node

So it is pretty straightforward: 8 workers, each worker assigned one
core

jps|grep Worker
15297 Worker
14794 Worker
15374 Worker
14998 Worker
15198 Worker
15465 Worker
14897 Worker
15099 Worker

I start Spark Thrift Server with the following parameters (using standalone
mode)

${SPARK_HOME}/sbin/start-thriftserver.sh \
--master spark://50.140.197.217:7077 \
--hiveconf hive.server2.thrift.port=10055 \
--driver-memory 1G \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 1G \
--conf "spark.scheduler.mode=FIFO" \

With one executor allocated 1 core

However, I can see both in the OS and in the UI that it starts with 8 executors,
the same as the number of workers on this node!

jps|egrep 'SparkSubmit|CoarseGrainedExecutorBackend'|sort
32711 SparkSubmit
369 CoarseGrainedExecutorBackend
370 CoarseGrainedExecutorBackend
371 CoarseGrainedExecutorBackend
376 CoarseGrainedExecutorBackend
387 CoarseGrainedExecutorBackend
395 CoarseGrainedExecutorBackend
419 CoarseGrainedExecutorBackend
420 CoarseGrainedExecutorBackend


I fail to see why this is happening; nothing else Spark-related is running.
What is the cause?

How can I stop STS from grabbing all available workers?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: jdbcRRD and dataframe

2016-07-25 Thread Marco Colombo
Thanks, I would submit an improvement

Il lunedì 25 luglio 2016, Mich Talebzadeh  ha
scritto:

> I don't think there is.
>
> it would be a viable request using collection pool through DF to connect
> to an RDBMS
>
> cheers
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 July 2016 at 16:07, Marco Colombo  > wrote:
>
>> From getConnection I'm handling a connection pool.
>> I see no option for that in docs
>>
>> Regards
>>
>>
>> Il lunedì 25 luglio 2016, Mich Talebzadeh > > ha scritto:
>>
>>> Hi Marco,
>>>
>>> what is in your UDF getConnection and why not use DF itself?
>>>
>>> I guess it is all connection attributes
>>>
>>> val c = HiveContext.load("jdbc",
>>> Map("url" -> _ORACLEserver,
>>> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC
>>> FROM sh.channels)",
>>> "user" -> _username,
>>> "password" -> _password))
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 25 July 2016 at 15:14, Marco Colombo 
>>> wrote:
>>>

 Hi all,

 I was using JdbcRRD and signature for constructure was accepting a 
 function to get a DB connection. This is very useful to provide my own 
 connection handler.

 I'm valuating to move to daraframe, but I cannot how to provide such 
 function and migrate my code. I want to use my own 'getConnection' rather 
 than provide connection details.

 JdbcRDD(SparkContext sc,
scala.Function0 getConnection,
.,
 to
  val df: DataFrame = 
 hiveSqlContext.read.format("jdbc").options(options).load();

 How this can be achieved?

 Thanks!


>>>
>>
>> --
>> Ing. Marco Colombo
>>
>
>

-- 
Ing. Marco Colombo


get hdfs file path in spark

2016-07-25 Thread Yang Cao
Hi,
To be new here, I hope to get assistant from you guys. I wonder whether I have 
some elegant way to get some directory under some path. For example, I have a 
path like on hfs /a/b/c/d/e/f, and I am given a/b/c, is there any straight 
forward way to get the path /a/b/c/d/e . I think I can do it with the help of 
regex. But I still hope to find whether there is easier way that make my code 
cleaner. My evn: spark 1.6, language: Scala


Thx
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: jdbcRRD and dataframe

2016-07-25 Thread Mich Talebzadeh
I don't think there is.

it would be a viable feature request to support a connection pool through DF to connect to
an RDBMS

cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 16:07, Marco Colombo  wrote:

> From getConnection I'm handling a connection pool.
> I see no option for that in docs
>
> Regards
>
>
> Il lunedì 25 luglio 2016, Mich Talebzadeh  ha
> scritto:
>
>> Hi Marco,
>>
>> what is in your UDF getConnection and why not use DF itself?
>>
>> I guess it is all connection attributes
>>
>> val c = HiveContext.load("jdbc",
>> Map("url" -> _ORACLEserver,
>> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC
>> FROM sh.channels)",
>> "user" -> _username,
>> "password" -> _password))
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 25 July 2016 at 15:14, Marco Colombo 
>> wrote:
>>
>>>
>>> Hi all,
>>>
>>> I was using JdbcRRD and signature for constructure was accepting a function 
>>> to get a DB connection. This is very useful to provide my own connection 
>>> handler.
>>>
>>> I'm valuating to move to daraframe, but I cannot how to provide such 
>>> function and migrate my code. I want to use my own 'getConnection' rather 
>>> than provide connection details.
>>>
>>> JdbcRDD(SparkContext sc,
>>>scala.Function0 getConnection,
>>>.,
>>> to
>>>  val df: DataFrame = 
>>> hiveSqlContext.read.format("jdbc").options(options).load();
>>>
>>> How this can be achieved?
>>>
>>> Thanks!
>>>
>>>
>>
>
> --
> Ing. Marco Colombo
>


Re: jdbcRRD and dataframe

2016-07-25 Thread Marco Colombo
From getConnection I'm handling a connection pool.
I see no option for that in the docs.

Regards

Il lunedì 25 luglio 2016, Mich Talebzadeh  ha
scritto:

> Hi Marco,
>
> what is in your UDF getConnection and why not use DF itself?
>
> I guess it is all connection attributes
>
> val c = HiveContext.load("jdbc",
> Map("url" -> _ORACLEserver,
> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM
> sh.channels)",
> "user" -> _username,
> "password" -> _password))
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 July 2016 at 15:14, Marco Colombo  > wrote:
>
>>
>> Hi all,
>>
>> I was using JdbcRRD and signature for constructure was accepting a function 
>> to get a DB connection. This is very useful to provide my own connection 
>> handler.
>>
>> I'm valuating to move to daraframe, but I cannot how to provide such 
>> function and migrate my code. I want to use my own 'getConnection' rather 
>> than provide connection details.
>>
>> JdbcRDD(SparkContext sc,
>>scala.Function0 getConnection,
>>.,
>> to
>>  val df: DataFrame = 
>> hiveSqlContext.read.format("jdbc").options(options).load();
>>
>> How this can be achieved?
>>
>> Thanks!
>>
>>
>

-- 
Ing. Marco Colombo


Re: jdbcRRD and dataframe

2016-07-25 Thread Mich Talebzadeh
Hi Marco,

what is in your UDF getConnection and why not use DF itself?

I guess it is all connection attributes

val c = HiveContext.load("jdbc",
Map("url" -> _ORACLEserver,
"dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM
sh.channels)",
"user" -> _username,
"password" -> _password))

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 25 July 2016 at 15:14, Marco Colombo  wrote:

>
> Hi all,
>
> I was using JdbcRRD and signature for constructure was accepting a function 
> to get a DB connection. This is very useful to provide my own connection 
> handler.
>
> I'm valuating to move to daraframe, but I cannot how to provide such function 
> and migrate my code. I want to use my own 'getConnection' rather than provide 
> connection details.
>
> JdbcRDD(SparkContext sc,
>scala.Function0 getConnection,
>.,
> to
>  val df: DataFrame = 
> hiveSqlContext.read.format("jdbc").options(options).load();
>
> How this can be achieved?
>
> Thanks!
>
>


Pls assist: Creating Spak EC2 cluster using spark_ec2.py script and a custom AMI

2016-07-25 Thread Marco Mistroni
Hi all,
I was wondering if anyone can help with this.
I have created a Spark cluster before using the spark_ec2.py script from Spark
1.6.1, which by default uses a very old AMI, so I decided to try to launch the
script with a more up-to-date AMI.
The one I have used is ami-d732f0b7, which refers to Ubuntu Server 14.04
LTS (HVM), SSD Volume Type.



I have launched the script as follows:

./spark-ec2  --key-pair=ec2AccessKey --identity-file ec2AccessKey.pem
--region=us-west-2 --ami ami-d732f0b7 launch my-spark-cluster

but I am getting this exception:

Non-Windows instances with a virtualization type of 'hvm' are currently not
supported for this instance type.

which seems a bizarre exception to me, as in spark_ec2.py the instance type
m1.large (the one used to create the Spark master and nodes) is associated with
virtualization=pvm:

"m1.large":"pvm"


Has anyone run into a similar issue? Any suggestion on how I can use a custom
AMI when creating a Spark cluster?

kind regards
 marco


Re: read parquetfile in spark-sql error

2016-07-25 Thread Kabeer Ahmed
I hope the below sample helps you:

val parquetDF = hiveContext.read.parquet("hdfs://.parquet")
parquetDF.registerTempTable("parquetTable")
sql("SELECT * FROM parquetTable").collect().foreach(println)

Kabeer.
Sent from Nylas N1, the extensible, open source mail client.

On Jul 25 2016, at 12:09 pm, cj <124411...@qq.com> wrote:
hi,all:

  I use spark1.6.1 as my work env.

  when I saved the following content as test1.sql file :


CREATE TEMPORARY TABLE parquetTable

USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

and use bin/spark-sql to run it 
(/home/bae/dataplatform/spark-1.6.1/bin/spark-sql  --properties-file 
./spark-dataplatform.conf -f test1.sql ),I encountered a grammar error.


SET hive.support.sql11.reserved.keywords=false
SET spark.sql.hive.version=1.2.1
SET spark.sql.hive.version=1.2.1
NoViableAltException(280@[192:1: tableName : (db= identifier DOT tab= 
identifier -> ^( TOK_TABNAME $db $tab) |tab= identifier -> ^( TOK_TABNAME $tab) 
);])
at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
at org.antlr.runtime.DFA.predict(DFA.java:116)
at 
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.tableName(HiveParser_FromClauseParser.java:4747)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.tableName(HiveParser.java:45918)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.createTableStatement(HiveParser.java:5029)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2640)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1650)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1109)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276)
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:290)
at 
org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:237)
at 
org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:236)
at 
org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:279)
at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65)
at 
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at 
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at 

jdbcRRD and dataframe

2016-07-25 Thread Marco Colombo
Hi all,

I was using JdbcRDD, whose constructor signature accepts a
function that returns a DB connection. This is very useful for providing my own
connection handler.

I'm evaluating a move to DataFrames, but I cannot see how to provide such a
function and migrate my code. I want to use my own 'getConnection'
rather than provide connection details.

JdbcRDD(SparkContext sc,
        scala.Function0<java.sql.Connection> getConnection,
        ...,
to
 val df: DataFrame = hiveSqlContext.read.format("jdbc").options(options).load();

How can this be achieved?

Thanks!
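
As a sketch of the options-based read mentioned above (all values are placeholders, not taken from the thread): the DataFrame JDBC source builds its own connections from these properties, so as far as the standard options go there is no direct equivalent of a custom getConnection factory.

    // Placeholder connection properties; the JDBC source opens connections itself from these
    val options = Map(
      "url"      -> "jdbc:postgresql://host:5432/db",
      "driver"   -> "org.postgresql.Driver",
      "dbtable"  -> "(select id, name from my_table) t",
      "user"     -> "user",
      "password" -> "password")

    val df: DataFrame = hiveSqlContext.read.format("jdbc").options(options).load()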


unsubscribe

2016-07-25 Thread milad bourhani



Re: add spark-csv jar to ipython notbook without packages flags

2016-07-25 Thread Ndjido Ardo BAR
Hi Pseudo,

try this :

export SPARK_SUBMIT_OPTIONS="--jars spark-csv_2.10-1.4.0.jar,commons-csv-1.1.jar"

this has been working for me for a long time ;-) both in Zeppelin (for Spark
Scala) and IPython Notebook (for PySpark).

cheers,

Ardo



On Mon, Jul 25, 2016 at 1:28 PM, pseudo oduesp 
wrote:

> PYSPARK_SUBMIT_ARGS  =  --jars spark-csv_2.10-1.4.0.jar,commons-csv-1.1.jar
> without succecs
>
> thanks
>
>
> 2016-07-25 13:27 GMT+02:00 pseudo oduesp :
>
>> Hi ,
>>  someone can telle me how i can add jars to ipython  i try spark
>>
>>
>>
>


Re: unsubscribe)

2016-07-25 Thread Daniel Lopes
Hi Uzi,

To unsubscribe e-mail: user-unsubscr...@spark.apache.org

*Daniel Lopes*
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

www.onematch.com.br


On Mon, Jul 25, 2016 at 2:36 AM, Uzi Hadad  wrote:

>
>


SPARK SQL and join pipeline issue

2016-07-25 Thread Carlo . Allocca
Dear All,

I have the following question:

I am using Spark SQL 2.0 and, in particular, I am doing some joins in a
pipeline of the following pattern (d3 = d1 join d2, d4 = d5 join d6, d7 = d3 join
d4).

When running my code, I realised that the building of d7 generates an issue as 
reported below.

The typical code that I am using for building a join is:

// We make a join of the first two above
   Dataset<Row> d3 = d1
       .join(d2,
             d2.col("PRD_asin#2").equalTo(d1.col("PRD_asin#1")),
             "inner");

Is there something that I am doing wrong?

Please, any help would be very appreciated.

Thank you in advance.

Best Regards,
Carlo




 ISSUE TRACK:

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 319.666 sec <<< 
FAILURE!
testBuildDataset(org.mksmart.amaretto.ml.DatasetPerHourVerOneTest)  Time 
elapsed: 319.265 sec  <<< ERROR!
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
at 
org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.ExpandExec.consume(ExpandExec.scala:36)
at org.apache.spark.sql.execution.ExpandExec.doConsume(ExpandExec.scala:198)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
at 
org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:146)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:211)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:146)
at 
org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
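
An editorial note on the exception above (an inference from the BroadcastExchangeExec / awaitResult frames, not a confirmed diagnosis): this shape of failure typically means a broadcast-hash join timed out or failed while broadcasting one side of a join. Two knobs that are commonly tried, shown as a sketch against a Spark 2.0 SparkSession named spark (an assumption):

    // Give the broadcast more time, or disable automatic broadcast joins altogether
    spark.conf.set("spark.sql.broadcastTimeout", "1200")          // seconds; the default is 300
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // -1 disables auto broadcast joins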

Re: add spark-csv jar to ipython notbook without packages flags

2016-07-25 Thread pseudo oduesp
PYSPARK_SUBMIT_ARGS  =  --jars spark-csv_2.10-1.4.0.jar,commons-csv-1.1.jar
without success

thanks


2016-07-25 13:27 GMT+02:00 pseudo oduesp :

> Hi ,
>  someone can telle me how i can add jars to ipython  i try spark
>
>
>


add spark-csv jar to ipython notbook without packages flags

2016-07-25 Thread pseudo oduesp
Hi,
 can someone tell me how I can add jars to IPython? I try spark


Re: Spark, Scala, and DNA sequencing

2016-07-25 Thread James McCabe

Thanks Ofir and Sean,

I'm aware of AMPLab's ADAM. Spark is bringing down the cost of genome 
analysis to consumer level. The potential for the future of medicine is 
exciting indeed.


I normally do Scala consulting which keeps me too busy, but recently I 
finally got some spare time to look into interesting open-source 
projects like this.


James


On 24/07/16 09:09, Sean Owen wrote:

Also also, you may be interested in GATK, built on Spark, for genomics:
https://github.com/broadinstitute/gatk


On Sun, Jul 24, 2016 at 7:56 AM, Ofir Manor  wrote:

Hi James,
BTW - if you are into analyzing DNA with Spark, you may also be interested
in ADAM:
https://github.com/bigdatagenomics/adam
 http://bdgenomics.org/

Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io


On Fri, Jul 22, 2016 at 10:31 PM, James McCabe  wrote:

Hi!

I hope this may be of use/interest to someone:

Spark, a Worked Example: Speeding Up DNA Sequencing


http://scala-bility.blogspot.nl/2016/07/spark-worked-example-speeding-up-dna.html

James


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



read parquetfile in spark-sql error

2016-07-25 Thread cj
hi,all:


  I use spark1.6.1 as my work env.
  
  when I saved the following content as test1.sql file :
  
CREATE TEMPORARY TABLE parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

and use bin/spark-sql to run it 
(/home/bae/dataplatform/spark-1.6.1/bin/spark-sql  --properties-file 
./spark-dataplatform.conf -f test1.sql ),I encountered a grammar error.




SET hive.support.sql11.reserved.keywords=false
SET spark.sql.hive.version=1.2.1
SET spark.sql.hive.version=1.2.1
NoViableAltException(280@[192:1: tableName : (db= identifier DOT tab= 
identifier -> ^( TOK_TABNAME $db $tab) |tab= identifier -> ^( TOK_TABNAME $tab) 
);])
at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
at org.antlr.runtime.DFA.predict(DFA.java:116)
at 
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.tableName(HiveParser_FromClauseParser.java:4747)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.tableName(HiveParser.java:45918)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.createTableStatement(HiveParser.java:5029)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:2640)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1650)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1109)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:202)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:276)
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
at 
org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:290)
at 
org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:237)
at 
org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:236)
at 
org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:279)
at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65)
at 
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at 
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
at 
org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
at 
org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
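
An editorial sketch related to the script above (whether statement separation is the actual cause of the parser error is an assumption, not verified): issued as two separate statements from a HiveContext, the same DDL and query look like this, with the path and table name taken from the post.

    // Data-source DDL first, then the query, as two separate statements
    sqlContext.sql(
      """CREATE TEMPORARY TABLE parquetTable
        |USING org.apache.spark.sql.parquet
        |OPTIONS (path "examples/src/main/resources/people.parquet")""".stripMargin)

    sqlContext.sql("SELECT * FROM parquetTable").show()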

Fwd: PySpark : Filter based on resultant query without additional dataframe

2016-07-25 Thread kiran kumar
Hi All,

I am trying out Spark for the first time, so I am reaching out with what
may seem like a very basic question.

Consider the below example

>>> l = 
>>> [("US","City1",125),("US","City2",123),("Europe","CityX",23),("Europe","CityY",17)]
>>> print l
[('US', 'City1', 125), ('US', 'City2', 123), ('Europe', 'CityX', 23),
('Europe', 'CityY', 17)]

>>> sc = SparkContext(appName="N")
>>> sqlsc = SQLContext(sc)
>>> df = sqlsc.createDataFrame(l)
>>> df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
>>> df.registerTempTable("t1")
>>> rdf=sqlsc.sql("Select _1,sum(_3) from t1 group by _1").show()
+--+---+
|_1|_c1|
+--+---+
|US|248|
|Europe| 40|
+--+---+
>>> rdf.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _c1: long (nullable = true)
>>> rdf.registerTempTable("t2")
>>> sqlsc.sql("Select * from t2 where _c1 > 200").show()
+---+---+
| _1|_c1|
+---+---+
| US|248|
+---+---+

So basically, I am trying to find all the _3 values (which could be the population
subscribed to some service) that are above a threshold in each country. In
the approach above, an additional dataframe (rdf) is created.

Now, how do I eliminate the rdf dataframe and embed the complete query
within the df dataframe itself?

I tried, but pyspark throws error

>>> sqlsc.sql("Select _1,sum(_3) from t1 group by _1").show()
+--+---+
|_1|_c1|
+--+---+
|US|248|
|Europe| 40|
+--+---+
>>> sqlsc.sql("Select _1,sum(_3) from t1 group by _1 where _c1 > 200").show()
Traceback (most recent call last):
 File 
"/ghostcache/kimanjun/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py",
line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o28.sql.
: java.lang.RuntimeException: [1.39] failure: ``union'' expected but
`where' found

Question:

Is there a possible way to avoid creation of the data frame (rdf) and
directly get the result from df?

I have not put much thought on how it would be beneficial, but just
pondering the question.
-- 
Regards
Kiran



-- 
Regards
Kiran
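
An editorial sketch of one way to do this in a single statement (shown in Scala for consistency with the other sketches here; the SQL string itself is the same from PySpark's sqlsc.sql, and sqlContext stands in for the poster's sqlsc): a HAVING clause filters on the aggregate without registering a second table.

    // One statement: aggregate and filter on the aggregate in the same query
    val result = sqlContext.sql(
      "SELECT _1, SUM(_3) AS total FROM t1 GROUP BY _1 HAVING SUM(_3) > 200")
    result.show()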


Re: Spark ml.ALS question -- RegressionEvaluator .evaluate giving ~1.5 output for same train and predict data

2016-07-25 Thread Rohit Chaddha
Hi Krishna,

Great, I had no idea about this. I tried your suggestion by using
na.drop() and got an RMSE of 1.5794048211812495.
Any suggestions on how this can be reduced and the model improved?

Regards,
Rohit
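
For reference, the workaround Krishna describes looks roughly like this (a sketch; model and test are the fitted ALSModel and the held-out split, and column names such as "rating" and "prediction" are assumptions about the schema, not taken from the thread):

    import org.apache.spark.ml.evaluation.RegressionEvaluator

    // Drop rows where ALS could not produce a prediction (cold-start users/items),
    // then evaluate RMSE only on what is left.
    val predictions = model.transform(test).na.drop(Seq("prediction"))
    val rmse = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
      .evaluate(predictions)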

On Mon, Jul 25, 2016 at 4:12 AM, Krishna Sankar  wrote:

> Thanks Nick. I also ran into this issue.
> VG, One workaround is to drop the NaN from predictions (df.na.drop()) and
> then use the dataset for the evaluator. In real life, probably detect the
> NaN and recommend most popular on some window.
> HTH.
> Cheers
> 
>
> On Sun, Jul 24, 2016 at 12:49 PM, Nick Pentreath  > wrote:
>
>> It seems likely that you're running into
>> https://issues.apache.org/jira/browse/SPARK-14489 - this occurs when the
>> test dataset in the train/test split contains users or items that were not
>> in the training set. Hence the model doesn't have computed factors for
>> those ids, and ALS 'transform' currently returns NaN for those ids. This in
>> turn results in NaN for the evaluator result.
>>
>> I have a PR open on that issue that will hopefully address this soon.
>>
>>
>> On Sun, 24 Jul 2016 at 17:49 VG  wrote:
>>
>>> ping. Anyone has some suggestions/advice for me .
>>> It will be really helpful.
>>>
>>> VG
>>> On Sun, Jul 24, 2016 at 12:19 AM, VG  wrote:
>>>
 Sean,

 I did this just to test the model. When I do a split of my data as
 training to 80% and test to be 20%

 I get a Root-mean-square error = NaN

 So I am wondering where I might be going wrong

 Regards,
 VG

 On Sun, Jul 24, 2016 at 12:12 AM, Sean Owen  wrote:

> No, that's certainly not to be expected. ALS works by computing a much
> lower-rank representation of the input. It would not reproduce the
> input exactly, and you don't want it to -- this would be seriously
> overfit. This is why in general you don't evaluate a model on the
> training set.
>
> On Sat, Jul 23, 2016 at 7:37 PM, VG  wrote:
> > I am trying to run ml.ALS to compute some recommendations.
> >
> > Just to test I am using the same dataset for training using ALSModel
> and for
> > predicting the results based on the model .
> >
> > When I evaluate the result using RegressionEvaluator I get a
> > Root-mean-square error = 1.5544064263236066
> >
> > I thin this should be 0. Any suggestions what might be going wrong.
> >
> > Regards,
> > Vipul
>


>


spark2.0 can't run SqlNetworkWordCount

2016-07-25 Thread kevin
hi, all:
I downloaded the Spark 2.0 pre-built package. I can run the SqlNetworkWordCount test using:
bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount
master1 

but when I take the Spark 2.0 example source code SqlNetworkWordCount.scala,
build it into a jar with dependencies (JDK 1.8 and Scala 2.10),
and run it with spark-submit, I get this error:

16/07/25 17:28:30 INFO scheduler.JobScheduler: Starting job streaming job
146943891 ms.0 from job set of time 146943891 ms
Exception in thread "streaming-job-executor-2" java.lang.NoSuchMethodError:
scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
at
main.SqlNetworkWordCount$$anonfun$main$1.apply(SqlNetworkWordCount.scala:67)
at
main.SqlNetworkWordCount$$anonfun$main$1.apply(SqlNetworkWordCount.scala:61)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
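
An observation on the stack trace above (an inference, not something confirmed in this thread): a NoSuchMethodError on scala.reflect.api.JavaUniverse.runtimeMirror is the usual symptom of a Scala version mismatch, and the pre-built Spark 2.0 binaries are compiled against Scala 2.11 while the jar here was built with Scala 2.10. A build sketch that matches the binaries (versions assumed):

    // build.sbt sketch: compile the job with the same Scala line as the Spark 2.0 binaries
    scalaVersion := "2.11.8"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"       % "2.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided"
    )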


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Marco Mistroni
Hi Kevin
  you should not need to rebuild everything.
Instead, I believe you should launch spark-submit specifying the Kafka
jar in your --packages... I had to do the same when integrating Spark
Streaming with Flume.

  have you checked this link ?
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
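
For reference, assuming Spark 2.0.0 built against Scala 2.11 (adjust to the actual build), the integration is published as a separate artifact: it can be pulled in with --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 on spark-submit, or declared in the build, e.g. in sbt:

    // build.sbt sketch (versions assumed: Spark 2.0.0, Scala 2.11)
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"
    // or, for the Kafka 0.10 integration mentioned elsewhere in this thread:
    // libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"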


hth



On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:

> I have compile it from source code
>
> 2016-07-25 12:05 GMT+08:00 kevin :
>
>> hi,all :
>> I try to run example org.apache.spark.examples.streaming.KafkaWordCount ,
>> I got error :
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka.KafkaUtils$
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 11 more
>>
>> so where I can find spark-streaming-kafka for spark2.0
>>
>
>


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread kevin
I have compile it from source code

2016-07-25 12:05 GMT+08:00 kevin :

> hi,all :
> I try to run example org.apache.spark.examples.streaming.KafkaWordCount ,
> I got error :
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/kafka/KafkaUtils$
> at
> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
> at
> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka.KafkaUtils$
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 11 more
>
> so where I can find spark-streaming-kafka for spark2.0
>


Re: Spark 1.6.2 version displayed as 1.6.1

2016-07-25 Thread Daniel Darabos
Another possible explanation is that by accident you are still running
Spark 1.6.1. Which download are you using? This is what I see:

$ ~/spark-1.6.2-bin-hadoop2.6/bin/spark-shell
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Using Spark's repl log4j profile:
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.2
  /_/
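
A quick way to double-check which build is actually being picked up (standard API, shown as a sketch) is to ask the running shell itself:

    // From inside spark-shell: report the version and where the Spark classes were loaded from
    sc.version
    classOf[org.apache.spark.SparkContext].getProtectionDomain.getCodeSource.getLocation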


On Mon, Jul 25, 2016 at 7:45 AM, Sean Owen  wrote:

> Are you certain? looks like it was correct in the release:
>
>
> https://github.com/apache/spark/blob/v1.6.2/core/src/main/scala/org/apache/spark/package.scala
>
>
>
> On Mon, Jul 25, 2016 at 12:33 AM, Ascot Moss  wrote:
> > Hi,
> >
> > I am trying to upgrade spark from 1.6.1 to 1.6.2, from 1.6.2
> spark-shell, I
> > found the version is still displayed 1.6.1
> >
> > Is this a minor typo/bug?
> >
> > Regards
> >
> >
> >
> > ###
> >
> > Welcome to
> >
> >     __
> >
> >  / __/__  ___ _/ /__
> >
> > _\ \/ _ \/ _ `/ __/  '_/
> >
> >/___/ .__/\_,_/_/ /_/\_\   version 1.6.1
> >
> >   /_/
> >
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Maintaining order of pair rdd

2016-07-25 Thread Marco Mistroni
Hi
 after you do a groupBy you should use a sort.
Basically, a groupBy reduces your structure to (anyone correct me if I'm
wrong) an RDD[(key, val)], which you can see as tuples, so you could use
sortBy (or sortWith, cannot remember which one), e.g. sortBy(tpl => tpl._1).
hth
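A minimal sketch of that idea in spark-shell, with made-up data and assuming the shell's sc; the zipWithIndex variant is for the case raised further down, where the original arrival order (not numeric order) has to survive the groupByKey:

val rdd = sc.parallelize(Seq(("ID2", 18159), ("ID1", 18159), ("ID3", 18159),
                             ("ID2", 36318), ("ID1", 36318), ("ID3", 36318)))

// numeric order within each key
val sorted = rdd.groupByKey().mapValues(_.toArray.sorted)

// original arrival order within each key: carry an explicit index through the shuffle
val ordered = rdd.zipWithIndex()                       // ((key, value), index)
  .map { case ((k, v), i) => (k, (i, v)) }
  .groupByKey()
  .mapValues(_.toArray.sortBy(_._1).map(_._2))

ordered.mapValues(_.mkString("[", ", ", "]")).collect().foreach(println)
// e.g. (ID1,[18159, 36318])  (ID2,[18159, 36318])  (ID3,[18159, 36318])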

On Mon, Jul 25, 2016 at 1:21 AM, janardhan shetty 
wrote:

> Thanks Marco. This solved the order problem. Had another question which
> comes before this one.
>
> As you can see below, ID2, ID1 and ID3 are in order and I need to maintain
> this index order as well. But when we do a groupByKey
> operation (*rdd.distinct.groupByKey().mapValues(v => v.toArray*))
> everything is *jumbled*.
> Is there any way we can maintain this order as well ?
>
> scala> RDD.foreach(println)
> (ID2,18159)
> (ID1,18159)
> (ID3,18159)
>
> (ID2,18159)
> (ID1,18159)
> (ID3,18159)
>
> (ID2,36318)
> (ID1,36318)
> (ID3,36318)
>
> (ID2,54477)
> (ID1,54477)
> (ID3,54477)
>
> *Jumbled version : *
> Array(
> (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*, 145272,
> 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683, 58866, 162076,
> 45431, 100136)),
> (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022, 39244,
> 100136, 58866, 72636, 145272, 817, 89366, * 54477*, *36318*, 308703,
> 160992, 45431, 162076)),
> (ID2,Array(308703, * 54477*, 89366, 39244, 150022, 72636, 817, 58866,
> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
> 45431, *36318*, 162076))
> )
>
> *Expected output:*
> Array(
> (ID1,Array(*18159*,*36318*, *54477,...*)),
> (ID3,Array(*18159*,*36318*, *54477, ...*)),
> (ID2,Array(*18159*,*36318*, *54477, ...*))
> )
>
> As you can see, after the *groupByKey* operation is complete, item 18159 is
> at index 0 for ID1, index 2 for ID3 and index 16 for ID2, whereas the
> expected index is 0 in all three.
>
>
> On Sun, Jul 24, 2016 at 12:43 PM, Marco Mistroni 
> wrote:
>
>> Hello
>>  Uhm, you have an array containing 3 tuples?
>> If all the arrays have the same length, you can just zip all of them,
>> creating a list of tuples;
>> then you can scan the list 5 by 5...?
>>
>> so something like
>>
>> (Array(0)._2, Array(1)._2, Array(2)._2).zipped.toList
>>
>> this will give you a list of tuples of 3 elements, each containing items
>> from ID1, ID2 and ID3  ... sample below
>> res: List((18159,100079,308703), (308703, 19622, 54477), (72636,18159,
>> 89366)..)
>>
>> then you can use a recursive function to compare each element such as
>>
>> def iterate(lst: List[(Int, Int, Int)]): Unit = {
>>   if (lst.isEmpty) () // return your comparison result here
>>   else {
>>     val splits = lst.splitAt(5)
>>     // do something with the first 5 tuples in splits._1
>>     iterate(splits._2)
>>   }
>> }
>>
>> will this help? or am i still missing something?
>>
>> kr
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 24 Jul 2016 5:52 pm, "janardhan shetty" 
>> wrote:
>>
>>> Array(
>>> (ID1,Array(18159, 308703, 72636, 64544, 39244, 107937, 54477, 145272,
>>> 100079, 36318, 160992, 817, 89366, 150022, 19622, 44683, 58866, 162076,
>>> 45431, 100136)),
>>> (ID3,Array(100079, 19622, 18159, 212064, 107937, 44683, 150022, 39244,
>>> 100136, 58866, 72636, 145272, 817, 89366, 54477, 36318, 308703, 160992,
>>> 45431, 162076)),
>>> (ID2,Array(308703, 54477, 89366, 39244, 150022, 72636, 817, 58866,
>>> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, 18159, 45431,
>>> 36318, 162076))
>>> )
>>>
>>> I need to compare the first 5 elements of ID1 with the first 5 elements of
>>> ID3, then the first 5 elements of ID1 with those of ID2. Similarly the next
>>> 5 elements, in that order, until the end of the elements.
>>> Let me know if this helps
>>>
>>>
>>> On Sun, Jul 24, 2016 at 7:45 AM, Marco Mistroni 
>>> wrote:
>>>
 Apologies, I misinterpreted. Could you post two use cases?
 Kr

 On 24 Jul 2016 3:41 pm, "janardhan shetty" 
 wrote:

> Marco,
>
> Thanks for the response. It is indexed order and not ascending or
> descending order.
> On Jul 24, 2016 7:37 AM, "Marco Mistroni"  wrote:
>
>> Use mapValues to transform to an RDD where the values are sorted?
>> Hth
>>
>> On 24 Jul 2016 6:23 am, "janardhan shetty" 
>> wrote:
>>
>>> I have a key,value pair rdd where the value is an array of Ints. I need
>>> to maintain the order of the values in order to execute downstream
>>> modifications. How do we maintain the order of the values?
>>> Ex:
>>> rdd = (id1,[5,2,3,15],
>>> Id2,[9,4,2,5])
>>>
>>> Follow-up question: how do we compare one element in the rdd with
>>> all other elements ?
>>>
>>
>>>
>


Re: Hive and distributed sql engine

2016-07-25 Thread Marco Colombo
Thanks. That's what I was thinking.
But how do I set up a connection per worker?


On Monday, 25 July 2016, ayan guha  wrote:

> In order to use existing pg UDFs, you may create a view in pg and expose
> the view to hive.
> The Spark-to-database connection happens from each executor, so you must
> have a connection or a pool of connections per worker. Executors of the
> same worker can share a connection pool.
>
> Best
> Ayan
> On 25 Jul 2016 16:48, "Marco Colombo"  wrote:
>
>> Hi all!
>> Among other use cases, I want to use spark as a distributed sql engine
>> via thrift server.
>> I have some tables in Postgres and Cassandra: I need to expose them via
>> hive for custom reporting.
>> Basic implementation is simple and works, but I have some concerns and
>> open questions:
>> - is there a better approach than mapping a temp table as a select
>> of the full table?
>> - What about query setup cost? I mean, is there a way to avoid db
>> connection setup costs using a pre-created connection pool?
>> - is it possible from hiveql to use functions defined in the pg database,
>> or do I have to rewrite them as udaf?
>>
>> Thanks!
>>
>>
>>
>> --
>> Ing. Marco Colombo
>>
>

-- 
Ing. Marco Colombo


Re: Hive and distributed sql engine

2016-07-25 Thread ayan guha
In order to use existing pg UDFs, you may create a view in pg and expose the
view to hive.
The Spark-to-database connection happens from each executor, so you must have
a connection or a pool of connections per worker. Executors of the same
worker can share a connection pool.

Best
Ayan
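A minimal sketch of the per-executor pool idea in Scala, assuming HikariCP (any pooling library would do) is on the executor classpath; the JDBC URL, credentials and someRdd below are placeholders for the real job:

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

// a Scala object is initialized once per executor JVM, so every task running
// on that executor reuses the same pool instead of opening new connections
object ExecutorPool {
  lazy val dataSource: HikariDataSource = {
    val cfg = new HikariConfig()
    cfg.setJdbcUrl("jdbc:postgresql://db-host:5432/reporting") // placeholder
    cfg.setUsername("report_user")                             // placeholder
    cfg.setPassword("secret")                                  // placeholder
    cfg.setMaximumPoolSize(4)
    new HikariDataSource(cfg)
  }
}

someRdd.foreachPartition { rows =>
  val conn = ExecutorPool.dataSource.getConnection
  try rows.foreach { row => /* run statements with conn */ }
  finally conn.close() // hands the connection back to the pool
}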
On 25 Jul 2016 16:48, "Marco Colombo"  wrote:

> Hi all!
> Among other use cases, I want to use spark as a distributed sql engine
> via thrift server.
> I have some tables in Postgres and Cassandra: I need to expose them via
> hive for custom reporting.
> Basic implementation is simple and works, but I have some concerns and
> open questions:
> - is there a better approach than mapping a temp table as a select
> of the full table?
> - What about query setup cost? I mean, is there a way to avoid db
> connection setup costs using a pre-created connection pool?
> - is it possible from hiveql to use functions defined in the pg database,
> or do I have to rewrite them as udaf?
>
> Thanks!
>
>
>
> --
> Ing. Marco Colombo
>


Re: java.lang.RuntimeException: Unsupported type: vector

2016-07-25 Thread Hyukjin Kwon
I just wonder what your CSV data structure looks like.

If my understanding is correct, the SQL type of VectorUDT is StructType,
and the CSV data source does not support ArrayType or StructType.

Anyhow, it seems CSV does not support UDTs for now anyway.

https://github.com/apache/spark/blob/e1dc853737fc1739fbb5377ffe31fb2d89935b1f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L241-L293
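A common workaround, sketched below in Scala, is to load the numeric columns as plain types and build the vector after the load (e.g. with VectorAssembler) instead of declaring a VectorUDT column in the CSV schema. The column layout here is a guess, since the original file is not shown, and sqlContext is the one from the example below:

import org.apache.spark.ml.feature.VectorAssembler

// read the raw columns with plain types; no UDT in the CSV schema
val raw = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load("data/tuple-data-file.csv")
  .toDF("label", "value", "f1", "f2")   // hypothetical layout

// assemble the numeric columns into the vector column after the load
val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2"))
  .setOutputCol("features")

val df = assembler.transform(raw)
df.printSchema()   // features is now a vector column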



2016-07-25 1:50 GMT+09:00 Jean Georges Perrin :

> I am trying to build a simple DataFrame that can be used for ML
>
>
> SparkConf conf = new SparkConf().setAppName("Simple prediction from Text
> File").setMaster("local");
> SparkContext sc = new SparkContext(conf);
> SQLContext sqlContext = new SQLContext(sc);
>
> sqlContext.udf().register("vectorBuilder", new VectorBuilder(), new
> VectorUDT());
>
> String filename = "data/tuple-data-file.csv";
> StructType schema = new StructType(
> new StructField[] { new StructField("C0", DataTypes.StringType, false,
> Metadata.empty()),
> new StructField("C1", DataTypes.IntegerType, false, Metadata.empty()),
> new StructField("features", new VectorUDT(), false, Metadata.empty()), });
>
> DataFrame df = sqlContext.read().format("com.databricks.spark.csv"
> ).schema(schema).option("header", "false")
> .load(filename);
> df = df.withColumn("label", df.col("C0")).drop("C0");
> df = df.withColumn("value", df.col("C1")).drop("C1");
> df.printSchema();
> Returns:
> root
>  |-- features: vector (nullable = false)
>  |-- label: string (nullable = false)
>  |-- value: integer (nullable = false)
> df.show();
> Returns:
>
> java.lang.RuntimeException: Unsupported type: vector
> at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:76)
> at
> com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:194)
> at
> com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:173)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
> at scala.collection.AbstractIterator.to(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/07/24 12:46:01 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost): java.lang.RuntimeException: Unsupported type: vector
> at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:76)
> at
> com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:194)
> at
> com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:173)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> 

Hive and distributed sql engine

2016-07-25 Thread Marco Colombo
Hi all!
Among other use cases, I want to use spark as a distributed sql engine
via thrift server.
I have some tables in Postgres and Cassandra: I need to expose them via
hive for custom reporting.
Basic implementation is simple and works, but I have some concerns and open
questions:
- is there a better approach than mapping a temp table as a select
of the full table?
- What about query setup cost? I mean, is there a way to avoid db
connection setup costs using a pre-created connection pool?
- is it possible from hiveql to use functions defined in the pg database,
or do I have to rewrite them as udaf?

Thanks!



-- 
Ing. Marco Colombo
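On the first question above, one common pattern is to push the projection or filter down into the JDBC source as a subquery rather than mapping the full table, and register the result for the thrift server. A rough sketch, assuming the Postgres JDBC driver is on the classpath and an existing sqlContext; table name, query and connection details are placeholders:

// the subquery runs on the Postgres side; only its result crosses the wire
val recent = sqlContext.read.format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/reporting")   // placeholder
  .option("driver", "org.postgresql.Driver")
  .option("dbtable",
    "(select id, total from orders where ts > now() - interval '7 days') as recent_orders")
  .option("user", "report_user")                               // placeholder
  .option("password", "secret")                                // placeholder
  .load()

recent.registerTempTable("recent_orders")

Note that a temp table registered this way is only visible to thrift-server clients when the server shares the same context, for example when it is started with HiveThriftServer2.startWithContext.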