Consuming parquet files built with version 1.8.1

2016-09-06 Thread Dinesh Narayanan
Hello,
I have some parquet files generated with 1.8.1 through an MR job that I
need to consume. I see that master is built with parquet 1.8.1, but I get
this error (with the master branch):

java.lang.NoSuchMethodError:
org.apache.parquet.schema.Types$MessageTypeBuilder.addFields([Lorg/apache/parquet/schema/Type;)Lorg/apache/parquet/schema/Types$GroupBuilder;
at
org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport$.clipParquetSchema(ParquetReadSupport.scala:114)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.init(ParquetReadSupport.scala:67)
at
org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:136)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:360)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:339)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Do you think I am missing something here, or is this a potential bug? Any
workarounds to use parquet files with version PARQUET_2_0?

Thanks
Dinesh
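
A small diagnostic sketch (not from the thread, Scala shell): print which jar the
parquet schema classes are loaded from, to spot an older parquet-column version
shadowing 1.8.1 on the classpath.

// Sketch only: locate the jar that provides the org.apache.parquet.schema classes.
println(classOf[org.apache.parquet.schema.MessageType]
  .getProtectionDomain.getCodeSource.getLocation)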


Re: Spark 2.0.0 Thrift Server problem with Hive metastore

2016-09-06 Thread Chanh Le
Has anyone used the STS of Spark 2.0 in production?
For me, I am still waiting for compatibility with parquet files created by Spark
1.6.1.


> On Sep 6, 2016, at 2:46 PM, Campagnola, Francesco 
>  wrote:
> 
> I mean I have installed Spark 2.0 in the same environment where Spark 1.6 
> thrift server was running,
> then stopped the Spark 1.6 thrift server and started the Spark 2.0 one.
>  
> If I’m not mistaken, Spark 2.0 should be still compatible with Hive 1.2.1 and 
> no upgrade procedures are required.
> The spark-defaults.conf file has not been changed.
>  
> The following commands issued to the Spark 2.0 thrift server work:
> create database test;
> use test;
> create table tb_1 (id int);
> insert into table tb_1 select t.id from (select 1 as id) t;
>  
> While all of these commands return the same error:
> show databases;
> show tables;
> show partitions tb_1;
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 62.0 failed 10 times, most recent failure: Lost task 0.9 in 
> stage 62.0 (TID 540, vertica204): java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast 
> to org.apache.spark.sql.catalyst.expressions.UnsafeRow
>  
>  
>  
>  
> From: Jeff Zhang [mailto:zjf...@gmail.com ] 
> Sent: Tuesday, 6 September 2016 02:50
> To: Campagnola, Francesco  >
> Cc: user@spark.apache.org 
> Subject: Re: Spark 2.0.0 Thrift Server problem with Hive metastore
>  
> How do you upgrade to spark 2.0 ? 
>  
> On Mon, Sep 5, 2016 at 11:25 PM, Campagnola, Francesco 
> > 
> wrote:
> Hi,
>  
> in an already working Spark - Hive environment with Spark 1.6 and Hive 1.2.1, 
> with Hive metastore configured on Postgres DB, I have upgraded Spark to the 
> 2.0.0.
>  
> I have started the thrift server on YARN, then tried to execute from the 
> beeline cli or a jdbc client the following command:
> SHOW DATABASES;
> It always gives this error on Spark server side:
>  
> spark@spark-test[spark] /home/spark> beeline -u 
> jdbc:hive2://$(hostname):1 -n spark
>  
> Connecting to jdbc:hive2://spark-test:1
> 16/09/05 17:41:43 INFO jdbc.Utils: Supplied authorities: spark-test:1
> 16/09/05 17:41:43 INFO jdbc.Utils: Resolved authority: spark-test:1
> 16/09/05 17:41:43 INFO jdbc.HiveConnection: Will try to open client transport 
> with JDBC Uri: jdbc:hive2:// spark-test:1
> Connected to: Spark SQL (version 2.0.0)
> Driver: Hive JDBC (version 1.2.1.spark2)
> Transaction isolation: TRANSACTION_REPEATABLE_READ
> Beeline version 1.2.1.spark2 by Apache Hive
>  
> 0: jdbc:hive2:// spark-test:1> show databases;
> java.lang.IllegalStateException: Can't overwrite cause with 
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast 
> to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> at java.lang.Throwable.initCause(Throwable.java:457)
> at 
> org.apache.hive.service.cli.HiveSQLException.toStackTrace(HiveSQLException.java:236)
> at 
> org.apache.hive.service.cli.HiveSQLException.toStackTrace(HiveSQLException.java:236)
> at 
> org.apache.hive.service.cli.HiveSQLException.toCause(HiveSQLException.java:197)
> at 
> org.apache.hive.service.cli.HiveSQLException.(HiveSQLException.java:108)
> at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:256)
> at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:242)
> at 
> org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:365)
> at org.apache.hive.beeline.BufferedRows.(BufferedRows.java:42)
> at org.apache.hive.beeline.BeeLine.print(BeeLine.java:1794)
> at org.apache.hive.beeline.Commands.execute(Commands.java:860)
> at org.apache.hive.beeline.Commands.sql(Commands.java:713)
> at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:973)
> at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:813)
> at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:771)
> at 
> org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:484)
> at org.apache.hive.beeline.BeeLine.main(BeeLine.java:467)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 3.0 failed 10 times, most recent failure: Lost task 0.9 in 
> stage 3.0 (TID 12, vertica204): java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast 
> to org.apache.spark.sql.catalyst.expressions.UnsafeRow
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>  

Dataframe, Java: How to convert String to Vector ?

2016-09-06 Thread Yan Facai
Hi,
I have a csv file like:
uid  mid  features   label
1235231[0, 1, 3, ...]True

Both  "features" and "label" columns are used for GBTClassifier.

However, when I read the file:
Dataset<Row> samples = sparkSession.read().csv(file);
The type of samples.select("features") is String.

My question is:
How to map samples.select("features") to Vector or any appropriate type,
so I can use it to train like:
GBTClassifier gbdt = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(2)
.setMaxDepth(7);

Thanks.
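
A minimal Scala sketch of one possible approach (not from the thread), assuming the
features column holds a bracketed, comma-separated string:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.{col, udf}

// Sketch only: parse the "[0, 1, 3]"-style string into an ML Vector that
// GBTClassifier can consume; "samples" is the DataFrame read above.
val parseVec = udf { s: String =>
  Vectors.dense(s.stripPrefix("[").stripSuffix("]").split(",").map(_.trim.toDouble))
}
val withVec = samples.withColumn("features", parseVec(col("features")))
// The "label" column would similarly need to be cast to a numeric type.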


RE: Spark 2.0.0 Thrift Server problem with Hive metastore

2016-09-06 Thread Campagnola, Francesco
I mean I have installed Spark 2.0 in the same environment where Spark 1.6 
thrift server was running,
then stopped the Spark 1.6 thrift server and started the Spark 2.0 one.

If I’m not mistaken, Spark 2.0 should be still compatible with Hive 1.2.1 and 
no upgrade procedures are required.
The spark-defaults.conf file has not been changed.

The following commands issued to the Spark 2.0 thrift server work:
create database test;
use test;
create table tb_1 (id int);
insert into table tb_1 select t.id from (select 1 as id) t;

While all of these commands return the same error:
show databases;
show tables;
show partitions tb_1;
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 62.0 failed 10 times, most recent failure: Lost task 0.9 in 
stage 62.0 (TID 540, vertica204): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow




From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Tuesday, 6 September 2016 02:50
To: Campagnola, Francesco 
Cc: user@spark.apache.org
Subject: Re: Spark 2.0.0 Thrift Server problem with Hive metastore

How do you upgrade to spark 2.0 ?

On Mon, Sep 5, 2016 at 11:25 PM, Campagnola, Francesco 
> 
wrote:
Hi,

in an already working Spark - Hive environment with Spark 1.6 and Hive 1.2.1, 
with Hive metastore configured on Postgres DB, I have upgraded Spark to the 
2.0.0.

I have started the thrift server on YARN, then tried to execute from the 
beeline cli or a jdbc client the following command:
SHOW DATABASES;
It always gives this error on Spark server side:

spark@spark-test[spark] /home/spark> beeline -u jdbc:hive2://$(hostname):1 
-n spark

Connecting to jdbc:hive2://spark-test:1
16/09/05 17:41:43 INFO jdbc.Utils: Supplied authorities: spark-test:1
16/09/05 17:41:43 INFO jdbc.Utils: Resolved authority: spark-test:1
16/09/05 17:41:43 INFO jdbc.HiveConnection: Will try to open client transport 
with JDBC Uri: jdbc:hive2:// spark-test:1
Connected to: Spark SQL (version 2.0.0)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive

0: jdbc:hive2:// spark-test:1> show databases;
java.lang.IllegalStateException: Can't overwrite cause with 
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
at java.lang.Throwable.initCause(Throwable.java:457)
at 
org.apache.hive.service.cli.HiveSQLException.toStackTrace(HiveSQLException.java:236)
at 
org.apache.hive.service.cli.HiveSQLException.toStackTrace(HiveSQLException.java:236)
at 
org.apache.hive.service.cli.HiveSQLException.toCause(HiveSQLException.java:197)
at 
org.apache.hive.service.cli.HiveSQLException.(HiveSQLException.java:108)
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:256)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:242)
at 
org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:365)
at org.apache.hive.beeline.BufferedRows.(BufferedRows.java:42)
at org.apache.hive.beeline.BeeLine.print(BeeLine.java:1794)
at org.apache.hive.beeline.Commands.execute(Commands.java:860)
at org.apache.hive.beeline.Commands.sql(Commands.java:713)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:973)
at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:813)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:771)
at 
org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:484)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:467)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 3.0 failed 10 times, most recent failure: Lost task 0.9 in 
stage 3.0 (TID 12, vertica204): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at 

RE: How to make the result of sortByKey distributed evenly?

2016-09-06 Thread AssafMendelson
I imagine this is a simple example to explain a bigger concern.
In general when you do a sort by key, it will implicitly shuffle the data by 
the key. Since one key (0) gets almost all of the records and the other gets just 1 record, 
it will simply shuffle them into two very skewed partitions.
One way you can solve this is by using salting. For example, you might do 
something like:
val salted = sc.parallelize(2 to n).map(x => ((x / n) * 100 + 
random.nextInt(99), x)).sortByKey().map(x => (x._1 / 100, x._2))
So basically you would create a temporary key which has randomness in it but 
keeps the original key's true order, and then map back to the original key.

Hope this helps...



From: Zhang, Liyun [mailto:liyun.zh...@intel.com]
Sent: Tuesday, September 06, 2016 11:13 AM
To: user@spark.apache.org
Subject: How to make the result of sortByKey distributed evenly?

Hi all:
  I have a question about RDD.sortByKey

val n=2
val sorted=sc.parallelize(2 to n).map(x=>(x/n,x)).sortByKey()
 sorted.saveAsTextFile("hdfs://bdpe42:8020/SkewedGroupByTest")

sc.parallelize(2 to n).map(x=>(x/n,x)) will generate pairs like 
[(0,2),(0,3),.,(0,1),(1,2)], the key is skewed.

The result of sortByKey is expected to distributed evenly. But when I view the 
result and found that part-0 is large and part-1 is small.

 hadoop fs -ls /SkewedGroupByTest/
16/09/06 03:24:55 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r-- 1 root supergroup 0 2016-09-06 03:21 /SkewedGroupByTest /_SUCCESS
-rw-r--r-- 1 root supergroup 188878 2016-09-06 03:21 
/SkewedGroupByTest/part-0
-rw-r--r-- 1 root supergroup 10 2016-09-06 03:21 /SkewedGroupByTest/part-1

How can I get the result distributed evenly?  I don't need that the key in the 
part-x are same and only need to guarantee the data in part-0 ~ 
part-x is sorted.


Thanks for any help!


Kelly Zhang/Zhang,Liyun
Best Regards





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RE-How-to-make-the-result-of-sortByKey-distributed-evenly-tp27662.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: SPARK ML- Feature Selection Techniques

2016-09-06 Thread DB Tsai
You can try LOR with L1.

Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Mon, Sep 5, 2016 at 5:31 AM, Bahubali Jain  wrote:
> Hi,
> Do we have any feature selection techniques implementation(wrapper
> methods,embedded methods) available in SPARK ML ?
>
> Thanks,
> Baahu
> --
> Twitter:http://twitter.com/Baahu
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Any estimate for a Spark 2.0.1 release date?

2016-09-06 Thread Takeshi Yamamuro
Oh, sorry. I forgot attaching an URL;
https://www.mail-archive.com/user@spark.apache.org/msg55723.html

// maropu

On Tue, Sep 6, 2016 at 2:41 PM, Morten Hornbech 
wrote:

> Sorry. Seen what? I think you forgot a link.
>
> Morten
>
> Den 6. sep. 2016 kl. 04.51 skrev Takeshi Yamamuro :
>
> Hi,
>
> Have you seen this?
>
> // maropu
>
> On Tue, Sep 6, 2016 at 7:42 AM, mhornbech  wrote:
>
>> I can't find any JIRA issues with the tag that are unresolved. Apologies
>> if
>> this is a rookie mistake and the information is available elsewhere.
>>
>> Morten
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Any-estimate-for-a-Spark-2-0-1-release
>> -date-tp27659.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
>


-- 
---
Takeshi Yamamuro


Getting figures from spark streaming

2016-09-06 Thread Ashok Kumar
Hello Gurus,
I am creating some figures, feeding them into Kafka and then into Spark Streaming.
It works OK but I have the following issue.
For now, as a test, I send 5 prices in each batch interval. In the loop code this 
is what is happening:
      dstream.foreachRDD { rdd =>
        val x = rdd.count
        i += 1
        println(s"> rdd loop i is ${i}, number of lines is ${x} <==")
        if (x > 0) {
          println(s"processing ${x} records=")
          var words1 = rdd.map(_._2).map(_.split(',').view(0)).map(_.toInt).collect.apply(0)
          println(words1)
          var words2 = rdd.map(_._2).map(_.split(',').view(1)).map(_.toString).collect.apply(0)
          println(words2)
          var price = rdd.map(_._2).map(_.split(',').view(2)).map(_.toFloat).collect.apply(0)
          println(price)
          rdd.collect.foreach(println)
        }
      }

My tuple looks like this:
// (null, "ID       TIMESTAMP                           PRICE")
// (null, "40,20160426-080924,                  67.55738301621814598514")

And this is the sample output from the run:
processing 5 records=
3
20160906-212509
80.224686
(null,3,20160906-212509,80.22468448052631637099)
(null,1,20160906-212509,60.40695324215582386153)
(null,4,20160906-212509,61.95159400693415572125)
(null,2,20160906-212509,93.05912099305473237788)
(null,5,20160906-212509,81.08637370113427387121)

Now it does process the values of the first record (3, 20160906-212509, 80.224686 for
(null,3,20160906-212509,80.22468448052631637099)) but ignores the remaining 4 
records. How can I make it go through all records here? I want the third column 
from all records!
Greetings
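
A minimal sketch of one way to get the third column from every record in the batch
rather than only the first, assuming the same comma-separated payload as above:

// Sketch only: keep the mapped RDD instead of collect.apply(0).
dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.map(_._2)                              // the CSV payload
       .map(_.split(','))
       .map(fields => fields(2).trim.toFloat)  // third column = price
       .collect()
       .foreach(println)
  }
}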




Re: Spark transformations

2016-09-06 Thread janardhan shetty
I noticed a few things about Spark transformers and just wanted to be clear.

Unary transformer:

createTransformFunc: IN => OUT  = { *item* => }
Here *item* is a single element and *NOT* the entire column.

I would like to get the number of elements in that particular column. Since
there is *no forward checking*, how can we get this information?
We have visibility into a single element and not the entire column.
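
A tiny sketch of the usual workaround (the names below are assumed): compute
column-level statistics on the DataFrame outside the transformer, since
createTransformFunc only ever sees one element.

// Sketch only: a UnaryTransformer cannot see the whole column, so count it on
// the DataFrame itself and, if needed, pass the result in as a Param.
val numElements = df.select("myColumn").count()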










On Sun, Sep 4, 2016 at 9:30 AM, janardhan shetty 
wrote:

> In scala Spark ML Dataframes.
>
> On Sun, Sep 4, 2016 at 9:16 AM, Somasundaram Sekar  tigeranalytics.com> wrote:
>
>> Can you try this
>>
>> https://www.linkedin.com/pulse/hive-functions-udfudaf-udtf-
>> examples-gaurav-singh
>>
>> On 4 Sep 2016 9:38 pm, "janardhan shetty"  wrote:
>>
>>> Hi,
>>>
>>> Is there any chance that we can send entire multiple columns to an udf
>>> and generate a new column for Spark ML.
>>> I see similar approach as VectorAssembler but not able to use few
>>> classes /traitslike HasInputCols, HasOutputCol, DefaultParamsWritable since
>>> they are private.
>>>
>>> Any leads/examples is appreciated in this regard..
>>>
>>> Requirement:
>>> *Input*: Multiple columns of a Dataframe
>>> *Output*:  Single new modified column
>>>
>>
>


Re: Spark ML 2.1.0 new features

2016-09-06 Thread Jacek Laskowski
Hi,

https://issues.apache.org/jira/browse/SPARK-17363?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0%20AND%20component%20%3D%20MLlib

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Sep 6, 2016 at 10:27 PM, janardhan shetty
 wrote:
> Any links ?
>
> On Mon, Sep 5, 2016 at 1:50 PM, janardhan shetty 
> wrote:
>>
>> Is there any documentation or links on the new features which we can
>> expect for Spark ML 2.1.0 release ?
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark ML 2.1.0 new features

2016-09-06 Thread janardhan shetty
Thanks Jacek.

On Tue, Sep 6, 2016 at 1:44 PM, Jacek Laskowski  wrote:

> Hi,
>
> https://issues.apache.org/jira/browse/SPARK-17363?jql=
> project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0%
> 20AND%20component%20%3D%20MLlib
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Sep 6, 2016 at 10:27 PM, janardhan shetty
>  wrote:
> > Any links ?
> >
> > On Mon, Sep 5, 2016 at 1:50 PM, janardhan shetty  >
> > wrote:
> >>
> >> Is there any documentation or links on the new features which we can
> >> expect for Spark ML 2.1.0 release ?
> >
> >
>


Q: Multiple spark streaming app, one kafka topic, same consumer group

2016-09-06 Thread Mariano Semelman
Hello everybody,

I am trying to understand how Kafka Direct Stream works. I'm interested in
having a production-ready Spark Streaming application that consumes a Kafka
topic. But I need to guarantee there's (almost) no downtime, especially
during deploys (and submits) of new versions. What seems to be the best
solution is to deploy and submit the new version without shutting down the
previous one, wait for the new application to start consuming events, and
then shut down the previous one.

What I would expect is that the events get distributed among the two
applications in a balanced fashion using the consumer group id,
split by the partition key that I've previously set on my Kafka
producer. However, I don't see that Kafka Direct Stream supports this
functionality.

I've achieved this with the Receiver-based approach (btw I've used "kafka"
for the "offsets.storage" Kafka property [2]). However, this approach comes
with the technical difficulties noted in the documentation [1] (i.e. exactly-once
semantics).

Anyway, not even this approach seems very failsafe. Does anyone know a way
to safely deploy new versions of a streaming application of this kind
without downtime?

Thanks in advance,

Mariano


[1] http://spark.apache.org/docs/latest/streaming-kafka-integration.html
[2] http://kafka.apache.org/documentation.html#oldconsumerconfigs


Re: Q: Multiple spark streaming app, one kafka topic, same consumer group

2016-09-06 Thread Cody Koeninger
In general, see the material linked from
https://github.com/koeninger/kafka-exactly-once  if you want a better
understanding of the direct stream.

For spark-streaming-kafka-0-8, the direct stream doesn't really care
about consumer group, since it uses the simple consumer.  For the 0.10
version, it uses the new kafka consumer, so consumer group does
matter.  In either case, splitting events across old and new versions
of the job is not what I would want.

I'd suggest making sure that your outputs are idempotent or
transactional, and that the new app has a different consumer group
(for versions for which it matters). Start up the new app, make sure
it is running (even if it errors due to transactional safeguards),
then shut down the old app.
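
A minimal sketch of that setup for spark-streaming-kafka-0-10 (the broker, topic and
group names are made up, and "ssc" is an existing StreamingContext): give the new
deployment its own consumer group so it does not split partitions with the old job.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Sketch only: a distinct group.id for the new version of the job.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "myapp-v2",                  // different from the old app's group
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("mytopic"), kafkaParams))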


On Tue, Sep 6, 2016 at 3:51 PM, Mariano Semelman
 wrote:
> Hello everybody,
>
> I am trying to understand how Kafka Direct Stream works. I'm interested in
> having a production ready Spark Streaming application that consumes a Kafka
> topic. But I need to guarantee there's (almost) no downtime, specially
> during deploys (and submit) of new versions. What it seems to be the best
> solution is to deploy and submit the new version without shutting down the
> previous one, wait for the new application to start consuming events and
> then shutdown the previous one.
>
> What I would expect is that the events get distributed among the two
> applications in a balanced fashion using the consumer group id
> splitted by the partition key that I've previously set on my Kafka Producer.
> However I don't see that Kafka Direct stream support this functionality.
>
> I've achieved this with the Receiver-based approach (btw i've used "kafka"
> for the "offsets.storage" kafka property[2]). However this approach come
> with technical difficulties named in the Documentation[1] (ie: exactly-once
> semantics).
>
> Anyway, not even this approach seems very failsafe, Does anyone know a way
> to safely deploy new versions of a streaming application of this kind
> without downtime?
>
> Thanks in advance
>
> Mariano
>
>
>
> [1] http://spark.apache.org/docs/latest/streaming-kafka-integration.html
> [2] http://kafka.apache.org/documentation.html#oldconsumerconfigs
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark ML 2.1.0 new features

2016-09-06 Thread janardhan shetty
Any links ?

On Mon, Sep 5, 2016 at 1:50 PM, janardhan shetty 
wrote:

> Is there any documentation or links on the new features which we can
> expect for Spark ML 2.1.0 release ?
>


Re: I noticed LinearRegression sometimes produces negative R^2 values

2016-09-06 Thread Sean Owen
Are you not fitting an intercept / regressing through the origin? With
that constraint it's no longer true that R^2 is necessarily
nonnegative. It basically means that the errors are even bigger than
what you'd get by predicting the data's mean value as a constant
model.
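
For illustration, a minimal sketch (the "training" DataFrame is assumed) of the
intercept setting that controls this:

import org.apache.spark.ml.regression.LinearRegression

// Sketch only: with fitIntercept = false, R^2 can legitimately be negative;
// leaving it true (the default) keeps the usual 0..1 interpretation.
val lr = new LinearRegression()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setFitIntercept(true)
val model = lr.fit(training)
println(model.summary.r2)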

On Tue, Sep 6, 2016 at 8:49 PM, evanzamir  wrote:
> Am I misinterpreting what r2() in the LinearRegression Model summary means?
> By definition, R^2 should never be a negative number!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Re[8]: Spark 2.0: SQL runs 5x times slower when adding 29th field to aggregation.

2016-09-06 Thread Yong Zhang
This is an interesting point.


I tested with the original data on the Spark 2.0 release, and I get the same 
statistics output as in the original email, like the following:


50 1.77695393562
51 0.695149898529
52 0.638142108917
53 0.647341966629
54 0.663456916809
55 0.629166126251
56 0.644149065018
57 0.661190986633
58 2.6616499424
59 2.6137509346
60 2.71165704727
61 2.63473916054


Then I tested with your suggestion:


spark/bin/pyspark --driver-java-options '-XX:-DontCompileHugeMethods'


Run the same test code, and here is the output:


50 1.77180695534
51 0.679394006729
52 0.629493951797
53 0.62108206749
54 0.637018918991
55 0.640591144562
56 0.649922132492
57 0.652480125427
58 0.636356830597
59 0.667215824127
60 0.643863916397
61 0.669810056686
62 0.664624929428
63 0.682888031006
64 0.691393136978
65 0.690823078156
66 0.70525097847
67 0.724694013596
68 0.737638950348
69 0.749594926834



Yong


From: Davies Liu 
Sent: Tuesday, September 6, 2016 2:27 PM
To: Сергей Романов
Cc: Gavin Yue; Mich Talebzadeh; user
Subject: Re: Re[8]: Spark 2.0: SQL runs 5x times slower when adding 29th field 
to aggregation.

I think the slowness is caused by the generated aggregate method having more
than 8K bytecodes; then it's not JIT compiled and becomes much slower.

Could you try to disable the DontCompileHugeMethods by:

-XX:-DontCompileHugeMethods

On Mon, Sep 5, 2016 at 4:21 AM, Сергей Романов
 wrote:
> Hi, Gavin,
>
> Shuffling is exactly the same in both requests and is minimal. Both requests
> produces one shuffle task. Running time is the only difference I can see in
> metrics:
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 57) ).collect, number=1)
> 0.713730096817
>  {
> "id" : 368,
> "name" : "duration total (min, med, max)",
> "value" : "524"
>   }, {
> "id" : 375,
> "name" : "internal.metrics.executorRunTime",
> "value" : "527"
>   }, {
> "id" : 391,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "244495"
>   }
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 58) ).collect, number=1)
> 2.97951102257
>
>   }, {
> "id" : 469,
> "name" : "duration total (min, med, max)",
> "value" : "2654"
>   }, {
> "id" : 476,
> "name" : "internal.metrics.executorRunTime",
> "value" : "2661"
>   }, {
> "id" : 492,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "371883"
>   }, {
>
> Full metrics in attachment.
>
> Saturday, 3 September 2016, 19:53 +03:00 from Gavin Yue:
>
>
> Any shuffling?
>
>
> On Sep 3, 2016, at 5:50 AM, Сергей Романов 
> wrote:
>
> Same problem happens with CSV data file, so it's not parquet-related either.
>
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
 import timeit
 from pyspark.sql.types import *
 schema = StructType([StructField('dd_convs', FloatType(), True)])
 for x in range(50, 70): print x,
 timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
 schema=schema).groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> 50 0.372850894928
> 51 0.376906871796
> 52 0.381325960159
> 53 0.385444164276
> 54 0.38685192
> 55 0.388918161392
> 56 0.397624969482
> 57 0.391713142395
> 58 2.62714004517
> 59 2.68421196938
> 60 2.74627685547
> 61 2.81081581116
> 62 3.43532109261
> 63 3.07742786407
> 64 3.03904604912
> 65 3.01616096497
> 66 3.06293702126
> 67 3.09386610985
> 68 3.27610206604
> 69 3.2041969299
>
> Saturday, 3 September 2016, 15:40 +03:00 from Сергей Романов:
>
> Hi,
>
> I had narrowed down my problem to a very simple case. I'm sending 27kb
> parquet in attachment. (file:///data/dump/test2 in example)
>
> Please, can you take a look at it? Why there is performance drop after 57
> sum columns?
>
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
 import timeit
 for x in range(70): print x,
 timeit.timeit(spark.read.parquet('file:///data/dump/test2').groupBy().sum(*(['dd_convs']
 * x) ).collect, number=1)
> ...
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> 0 1.05591607094
> 1 0.200426101685
> 2 0.203800916672
> 3 0.176458120346
> 4 0.184863805771
> 5 0.232321023941
> 6 0.216032981873
> 7 

Re: distribute work (files)

2016-09-06 Thread Peter Figliozzi
This is a great question.  Basically you don't have to worry about the
details-- just give a wildcard in your call to textFile.  See the Programming
Guide  section
entitled "External Datasets".  The Spark framework will distribute your
data across the workers.  Note that:

*If using a path on the local filesystem, the file must also be accessible
> at the same path on worker nodes. Either copy the file to all workers or
> use a network-mounted shared file system.*


In your case this would mean the directory of files.
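
A minimal sketch, assuming the files are visible at the same (made-up) path on
every worker:

// Sketch only: a wildcard over the directory; Spark splits the files across workers.
val lines = sc.textFile("file:///shared/data/input/*.txt")
println(lines.count())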

Curiously, I cannot get this to work when I mount a directory with sshfs on
all of my worker nodes.  It says "file not found" even though the file
clearly exists in the specified path on all workers.   Anyone care to try
and comment on this?

Thanks,

Pete

On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
wrote:

> Hi,
>
> maybe this is a stupid question:
>
> I have a list of files. Each file I want to take as an input for a
> ML-algorithm. All files are independent from another.
> My question now is how do I distribute the work so that each worker takes
> a block of files and just runs the algorithm on them one by one.
> I hope somebody can point me in the right direction! :)
>
> Best regards,
> Lydia
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to make the result of sortByKey distributed evenly?

2016-09-06 Thread Fridtjof Sander
Your data has only two keys, and basically all values are assigned to 
only one of them. There is no better way to distribute the keys than 
the one Spark executes.


What you have to do is use different keys to sort and range-partition 
on. Try to invoke sortBy() on a non-pair RDD. This will take both parts 
of your data as the key to sort on. You can also set your tuple as the key 
manually, and set a constant int or something as the value.
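
A minimal sketch of that suggestion, reusing the names from the quoted mail below:

// Sketch only: sort on the whole (key, value) tuple instead of the skewed key,
// so the range partitioner can split the data evenly.
val sorted = sc.parallelize(2 to n)          // n as defined in the quoted mail
  .map(x => (x / n, x))
  .sortBy(x => x)                            // tuple ordering: key first, then value
sorted.saveAsTextFile("hdfs://bdpe42:8020/SkewedGroupByTest")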


On 06.09.16 at 10:13, Zhang, Liyun wrote:


Hi all:

  I have a question about RDD.sortByKey

val n=2
val sorted=sc.parallelize(2 to n).map(x=>(x/n,x)).sortByKey()
 sorted.saveAsTextFile("hdfs://bdpe42:8020/SkewedGroupByTest")

sc.parallelize(2 to n).map(x=>(x/n,x)) will generate pairs like 
[(0,2),(0,3),…..,(0,1),(1,2)], the key is skewed.


The result of sortByKey is expected to distributed evenly. But when I 
view the result and found that part-0 is large and part-1 is 
small.


 hadoop fs -ls /SkewedGroupByTest/
16/09/06 03:24:55 WARN util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes 
where applicable

Found 3 items
-rw-r--r-- 1 root supergroup 0 2016-09-06 03:21 /SkewedGroupByTest 
/_SUCCESS
-rw-r--r-- 1 root supergroup 188878 2016-09-06 03:21 
/SkewedGroupByTest/part-0
-rw-r--r-- 1 root supergroup 10 2016-09-06 03:21 
/SkewedGroupByTest/part-1


How can I get the result distributed evenly?  I don’t need that the 
key in the part-x are same and only need to guarantee the data in 
part-0 ~ part-x is sorted.


Thanks for any help!

Kelly Zhang/Zhang,Liyun

Best Regards





How to make the result of sortByKey distributed evenly?

2016-09-06 Thread Zhang, Liyun
Hi all:
  I have a question about RDD.sortByKey

val n=2
val sorted=sc.parallelize(2 to n).map(x=>(x/n,x)).sortByKey()
 sorted.saveAsTextFile("hdfs://bdpe42:8020/SkewedGroupByTest")

sc.parallelize(2 to n).map(x=>(x/n,x)) will generate pairs like 
[(0,2),(0,3),.,(0,1),(1,2)], the key is skewed.

The result of sortByKey is expected to be distributed evenly. But when I viewed the 
result, I found that part-0 is large and part-1 is small.

 hadoop fs -ls /SkewedGroupByTest/
16/09/06 03:24:55 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r-- 1 root supergroup 0 2016-09-06 03:21 /SkewedGroupByTest /_SUCCESS
-rw-r--r-- 1 root supergroup 188878 2016-09-06 03:21 
/SkewedGroupByTest/part-0
-rw-r--r-- 1 root supergroup 10 2016-09-06 03:21 /SkewedGroupByTest/part-1

How can I get the result distributed evenly? I don't need the keys within each 
part-x to be the same; I only need to guarantee that the data across part-0 ~ 
part-x is sorted.


Thanks for any help!


Kelly Zhang/Zhang,Liyun
Best Regards



Getting memory error when starting spark shell but not often

2016-09-06 Thread Divya Gehlot
Hi,
I am using EMR 4.7 with Spark 1.6
Sometimes when I start the spark shell I get below error

OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x0005662c, 10632822784, 0) failed; error='Cannot
> allocate memory' (errno=12)
> #
> # There is insufficient memory for the Java Runtime Environment to
> continue.
> # Native memory allocation (malloc) failed to allocate 10632822784 bytes
> for committing reserved memory.
> # An error report file with more information is saved as:
> # /tmp/jvm-6066/hs_error.log



Has anybody encountered this kind of issue?
Would really appreciate a resolution.


Thanks,
Divya


[Spark submit] getting error when use properties file parameter in spark submit

2016-09-06 Thread Divya Gehlot
Hi,
I am getting the below error if I try to use the properties file parameter in
spark-submit:

Exception in thread "main" java.util.ServiceConfigurationError:
org.apache.hadoop.fs.FileSystem: Provider
org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:224)
at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2673)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2684)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2701)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2737)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2719)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:375)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:142)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:653)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:651)
at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.NoClassDefFoundError:
com/amazonaws/services/s3/AmazonS3
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
at java.lang.Class.getConstructor0(Class.java:2895)
at java.lang.Class.newInstance(Class.java:354)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
... 19 more
Caused by: java.lang.ClassNotFoundException:
com.amazonaws.services.s3.AmazonS3
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 24 more
End of LogType:stderr

If I remove the --properties-file parameter
the error is gone

Would really appreciate the help .



Thanks,
Divya


Total memory of workers

2016-09-06 Thread tan shai
Hello,

Can anyone explain to me the behavior of spark if the size of the processed
file is greater than the total memory available on workers?

Many thanks.


Re: Scala Vs Python

2016-09-06 Thread Leonard Cohen
hi spark user,


IMHO, I will use, for the application, the language that aligns with the language the 
underlying system was designed in.


If working on Spark, I choose Scala.
If working on Hadoop, I choose Java.
If working on nothing, I use Python.
Why?
Because it will save my life, just kidding.




Best regards,
Leonard
-- Original --
From:  "Luciano Resende";;
Send time: Tuesday, Sep 6, 2016 8:07 AM
To: "darren"; 
Cc: "Mich Talebzadeh"; "Jakob 
Odersky"; "ayan guha"; "kant 
kodali"; "AssafMendelson"; 
"user"; 
Subject:  Re: Scala Vs Python





On Thu, Sep 1, 2016 at 3:15 PM, darren  wrote:
This topic is a concern for us as well. In the data science world no one uses 
native scala or java by choice. It's R and Python. And python is growing. Yet 
in spark, python is 3rd in line for feature support, if at all.


This is why we have decoupled from spark in our project. It's really 
unfortunate the spark team has invested so heavily in Scala. 


As for speed it comes from horizontal scaling and throughout. When you can 
scale outward, individual VM performance is less an issue. Basic HPC principles.





You could still try to get best of the both worlds, having your data scientists 
writing their algorithms using Python and/or R and have a compiler/optimizer 
handling the optimizations to run in a distributed fashion in a spark cluster 
leveraging some of the low level apis written in java/scala. Take a look at 
Apache SystemML http://systemml.apache.org/ for more details.




-- 
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/

Spark Checkpoint for JDBC/ODBC

2016-09-06 Thread Selvam Raman
Hi,

Need your input to make a decision.

We have a number of databases (i.e. Oracle, MySQL, etc.). I want to read
data from these sources, but how is fault tolerance maintained on the source
side?

If a source system goes down, how does Spark read the data?

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


clear steps for installation of spark, cassandra and cassandra connector to run on spyder 2.3.7 using python 3.5 and anaconda 2.4 ipython 4.0

2016-09-06 Thread muhammet pakyürek


Could you send me documents and links covering all of the above requirements: 
installation of Spark, Cassandra and the Cassandra connector, to run on Spyder 2.3.7 
using Python 3.5, Anaconda 2.4 and IPython 4.0?






Re: Scala Vs Python

2016-09-06 Thread 刘虓
Hi,
I have been using spark-sql with Python for more than one year, from version
1.5.0 to version 2.0.0.
It works great so far; the performance is always good, though I have not
done a benchmark yet.
I have also skimmed through the source code of the Python API: most of it only calls
the Scala API, and nothing heavy is done in Python.


2016-09-06 18:38 GMT+08:00 Leonard Cohen <3498363...@qq.com>:

> hi spark user,
>
> IMHO, I will use the language for application aligning with the language
> under which the system designed.
>
> If working on Spark, I choose Scala.
> If working on Hadoop, I choose Java.
> If working on nothing, I use Python.
> Why?
> Because it will save my life, just kidding.
>
>
> Best regards,
> Leonard
> -- Original --
> *From: * "Luciano Resende";;
> *Send time:* Tuesday, Sep 6, 2016 8:07 AM
> *To:* "darren";
> *Cc:* "Mich Talebzadeh"; "Jakob Odersky"<
> ja...@odersky.com>; "ayan guha"; "kant kodali"<
> kanth...@gmail.com>; "AssafMendelson"; "user"<
> user@spark.apache.org>;
> *Subject: * Re: Scala Vs Python
>
>
>
> On Thu, Sep 1, 2016 at 3:15 PM, darren  wrote:
>
>> This topic is a concern for us as well. In the data science world no one
>> uses native scala or java by choice. It's R and Python. And python is
>> growing. Yet in spark, python is 3rd in line for feature support, if at all.
>>
>> This is why we have decoupled from spark in our project. It's really
>> unfortunate the spark team has invested so heavily in Scala.
>>
>> As for speed it comes from horizontal scaling and throughout. When you
>> can scale outward, individual VM performance is less an issue. Basic HPC
>> principles.
>>
>>
> You could still try to get best of the both worlds, having your data
> scientists writing their algorithms using Python and/or R and have a
> compiler/optimizer handling the optimizations to run in a distributed
> fashion in a spark cluster leveraging some of the low level apis written in
> java/scala. Take a look at Apache SystemML http://systemml.apache.org/
> for more details.
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


RE: Spark 2.0.0 Thrift Server problem with Hive metastore

2016-09-06 Thread Campagnola, Francesco
The same error occurs when executing any “explain” command:

0: jdbc:hive2://spark-test:1> explain select 1 as id;
java.lang.IllegalStateException: Can't overwrite cause with 
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
…
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 198.0 failed 10 times, most recent failure: Lost task 0.9 in 
stage 198.0 (TID 2046, vertica204): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)

I have checked the source code, and it seems this explicit cast is causing the 
issue:

>spark-2.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
execute().mapPartitionsInternal { iter =>
  var count = 0
  val buffer = new Array[Byte](4 << 10)  // 4K
  val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
  val bos = new ByteArrayOutputStream()
  val out = new DataOutputStream(codec.compressedOutputStream(bos))
  while (iter.hasNext && (n < 0 || count < n)) {
val row = iter.next().asInstanceOf[UnsafeRow]

Hope this issue will be fixed in the next releases…


From: Campagnola, Francesco
Sent: Tuesday, 6 September 2016 09:46
To: 'Jeff Zhang' 
Cc: user@spark.apache.org
Subject: RE: Spark 2.0.0 Thrift Server problem with Hive metastore

I mean I have installed Spark 2.0 in the same environment where Spark 1.6 
thrift server was running,
then stopped the Spark 1.6 thrift server and started the Spark 2.0 one.

If I’m not mistaken, Spark 2.0 should be still compatible with Hive 1.2.1 and 
no upgrade procedures are required.
The spark-defaults.conf file has not been changed.

The following commands issued to the Spark 2.0 thrift server work:
create database test;
use test;
create table tb_1 (id int);
insert into table tb_1 select t.id from (select 1 as id) t;

While all of these commands return the same error:
show databases;
show tables;
show partitions tb_1;
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 62.0 failed 10 times, most recent failure: Lost task 0.9 in 
stage 62.0 (TID 540, vertica204): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow





From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Tuesday, 6 September 2016 02:50
To: Campagnola, Francesco 
>
Cc: user@spark.apache.org
Subject: Re: Spark 2.0.0 Thrift Server problem with Hive metastore

How do you upgrade to spark 2.0 ?

On Mon, Sep 5, 2016 at 11:25 PM, Campagnola, Francesco 
> 
wrote:
Hi,

in an already working Spark - Hive environment with Spark 1.6 and Hive 1.2.1, 
with Hive metastore configured on Postgres DB, I have upgraded Spark to the 
2.0.0.

I have started the thrift server on YARN, then tried to execute from the 
beeline cli or a jdbc client the following command:
SHOW DATABASES;
It always gives this error on Spark server side:

spark@spark-test[spark] /home/spark> beeline -u jdbc:hive2://$(hostname):1 
-n spark

Connecting to jdbc:hive2://spark-test:1
16/09/05 17:41:43 INFO jdbc.Utils: Supplied authorities: spark-test:1
16/09/05 17:41:43 INFO jdbc.Utils: Resolved authority: spark-test:1
16/09/05 17:41:43 INFO jdbc.HiveConnection: Will try to open client transport 
with JDBC Uri: jdbc:hive2:// spark-test:1
Connected to: Spark SQL (version 2.0.0)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive

0: jdbc:hive2:// spark-test:1> show databases;
java.lang.IllegalStateException: Can't overwrite cause with 
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
org.apache.spark.sql.catalyst.expressions.UnsafeRow
at java.lang.Throwable.initCause(Throwable.java:457)
at 
org.apache.hive.service.cli.HiveSQLException.toStackTrace(HiveSQLException.java:236)
at 
org.apache.hive.service.cli.HiveSQLException.toStackTrace(HiveSQLException.java:236)
at 
org.apache.hive.service.cli.HiveSQLException.toCause(HiveSQLException.java:197)
at 
org.apache.hive.service.cli.HiveSQLException.(HiveSQLException.java:108)
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:256)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:242)
at 

LabeledPoint creation

2016-09-06 Thread Madabhattula Rajesh Kumar
Hi,

I am new to Spark ML and am trying to create a LabeledPoint from a categorical
dataset (example code from Spark). For this, I am using the One-hot encoding
feature. Below is my code:

val df = sparkSession.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c"),
  (6, "d"))).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)

val indexed = indexer.transform(df)

indexed.select("category", "categoryIndex").show()

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)

 encoded.select("id", "category", "categoryVec").show()

*Output :- *
+---+--------+-------------+
| id|category|  categoryVec|
+---+--------+-------------+
|  0|       a|(3,[0],[1.0])|
|  1|       b|    (3,[],[])|
|  2|       c|(3,[1],[1.0])|
|  3|       a|(3,[0],[1.0])|
|  4|       a|(3,[0],[1.0])|
|  5|       c|(3,[1],[1.0])|
|  6|       d|(3,[2],[1.0])|
+---+--------+-------------+

*Creating a LabeledPoint from the encoded dataframe:-*

val data = encoded.rdd.map { x =>
  {
val featureVector =
Vectors.dense(x.getAs[org.apache.spark.ml.linalg.SparseVector]("categoryVec").toArray)
val label = x.getAs[java.lang.Integer]("id").toDouble
LabeledPoint(label, featureVector)
  }
}

data.foreach { x => println(x) }

*Output :-*

(0.0,[1.0,0.0,0.0])
(1.0,[0.0,0.0,0.0])
(2.0,[0.0,1.0,0.0])
(3.0,[1.0,0.0,0.0])
(4.0,[1.0,0.0,0.0])
(5.0,[0.0,1.0,0.0])
(6.0,[0.0,0.0,1.0])

I have four categorical values: a, b, c, d. I am expecting 4 features
in the above LabeledPoint but it has only 3 features.

Please help me with the creation of a LabeledPoint from categorical features.

Regards,
Rajesh
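
A hedged guess at the cause, with a one-line sketch: OneHotEncoder drops the last
category by default, so four categories produce only three vector slots;
setDropLast(false) keeps all four.

import org.apache.spark.ml.feature.OneHotEncoder

// Sketch only: keep a slot for every category (4 instead of 3).
val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
  .setDropLast(false)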


Re: [Spark submit] getting error when use properties file parameter in spark submit

2016-09-06 Thread Divya Gehlot
Yes, I am reading from an S3 bucket.
Strangely, the error goes away when I remove the properties file parameter.
On Sep 6, 2016 8:35 PM, "Sonal Goyal"  wrote:

> Looks like a classpath issue - Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.services.s3.AmazonS3
>
> Are you using S3 somewhere? Are the required jars in place?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies 
> Reifier at Strata Hadoop World
> 
> Reifier at Spark Summit 2015
> 
>
> 
>
>
>
> On Tue, Sep 6, 2016 at 4:45 PM, Divya Gehlot 
> wrote:
>
>> Hi,
>> I am getting below error if I try to use properties file paramater in
>> spark-submit
>>
>> Exception in thread "main" java.util.ServiceConfigurationError:
>> org.apache.hadoop.fs.FileSystem: Provider 
>> org.apache.hadoop.fs.s3a.S3AFileSystem
>> could not be instantiated
>> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
>> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
>> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
>> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2673)
>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSyste
>> m.java:2684)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2701)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem
>> .java:2737)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2719)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:375)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
>> at org.apache.spark.deploy.yarn.ApplicationMaster.run(Applicati
>> onMaster.scala:142)
>> at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main
>> $1.apply$mcV$sp(ApplicationMaster.scala:653)
>> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHad
>> oopUtil.scala:69)
>> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHad
>> oopUtil.scala:68)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:415)
>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGro
>> upInformation.java:1657)
>> at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(Spark
>> HadoopUtil.scala:68)
>> at org.apache.spark.deploy.yarn.ApplicationMaster$.main(Applica
>> tionMaster.scala:651)
>> at org.apache.spark.deploy.yarn.ApplicationMaster.main(Applicat
>> ionMaster.scala)
>> Caused by: java.lang.NoClassDefFoundError: com/amazonaws/services/s3/Amaz
>> onS3
>> at java.lang.Class.getDeclaredConstructors0(Native Method)
>> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
>> at java.lang.Class.getConstructor0(Class.java:2895)
>> at java.lang.Class.newInstance(Class.java:354)
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
>> ... 19 more
>> Caused by: java.lang.ClassNotFoundException:
>> com.amazonaws.services.s3.AmazonS3
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> ... 24 more
>> End of LogType:stderr
>>
>> If I remove the --properties-file parameter
>> the error is gone
>>
>> Would really appreciate the help .
>>
>>
>>
>> Thanks,
>> Divya
>>
>
>


[Spark-Submit:]Error while reading from s3n

2016-09-06 Thread Divya Gehlot
Hi,
I am on EMR 4.7 with Spark 1.6.1
I am trying to read from s3n buckets in spark
Option 1 :
If I set up

hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
hadoopConf.set("fs.s3.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))


and access the bucket as s3://bucket-name

I am getting below error

Exception in thread "main" java.io.IOException: /batch-147313410
doesn't exist
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy37.retrieveINode(Unknown Source)
at 
org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1730)
at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:231)
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:281)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)

Option 2:

If I set up

hadoopConf.set("fs.s3n.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))


and try to access the bucket as s3n://bucket-name

getting the below error :


Caused by: org.apache.hadoop.security.AccessControlException:
Permission denied: s3n://bucket-name/batch-147313710_$folder$
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)




When I try to list the bucket using aws cli

aws s3 ls s3://bucket-name/


It gives me the bucket listing.


Would really appreciate the help.



Thanks,

Divya
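
org.apache.hadoop.fs.s3.S3FileSystem is the legacy block-based S3 filesystem and does not read ordinary S3 objects, which is one common cause of the "doesn't exist" error above. A minimal sketch of an s3a-based alternative, run in spark-shell (the bucket name, the path, and the availability of the s3a classes on EMR 4.7 are assumptions, not taken from this thread):

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

// read the batch directory through the s3a connector instead of s3/s3n
val rdd = sc.textFile("s3a://bucket-name/batch-147313410")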


Re: [Spark submit] getting error when use properties file parameter in spark submit

2016-09-06 Thread Sonal Goyal
Looks like a classpath issue - Caused by: java.lang.ClassNotFoundException:
com.amazonaws.services.s3.AmazonS3

Are you using S3 somewhere? Are the required jars in place?
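
If the S3A filesystem is only being activated when the properties file is passed in, the AWS SDK and hadoop-aws jars usually need to be on the classpath as well. One hedged option is a line like the following in the same properties file (the artifact versions are assumptions; match them to the cluster's Hadoop build):

spark.jars.packages  org.apache.hadoop:hadoop-aws:2.7.2,com.amazonaws:aws-java-sdk:1.7.4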

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World 
Reifier at Spark Summit 2015






On Tue, Sep 6, 2016 at 4:45 PM, Divya Gehlot 
wrote:

> Hi,
> I am getting below error if I try to use properties file paramater in
> spark-submit
>
> Exception in thread "main" java.util.ServiceConfigurationError:
> org.apache.hadoop.fs.FileSystem: Provider 
> org.apache.hadoop.fs.s3a.S3AFileSystem
> could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2673)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(
> FileSystem.java:2684)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2701)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2737)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2719)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:375)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
> at org.apache.spark.deploy.yarn.ApplicationMaster.run(
> ApplicationMaster.scala:142)
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$
> main$1.apply$mcV$sp(ApplicationMaster.scala:653)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(
> SparkHadoopUtil.scala:69)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(
> SparkHadoopUtil.scala:68)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1657)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(
> SparkHadoopUtil.scala:68)
> at org.apache.spark.deploy.yarn.ApplicationMaster$.main(
> ApplicationMaster.scala:651)
> at org.apache.spark.deploy.yarn.ApplicationMaster.main(
> ApplicationMaster.scala)
> Caused by: java.lang.NoClassDefFoundError: com/amazonaws/services/s3/
> AmazonS3
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2595)
> at java.lang.Class.getConstructor0(Class.java:2895)
> at java.lang.Class.newInstance(Class.java:354)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 19 more
> Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.
> AmazonS3
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 24 more
> End of LogType:stderr
>
> If I remove the --properties-file parameter
> the error is gone
>
> Would really appreciate the help .
>
>
>
> Thanks,
> Divya
>


Re: clear steps for installation of spark, cassandra and cassandra connector to run on spyder 2.3.7 using python 3.5 and anaconda 2.4 ipython 4.0

2016-09-06 Thread ayan guha
Spark has pretty extensive documentation, that should be your starting
point. I do not use Cassandra much, but Cassandra connector should be a
spark package, so look for spark package website.

If I may say so, all docs should be one or two Google search away :)
On 6 Sep 2016 20:34, "muhammet pakyürek"  wrote:

>
>
> could u send me  documents and links to satisfy all above requirements of 
> installation
> of spark, cassandra and cassandra connector to run on spyder 2.3.7 using
> python 3.5 and anaconda 2.4 ipython 4.0
>
>
> --
>
>


distribute work (files)

2016-09-06 Thread Lydia Ickler
Hi, 

maybe this is a stupid question:

I have a list of files. Each file I want to take as an input for a 
ML-algorithm. All files are independent from another.
My question now is how do I distribute the work so that each worker takes a 
block of files and just runs the algorithm on them one by one.
I hope somebody can point me in the right direction! :)

Best regards, 
Lydia
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe, Java: How to convert String to Vector ?

2016-09-06 Thread Peter Figliozzi
Hi Yan, I think you'll have to map the features column to a new numerical
features column.

Here's one way to do the individual transform:

scala> val x = "[1, 2, 3, 4, 5]"
x: String = [1, 2, 3, 4, 5]

scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "") split("
") map(_.toInt)
y: Array[Int] = Array(1, 2, 3, 4, 5)

If you don't know about the Scala command line, just type "scala" in a
terminal window.  It's a good place to try things out.

You can make a function out of this transformation and apply it to your
features column to make a new column.  Then add this with
Dataset.withColumn.

See here on how to apply a function to a Column to make a new column.
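
A hedged sketch of that approach in Scala for this case (column names come from the question; the bracket stripping and the True/False handling are assumptions about the file's exact format):

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.{col, udf}

// parse "[0, 1, 3, ...]" into an ml Vector; assumes a flat comma-separated list of numbers
val toVec = udf { s: String =>
  Vectors.dense(s.stripPrefix("[").stripSuffix("]").split(",").map(_.trim.toDouble))
}

val prepared = samples   // 'samples' is the Dataset read from the csv in the question
  .withColumn("featuresVec", toVec(col("features")))
  .withColumn("labelNum", (col("label") === "True").cast("double"))  // True/False -> 1.0/0.0

// then point the classifier at the new columns, e.g. setFeaturesCol("featuresVec")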

On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai)  wrote:

> Hi,
> I have a csv file like:
> uid  mid  features   label
> 1235231[0, 1, 3, ...]True
>
> Both  "features" and "label" columns are used for GBTClassifier.
>
> However, when I read the file:
> Dataset samples = sparkSession.read().csv(file);
> The type of samples.select("features") is String.
>
> My question is:
> How to map samples.select("features") to Vector or any appropriate type,
> so I can use it to train like:
> GBTClassifier gbdt = new GBTClassifier()
> .setLabelCol("label")
> .setFeaturesCol("features")
> .setMaxIter(2)
> .setMaxDepth(7);
>
> Thanks.
>


Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-06 Thread Cody Koeninger
If you leave enable.auto.commit set to true, it will commit offsets to
kafka, but you will get undefined delivery semantics.

If you just want to restart from a fresh state, the easiest thing to
do is use a new consumer group name.

But if that keeps happening, you should look into why your retention
is not sufficient.
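
For the fresh-start case, a minimal sketch of the consumer params (brokers, group id and serializers here are placeholders, not taken from the thread):

import org.apache.kafka.common.serialization.ByteArrayDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "my-app-fresh",        // a new group has no committed offsets to resume from
  "auto.offset.reset" -> "earliest",   // only consulted when the group has no committed offset
  "enable.auto.commit" -> (false: java.lang.Boolean)
)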

On Tue, Sep 6, 2016 at 9:39 AM, Srikanth  wrote:
> You are right. I got confused as its all part of same log when running from
> IDE.
> I was looking for a good guide to read to understand the this integ.
>
> I'm not managing offset on my own. I've not enabled checkpoint for my tests.
> I assumed offsets will be stored in kafka by default.
>
> KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> ssc, PreferConsistent, SubscribePattern[Array[Byte],
> Array[Byte]](pattern, kafkaParams) )
>
>* @param offsets: offsets to begin at on initial startup.  If no offset
> is given for a
>* TopicPartition, the committed offset (if applicable) or kafka param
>* auto.offset.reset will be used.
>
> 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> enable.auto.commit = true
> auto.offset.reset = latest
>
> Srikanth
>
> On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger  wrote:
>>
>> Seems like you're confused about the purpose of that line of code, it
>> applies to executors, not the driver. The driver is responsible for
>> determining offsets.
>>
>> Where are you storing offsets, in Kafka, checkpoints, or your own store?
>> Auto offset reset won't be used if there are stored offsets.
>>
>>
>> On Sep 2, 2016 14:58, "Srikanth"  wrote:
>>>
>>> Hi,
>>>
>>> Upon restarting my Spark Streaming app it is failing with error
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
>>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
>>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of
>>> range with no configured reset policy for partitions: {mt-event-2=1710706}
>>>
>>> It is correct that the last read offset was deleted by kafka due to
>>> retention period expiry.
>>> I've set auto.offset.reset in my app but it is getting reset here
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
>>>
>>> How to force it to restart in this case (fully aware of potential data
>>> loss)?
>>>
>>> Srikanth
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark 1.6.0 web console shows running application in a "waiting" status, but it's actually running

2016-09-06 Thread sarlindo
I have 2 questions/issues.

1. We had the spark-master shut down (reason unknown). We looked at the
spark-master logs and they simply show this; is there some other log I should
be looking at to find out why the master went down?

16/09/05 21:10:00 INFO ClientCnxn: Opening socket connection to server
tiplxapp-spk02.prd.tse.com/142.201.219.76:2181. Will not attempt to
authenticate using SASL (unknown error)
16/09/05 21:10:00 ERROR Master: Leadership has been revoked -- master
shutting down.
16/09/05 21:10:00 INFO ClientCnxn: Socket connection established, initiating
session, client: /142.201.219.75:56361, server:
tiplxapp-spk02.prd.tse.com/142.201.219.76:2181


2. Spark 1.6.0 web console shows a running application in a "waiting"
status, but it's actually running. Is this an existing bug?

 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-0-web-console-shows-running-application-in-a-waiting-status-but-it-s-acutally-running-tp27665.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



anyone know what the status of spark-ec2 is?

2016-09-06 Thread Andy Davidson
Spark-ec2 used to be part of the spark distribution. It now seems to be
split into a separate repo https://github.com/amplab/spark-ec2

It does not seem to be listed on https://spark-packages.org/

Does anyone know what the status is? There is a readme.md, however I am
unable to find any release notes. Is there a spark-ec2 mailing list?

Kind regards

Andy




Spray Client VS PlayWS vs Spring RestTemplate within Spark Job

2016-09-06 Thread prosp4300
Hi, Spark Users

As far as I know, Spray Client depends on an Akka ActorSystem. Does this dependency
theoretically mean it is not possible to use spray-client in a Spark job that
runs on Spark executor nodes?

I believe PlayWS should work as a RESTful client to run from a Spark executor.
How about the traditional Spring RestTemplate? Is there any suggestion or best
practice to follow for accessing RESTful services from Spark jobs?

Thanks a lot



Re: YARN memory overhead settings

2016-09-06 Thread Marcelo Vanzin
It kinda depends on the application. Certain compression libraries, in
particular, are kinda lax with their use of off-heap buffers, so if
you configure executors to use many cores you might end up with higher
usage than the default configuration. Then there are also things like
PARQUET-118.

In any case, growth should not be unbounded, so you can just increase
the value until your jobs start working (or, if growth doesn't stop,
there might be a memory leak somewhere).
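
A hedged example of raising it (the 6144 MB value is only an illustration, not a recommendation):

val conf = new org.apache.spark.SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "6144")  // value is in MB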

On Tue, Sep 6, 2016 at 9:23 AM, Tim Moran  wrote:
> Hi,
>
> I'm running a spark job on YARN, using 6 executors each with 25 GB of memory
> and spark.yarn.executor.memoryOverhead set to 5GB. Despite this, I still seem to
> see YARN killing my executors for exceeding the memory limit.
>
> Reading the docs, it looks like the overhead defaults to around 10% of the
> size of the heap - yet I'm still seeing failures when it's set to 20% of the
> heap size. Is this expected? Are there any particular issues or antipatterns
> in Spark code that could cause the JVM to use an excessive amount of memory
> beyond the heap?
>
> Thanks,
>
> Tim.
>
> This email is confidential, if you are not the intended recipient please
> delete it and notify us immediately by emailing the sender. You should not
> copy it or use it for any purpose nor disclose its contents to any other
> person. Privitar Limited is registered in England with registered number
> 09305666. Registered office Salisbury House, Station Road, Cambridge,
> CB12LA.



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



YARN memory overhead settings

2016-09-06 Thread Tim Moran
Hi,

I'm running a spark job on YARN, using 6 executors each with 25 GB of
memory and spark.yarn.executor.memoryOverhead set to 5GB. Despite this, I still
seem to see YARN killing my executors for exceeding the memory limit.

Reading the docs, it looks like the overhead defaults to around 10% of the
size of the heap - yet I'm still seeing failures when it's set to 20% of
the heap size. Is this expected? Are there any particular issues or
antipatterns in Spark code that could cause the JVM to use an excessive
amount of memory beyond the heap?

Thanks,

Tim.

-- 
This email is confidential, if you are not the intended recipient please 
delete it and notify us immediately by emailing the sender. You should not 
copy it or use it for any purpose nor disclose its contents to any other 
person. Privitar Limited is registered in England with registered number 
09305666. Registered office Salisbury House, Station Road, Cambridge, 
CB12LA.


Spark 1.6.0 web console shows a running application in a "waiting" status, but it's actually running. Is this an existing bug?

2016-09-06 Thread sarlindo
I have 2 questions/issues.

1. We had the spark-master shut down (reason unknown). We looked at the
spark-master logs and they simply show this; is there some other log I should
be looking at to find out why the master went down?

16/09/05 21:10:00 INFO ClientCnxn: Opening socket connection to server xxx
Will not attempt to authenticate using SASL (unknown error)
16/09/05 21:10:00 ERROR Master: Leadership has been revoked -- master
shutting down.
16/09/05 21:10:00 INFO ClientCnxn: Socket connection established, initiating
session, client: xxx, server: xxx/xxx


2. Spark 1.6.0 web console shows a running application in a "waiting"
status, but it's actually running. Is this an existing bug?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-6-0-web-console-shows-a-running-application-in-a-waiting-status-but-it-s-actually-running-Is-tp27666.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Complex RDD operation as DataFrame UDF ?

2016-09-06 Thread Thunder Stumpges
Hi guys, Spark 1.6.1 here.

I am trying to "DataFrame-ize" a complex function I have that currently
operates on a DataSet, and returns another DataSet with a new "column"
added to it. I'm trying to fit this into the new ML "Model" format where I
can receive a DataFrame, ensure the input column exists, then perform my
transform and append as a new column.

From reviewing other ML Model code, the way I see this happen is typically
using a UDF on the input to create the output. My problem is this requires
the UDF to operate on each record one by one.

In my case I am doing a chain of RDD/DataSet operations (flatMap, join with
another cached RDD, run a calculation, reduce) on the original input column.

How can I do this with DataFrames?

thanks,
Thunder


Re: Using spark package XGBoost

2016-09-06 Thread janardhan shetty
Is this merged into Spark ML? If so, which version?

On Tue, Sep 6, 2016 at 12:58 AM, Takeshi Yamamuro 
wrote:

> Hi,
>
> Sorry to bother you, but I'd like to inform you of our activities.
> We'll start incubating our product, Hivemall, in Apache and this is a
> scalable ML library
> for Hive/Spark/Pig.
>
> - http://wiki.apache.org/incubator/HivemallProposal
> - http://markmail.org/thread/mjwyyd4btthk3626
>
> I made a pr for XGBoost integration on DataFrame/Spark(https://
> github.com/myui/hivemall/pull/281)
> and this pr has already been merged in a master.
> I wrote how to use the integration on my gist:
> https://gist.github.com/maropu/33794b293ee937e99b8fb0788843fa3f
>
> If you are interested in the integration, could you please try using it and
> let me know about any issues that you get stuck on?
>
> Best regards,
> takeshi
>
> // maropu
>
>
>
> On Mon, Aug 15, 2016 at 1:04 PM, Brandon White 
> wrote:
>
>> The XGBoost integration with Spark is currently only supported for RDDs;
>> there is a ticket for DataFrame support and folks claim to be working on it.
>>
>> On Aug 14, 2016 8:15 PM, "Jacek Laskowski"  wrote:
>>
>>> Hi,
>>>
>>> I've never worked with the library and speaking about sbt setup only.
>>>
>>> It appears that the project didn't release 2.11-compatible jars (only
>>> 2.10) [1] so you need to build the project yourself and uber-jar it
>>> (using sbt-assembly plugin).
>>>
>>> [1] https://spark-packages.org/package/rotationsymmetry/sparkxgboost
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sun, Aug 14, 2016 at 7:13 AM, janardhan shetty
>>>  wrote:
>>> > Any leads how to do acheive this?
>>> >
>>> > On Aug 12, 2016 6:33 PM, "janardhan shetty" 
>>> wrote:
>>> >>
>>> >> I tried using  sparkxgboost package in build.sbt file but it failed.
>>> >> Spark 2.0
>>> >> Scala 2.11.8
>>> >>
>>> >> Error:
>>> >>  [warn]
>>> >> http://dl.bintray.com/spark-packages/maven/rotationsymmetry/
>>> sparkxgboost/0.2.1-s_2.10/sparkxgboost-0.2.1-s_2.10-javadoc.jar
>>> >>[warn] ::
>>> >>[warn] ::  FAILED DOWNLOADS::
>>> >>[warn] :: ^ see resolution messages for details  ^ ::
>>> >>[warn] ::
>>> >>[warn] ::
>>> >> rotationsymmetry#sparkxgboost;0.2.1-s_2.10!sparkxgboost.jar(src)
>>> >>[warn] ::
>>> >> rotationsymmetry#sparkxgboost;0.2.1-s_2.10!sparkxgboost.jar(doc)
>>> >>
>>> >> build.sbt:
>>> >>
>>> >> scalaVersion := "2.11.8"
>>> >>
>>> >> libraryDependencies ++= {
>>> >>   val sparkVersion = "2.0.0-preview"
>>> >>   Seq(
>>> >> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>>> >> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>>> >> "org.apache.spark" %% "spark-streaming" % sparkVersion %
>>> "provided",
>>> >> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
>>> >>   )
>>> >> }
>>> >>
>>> >> resolvers += "Spark Packages Repo" at
>>> >> "http://dl.bintray.com/spark-packages/maven"
>>> >>
>>> >> libraryDependencies += "rotationsymmetry" % "sparkxgboost" %
>>> >> "0.2.1-s_2.10"
>>> >>
>>> >> assemblyMergeStrategy in assembly := {
>>> >>   case PathList("META-INF", "MANIFEST.MF")   =>
>>> >> MergeStrategy.discard
>>> >>   case PathList("javax", "servlet", xs @ _*) =>
>>> >> MergeStrategy.first
>>> >>   case PathList(ps @ _*) if ps.last endsWith ".html" =>
>>> >> MergeStrategy.first
>>> >>   case "application.conf"=>
>>> >> MergeStrategy.concat
>>> >>   case "unwanted.txt"=>
>>> >> MergeStrategy.discard
>>> >>
>>> >>   case x => val oldStrategy = (assemblyMergeStrategy in
>>> assembly).value
>>> >> oldStrategy(x)
>>> >>
>>> >> }
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Fri, Aug 12, 2016 at 3:35 PM, janardhan shetty <
>>> janardhan...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> Is there a dataframe version of XGBoost in spark-ml ?.
>>> >>> Has anyone used sparkxgboost package ?
>>> >>
>>> >>
>>> >
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Re[8]: Spark 2.0: SQL runs 5x times slower when adding 29th field to aggregation.

2016-09-06 Thread Davies Liu
I think the slowness is caused by the generated aggregate method having more
than 8K bytecodes; it is then not JIT compiled and becomes much slower.

Could you try to disable the DontCompileHugeMethods by:

-XX:-DontCompileHugeMethods
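
A hedged way to pass that flag through Spark configuration (the config keys are standard; whether the flag actually helps is what needs testing):

val conf = new org.apache.spark.SparkConf()
  .set("spark.driver.extraJavaOptions", "-XX:-DontCompileHugeMethods")    // driver JVM options generally need to be set at launch time
  .set("spark.executor.extraJavaOptions", "-XX:-DontCompileHugeMethods")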

On Mon, Sep 5, 2016 at 4:21 AM, Сергей Романов
 wrote:
> Hi, Gavin,
>
> Shuffling is exactly the same in both requests and is minimal. Both requests
> produce one shuffle task. Running time is the only difference I can see in
> metrics:
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 57) ).collect, number=1)
> 0.713730096817
>  {
> "id" : 368,
> "name" : "duration total (min, med, max)",
> "value" : "524"
>   }, {
> "id" : 375,
> "name" : "internal.metrics.executorRunTime",
> "value" : "527"
>   }, {
> "id" : 391,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "244495"
>   }
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 58) ).collect, number=1)
> 2.97951102257
>
>   }, {
> "id" : 469,
> "name" : "duration total (min, med, max)",
> "value" : "2654"
>   }, {
> "id" : 476,
> "name" : "internal.metrics.executorRunTime",
> "value" : "2661"
>   }, {
> "id" : 492,
> "name" : "internal.metrics.shuffle.write.writeTime",
> "value" : "371883"
>   }, {
>
> Full metrics in attachment.
>
> Суббота, 3 сентября 2016, 19:53 +03:00 от Gavin Yue
> :
>
>
> Any shuffling?
>
>
> On Sep 3, 2016, at 5:50 AM, Сергей Романов 
> wrote:
>
> Same problem happens with CSV data file, so it's not parquet-related either.
>
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
> >>> import timeit
> >>> from pyspark.sql.types import *
> >>> schema = StructType([StructField('dd_convs', FloatType(), True)])
> >>> for x in range(50, 70): print x, timeit.timeit(spark.read.csv('file:///data/dump/test_csv', schema=schema).groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> 50 0.372850894928
> 51 0.376906871796
> 52 0.381325960159
> 53 0.385444164276
> 54 0.38685192
> 55 0.388918161392
> 56 0.397624969482
> 57 0.391713142395
> 58 2.62714004517
> 59 2.68421196938
> 60 2.74627685547
> 61 2.81081581116
> 62 3.43532109261
> 63 3.07742786407
> 64 3.03904604912
> 65 3.01616096497
> 66 3.06293702126
> 67 3.09386610985
> 68 3.27610206604
> 69 3.2041969299
>
> Суббота, 3 сентября 2016, 15:40 +03:00 от Сергей Романов
> :
>
> Hi,
>
> I had narrowed down my problem to a very simple case. I'm sending 27kb
> parquet in attachment. (file:///data/dump/test2 in example)
>
> Please, can you take a look at it? Why is there a performance drop after 57
> sum columns?
>
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>   /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
> >>> import timeit
> >>> for x in range(70): print x, timeit.timeit(spark.read.parquet('file:///data/dump/test2').groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> ...
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> 0 1.05591607094
> 1 0.200426101685
> 2 0.203800916672
> 3 0.176458120346
> 4 0.184863805771
> 5 0.232321023941
> 6 0.216032981873
> 7 0.201778173447
> 8 0.292424917221
> 9 0.228524923325
> 10 0.190534114838
> 11 0.197028160095
> 12 0.270443916321
> 13 0.429781913757
> 14 0.270851135254
> 15 0.776989936829
> 16 0.27879181
> 17 0.227638959885
> 18 0.212944030762
> 19 0.2144780159
> 20 0.22200012207
> 21 0.262261152267
> 22 0.254227876663
> 23 0.275084018707
> 24 0.292124032974
> 25 0.280488014221
> 16/09/03 15:31:28 WARN Utils: Truncated the string representation of a plan
> since it was too large. This behavior can be adjusted by setting
> 'spark.debug.maxToStringFields' in SparkEnv.conf.
> 26 0.290093898773
> 27 0.238478899002
> 28 0.246420860291
> 29 0.241401195526
> 30 0.255286931992
> 31 0.42702794075
> 32 0.327946186066
> 33 0.434395074844
> 34 0.314198970795
> 35 0.34576010704
> 36 0.278323888779
> 37 0.289474964142
> 38 0.290827989578
> 39 0.376291036606
> 40 0.347742080688
> 41 0.363158941269
> 42 0.318687915802
> 43 0.376327991486
> 44 0.374994039536
> 45 0.362971067429
> 46 0.425967931747
> 47 0.370860099792
> 48 0.443903923035
> 49 0.374128103256
> 50 0.378985881805
> 51 0.476850986481
> 52 0.451028823853
> 53 0.432540893555
> 54 0.514838933945
> 55 0.53990483284
> 56 

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-06 Thread Srikanth
This isn't a production setup. We kept retention low intentionally.
My original question was why I got the exception instead of it using
auto.offset.reset
on restart?




On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger  wrote:

> If you leave enable.auto.commit set to true, it will commit offsets to
> kafka, but you will get undefined delivery semantics.
>
> If you just want to restart from a fresh state, the easiest thing to
> do is use a new consumer group name.
>
> But if that keeps happening, you should look into why your retention
> is not sufficient.
>
> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth  wrote:
> > You are right. I got confused as its all part of same log when running
> from
> > IDE.
> > I was looking for a good guide to read to understand the this integ.
> >
> > I'm not managing offset on my own. I've not enabled checkpoint for my
> tests.
> > I assumed offsets will be stored in kafka by default.
> >
> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
> > Array[Byte]](pattern, kafkaParams) )
> >
> >* @param offsets: offsets to begin at on initial startup.  If no
> offset
> > is given for a
> >* TopicPartition, the committed offset (if applicable) or kafka param
> >* auto.offset.reset will be used.
> >
> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> > enable.auto.commit = true
> > auto.offset.reset = latest
> >
> > Srikanth
> >
> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger 
> wrote:
> >>
> >> Seems like you're confused about the purpose of that line of code, it
> >> applies to executors, not the driver. The driver is responsible for
> >> determining offsets.
> >>
> >> Where are you storing offsets, in Kafka, checkpoints, or your own store?
> >> Auto offset reset won't be used if there are stored offsets.
> >>
> >>
> >> On Sep 2, 2016 14:58, "Srikanth"  wrote:
> >>>
> >>> Hi,
> >>>
> >>> Upon restarting my Spark Streaming app it is failing with error
> >>>
> >>> Exception in thread "main" org.apache.spark.SparkException: Job
> aborted
> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
> out of
> >>> range with no configured reset policy for partitions:
> {mt-event-2=1710706}
> >>>
> >>> It is correct that the last read offset was deleted by kafka due to
> >>> retention period expiry.
> >>> I've set auto.offset.reset in my app but it is getting reset here
> >>>
> >>> https://github.com/apache/spark/blob/master/external/
> kafka-0-10/src/main/scala/org/apache/spark/streaming/
> kafka010/KafkaUtils.scala#L160
> >>>
> >>> How to force it to restart in this case (fully aware of potential data
> >>> loss)?
> >>>
> >>> Srikanth
> >
> >
>


I noticed LinearRegression sometimes produces negative R^2 values

2016-09-06 Thread evanzamir
Am I misinterpreting what r2() in the LinearRegression Model summary means?
By definition, R^2 should never be a negative number!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Datasets and Partitioners

2016-09-06 Thread Darin McBeath
How do you find the partitioner for a Dataset?

I have a Dataset (om) which I created and repartitioned using one of the fields 
(docId).  Reading the documentation, I would assume the om Dataset should be 
hash partitioned.  But, how can I verify this?

When I do om.rdd.partitioner I get 

Option[org.apache.spark.Partitioner] = None

But, perhaps this is not equivalent.

The reason I ask is that when I use this cached Dataset in a join with another 
Dataset (partitioned on the same column and cached) I see things like the 
following in my explain which makes me think the Dataset might have lost the 
partitioner.  I also see a couple of stages for the job where it seems like 
each Dataset in my join is being read in and shuffled out again (I'm assuming 
for the hash partitioning required by the join)

Exchange hashpartitioning(_1#6062.docId, 8)

Any thoughts/ideas would be appreciated.
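
A minimal sketch of what is described above, for reproducing the check (the 8 partitions and the docId column come from the message; the exact repartition call is an assumption):

val om2 = om.repartition(8, om("docId")).cache()
om2.explain()                  // the plan shows an Exchange hashpartitioning(docId, 8) node
println(om2.rdd.partitioner)   // None: the RDD view does not carry the SQL-level partitioning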

Thanks.

Darin.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: I noticed LinearRegression sometimes produces negative R^2 values

2016-09-06 Thread Nick Pentreath
That does seem strange. Can you provide an example to reproduce?



On Tue, 6 Sep 2016 at 21:49 evanzamir  wrote:

> Am I misinterpreting what r2() in the LinearRegression Model summary means?
> By definition, R^2 should never be a negative number!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Is it possible to submit Spark Application remotely?

2016-09-06 Thread neil90
You need to pass --deploy-mode cluster to spark-submit; this will push the driver to
the cluster rather than running it locally on your computer.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-submit-Spark-Application-remotely-tp27640p27668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-06 Thread Cody Koeninger
You don't want auto.offset.reset on executors, you want executors to
do what the driver told them to do.  Otherwise you're going to get
really horrible data inconsistency issues if the executors silently
reset.

If your retention is so low that retention gets expired in between
when the driver created a batch with a given starting offset, and when
an executor starts to process that batch, you're going to have
problems.

On Tue, Sep 6, 2016 at 2:30 PM, Srikanth  wrote:
> This isn't a production setup. We kept retention low intentionally.
> My original question was why I got the exception instead of it using
> auto.offset.reset on restart?
>
>
>
>
> On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger  wrote:
>>
>> If you leave enable.auto.commit set to true, it will commit offsets to
>> kafka, but you will get undefined delivery semantics.
>>
>> If you just want to restart from a fresh state, the easiest thing to
>> do is use a new consumer group name.
>>
>> But if that keeps happening, you should look into why your retention
>> is not sufficient.
>>
>> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth  wrote:
>> > You are right. I got confused as its all part of same log when running
>> > from
>> > IDE.
>> > I was looking for a good guide to read to understand the this integ.
>> >
>> > I'm not managing offset on my own. I've not enabled checkpoint for my
>> > tests.
>> > I assumed offsets will be stored in kafka by default.
>> >
>> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
>> > Array[Byte]](pattern, kafkaParams) )
>> >
>> >* @param offsets: offsets to begin at on initial startup.  If no
>> > offset
>> > is given for a
>> >* TopicPartition, the committed offset (if applicable) or kafka param
>> >* auto.offset.reset will be used.
>> >
>> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
>> > enable.auto.commit = true
>> > auto.offset.reset = latest
>> >
>> > Srikanth
>> >
>> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger 
>> > wrote:
>> >>
>> >> Seems like you're confused about the purpose of that line of code, it
>> >> applies to executors, not the driver. The driver is responsible for
>> >> determining offsets.
>> >>
>> >> Where are you storing offsets, in Kafka, checkpoints, or your own
>> >> store?
>> >> Auto offset reset won't be used if there are stored offsets.
>> >>
>> >>
>> >> On Sep 2, 2016 14:58, "Srikanth"  wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> Upon restarting my Spark Streaming app it is failing with error
>> >>>
>> >>> Exception in thread "main" org.apache.spark.SparkException: Job
>> >>> aborted
>> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent
>> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
>> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
>> >>> out of
>> >>> range with no configured reset policy for partitions:
>> >>> {mt-event-2=1710706}
>> >>>
>> >>> It is correct that the last read offset was deleted by kafka due to
>> >>> retention period expiry.
>> >>> I've set auto.offset.reset in my app but it is getting reset here
>> >>>
>> >>>
>> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
>> >>>
>> >>> How to force it to restart in this case (fully aware of potential data
>> >>> loss)?
>> >>>
>> >>> Srikanth
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: I noticed LinearRegression sometimes produces negative R^2 values

2016-09-06 Thread Evan Zamir
I am using the default setting for *fitIntercept*, which *should*
be TRUE, right?
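
A minimal sketch of checking that on a fit ('training' is a placeholder DataFrame with 'label' and 'features' columns):

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()   // fitIntercept defaults to true
println(lr.getFitIntercept)       // prints true unless it was explicitly overridden

val model = lr.fit(training)      // 'training' is assumed to exist
println(model.summary.r2)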

On Tue, Sep 6, 2016 at 1:38 PM Sean Owen  wrote:

> Are you not fitting an intercept / regressing through the origin? with
> that constraint it's no longer true that R^2 is necessarily
> nonnegative. It basically means that the errors are even bigger than
> what you'd get by predicting the data's mean value as a constant
> model.
>
> On Tue, Sep 6, 2016 at 8:49 PM, evanzamir  wrote:
> > Am I misinterpreting what r2() in the LinearRegression Model summary
> means?
> > By definition, R^2 should never be a negative number!
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Difference between UDF and Transformer in Spark ML

2016-09-06 Thread janardhan shetty
Apart from the creation of a new column, what are the other differences between
a Transformer and a UDF in Spark ML?
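
A minimal sketch of the contrast (the DataFrame df and its 'value' column are placeholders): a UDF is just a column expression, while a Transformer has a uid and Params, can be placed in a Pipeline, and is saved and loaded along with it.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.SQLTransformer
import org.apache.spark.sql.functions.udf

// plain UDF: a column-level function, applied with withColumn / select
val doubled = udf { x: Double => x * 2 }
val viaUdf = df.withColumn("doubled", doubled(df("value")))

// Transformer: same effect, but it is an ML Pipeline stage with params and persistence
val stage = new SQLTransformer().setStatement(
  "SELECT *, value * 2 AS doubled FROM __THIS__")
val viaTransformer = new Pipeline().setStages(Array(stage)).fit(df).transform(df)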


How to convert String to Vector ?

2016-09-06 Thread Yan Facai
Hi,
I have a csv file like:
uid  mid  features   label
1235231[0, 1, 3, ...]True

Both  "features" and "label" columns are used for GBTClassifier.

However, when I read the file:
Dataset samples = sparkSession.read().csv(file);
The type of samples.select("features") is String.

My question is:
How to map samples.select("features") to Vector or any appropriate type,
so I can use it to train like:
GBTClassifier gbdt = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(2)
.setMaxDepth(7);


Thanks.


Re: How to convert String to Vector ?

2016-09-06 Thread Leonard Cohen
hi,


In Scala: map(feature => feature.split(',').toList)
In Python: list(string.split(',')) or eval(string)




http://stackoverflow.com/questions/31376574/spark-rddstring-string-into-rddmapstring-string




-- Original --
From: "颜发才(Yan Facai)";
Send time: Tuesday, Sep 6, 2016 5:56 PM
To: "user.spark"; 

Subject:  How to convert String to Vector ?



Hi, 

I have a csv file like:

uid  mid  features   label

1235231[0, 1, 3, ...]True

Both  "features" and "label" columns are used for GBTClassifier.



However, when I read the file:

Dataset samples = sparkSession.read().csv(file);
The type of samples.select("features") is String.


My question is:

How to map samples.select("features") to Vector or any appropriate type,

so I can use it to train like:
GBTClassifier gbdt = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(2)
.setMaxDepth(7);



Thanks.

Re: Is it possible to submit Spark Application remotely?

2016-09-06 Thread tosaigan...@gmail.com
You can use Livy to submit Spark jobs remotely:

http://gethue.com/how-to-use-the-livy-spark-rest-job-server-api-for-submitting-batch-jar-python-and-streaming-spark-jobs/

Regards,
Sai Ganesh

On Tue, Sep 6, 2016 at 1:24 PM, neil90 [via Apache Spark User List] <
ml-node+s1001560n27668...@n3.nabble.com> wrote:

> You need to pass --deploy-mode cluster to spark-submit; this will push the driver
> to the cluster rather than running it locally on your computer.
>




-
Sai Ganesh
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-submit-Spark-Application-remotely-tp27640p27670.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Getting memory error when starting spark shell but not often

2016-09-06 Thread Terry Hoo
Maybe there is not enough contiguous memory (10 GB?) on your host.

Regards,
- Terry

On Wed, Sep 7, 2016 at 10:51 AM, Divya Gehlot 
wrote:

> Hi,
> I am using EMR 4.7 with Spark 1.6
> Sometimes when I start the spark shell I get below error
>
> OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0005662c,
>> 10632822784, 0) failed; error='Cannot allocate memory' (errno=12)
>> #
>> # There is insufficient memory for the Java Runtime Environment to
>> continue.
>> # Native memory allocation (malloc) failed to allocate 10632822784 bytes
>> for committing reserved memory.
>> # An error report file with more information is saved as:
>> # /tmp/jvm-6066/hs_error.log
>
>
>
> Has any body encountered this kind of issue .
> Would really appreciate the resolution.
>
>
> Thanks,
> Divya
>


Re: Spark Metrics: custom source/sink configurations not getting recognized

2016-09-06 Thread map reduced
Hi, does anyone have any ideas, please?

On Mon, Sep 5, 2016 at 8:30 PM, map reduced  wrote:

> Hi,
>
> I've written my custom metrics source/sink for my Spark streaming app and
> I am trying to initialize it from metrics.properties - but that doesn't
> work from executors. I don't have control on the machines in Spark cluster,
> so I can't copy properties file in $SPARK_HOME/conf/ in the cluster. I have
> it in the fat jar where my app lives, but by the time my fat jar is
> downloaded on worker nodes in cluster, executors are already started and
> their Metrics system is already initialized - thus not picking my file with
> custom source configuration in it.
>
> Following this post, I've specified 'spark.files=metrics.properties' and
> 'spark.metrics.conf=metrics.properties' but by
> the time 'metrics.properties' is shipped to executors, their metric system
> is already initialized.
>
> If I initialize my own metrics system, it's picking up my file but then
> I'm missing master/executor level metrics/properties (eg.
> executor.sink.mySink.propName=myProp - can't read 'propName' from
> 'mySink') since they are initialized by Spark's metric system.
>
> Is there a (programmatic) way to have 'metrics.properties' shipped before
> executors initialize?
>
> Here's my SO question.
>
> Thanks,
>
> KP
>


Re: distribute work (files)

2016-09-06 Thread ayan guha
To access local file, try with file:// URI.
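
A minimal sketch of one way to do that (wholeTextFiles assumes each file fits in memory; runAlgorithm is a stand-in for the real ML routine, and the path is a placeholder):

// stand-in for the per-file ML routine
def runAlgorithm(text: String): Int = text.length

// one (path, content) record per file; Spark spreads the records across the workers
val files = sc.wholeTextFiles("file:///shared/path/to/files/*")
val results = files.map { case (path, content) => (path, runAlgorithm(content)) }
results.collect().foreach(println)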

On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi 
wrote:

> This is a great question.  Basically you don't have to worry about the
> details-- just give a wildcard in your call to textFile.  See the Programming
> Guide  section
> entitled "External Datasets".  The Spark framework will distribute your
> data across the workers.  Note that:
>
> *If using a path on the local filesystem, the file must also be accessible
>> at the same path on worker nodes. Either copy the file to all workers or
>> use a network-mounted shared file system.*
>
>
> In your case this would mean the directory of files.
>
> Curiously, I cannot get this to work when I mount a directory with sshfs
> on all of my worker nodes.  It says "file not found" even though the file
> clearly exists in the specified path on all workers.   Anyone care to try
> and comment on this?
>
> Thanks,
>
> Pete
>
> On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
> wrote:
>
>> Hi,
>>
>> maybe this is a stupid question:
>>
>> I have a list of files. Each file I want to take as an input for a
>> ML-algorithm. All files are independent from another.
>> My question now is how do I distribute the work so that each worker takes
>> a block of files and just runs the algorithm on them one by one.
>> I hope somebody can point me in the right direction! :)
>>
>> Best regards,
>> Lydia
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


-- 
Best Regards,
Ayan Guha