Re: Apache Beam Spark runner

2016-03-19 Thread Jean-Baptiste Onofré

Hi Amit,

well done ;)

I'm reviewing it now (as I didn't have time to do it before, sorry about that).

Regards
JB

On 03/17/2016 06:26 PM, Sela, Amit wrote:

Hi all,

The Apache Beam Spark runner is now available at:
https://github.com/apache/incubator-beam/tree/master/runners/spark Check
it out!
The Apache Beam (http://beam.incubator.apache.org/) project is a unified
model for building data pipelines using Google’s Dataflow programming
model, and now it supports Spark as well!

Take it for a ride on your Spark cluster!

Thanks,
Amit




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Extra libs in executor classpath

2016-03-19 Thread Silvio Fiorito
Could you publish it as a library (to an internal repo)? Then you can simply use
the "--packages" option. That will also help with versioning as you make changes;
that way you're not having to manually ship JARs around to your machines and
users.



From: Леонид Поляков
Sent: Wednesday, March 16, 2016 7:22 AM
To: user@spark.apache.org
Subject: Extra libs in executor classpath

Hello, guys!

I’ve been developing a kind of framework on top of Spark, and my idea is to
bundle the framework jars and some extra configs with Spark and pass it to
other developers for their needs, so that devs can use this bundle and run the
usual Spark stuff, but with the extra flavor the framework adds.

I’m trying to figure out how to properly set up the driver/executor classpath,
so that the framework classes are always loaded when you use the bundle.
I put the framework libs in the /lib folder right now, but will switch to something
more specific later. I’m putting the following spark-defaults.conf into my bundle:

spark.executor.extraClassPath /home/user/Apps/spark-bundled/lib/*
spark.driver.extraClassPath lib/*

And this seems to work, but I want to get rid of the absolute path in
spark.executor.extraClassPath and use something relative, or the Spark home
somehow, since the libs are right there under /lib.
I’ve tried these settings for executor, and they do not work:
spark.executor.extraClassPath $SPARK_HOME/lib/*
spark.executor.extraClassPath lib/*

I’ve found out that work directory for started workers is like 
$SPARK_HOME/work/app-20160316070310-0002/0, so this works:
spark.executor.extraClassPath ../../../lib/*

But that looks hacky and not stable.

Could you help me with this issue? Maybe there are some placeholders that I can 
use in configs?
Let me know if you need any worker/master/driver logs

P.S. The driver does not work if I am not in $SPARK_HOME when I execute
spark-submit, e.g. if I do
cd bin
./spark-submit …
then the driver classpath is relative to /bin and lib/* or ./lib/* on the classpath
no longer works, so I need $SPARK_HOME for the driver as well.

Thanks, Leonid


Re: sql timestamp timezone bug

2016-03-19 Thread Andy Davidson
Hi Davies


> 
> What's the type of `created`? TimestampType?


The 'created' column in Cassandra is a timestamp
https://docs.datastax.com/en/cql/3.0/cql/cql_reference/timestamp_type_r.html

In the Spark data frame it is a timestamp.

> 
> If yes, when created is compared to a string, it will be cast to a
> string, then compared as strings; it becomes
> 
> cast(created, as string) > '2016-03-12 00:30:00+'
> 
> Could you try this
> 
> sqlCtx.sql("select created, cast(created as string) from rawTable").show()


I am not sure I understand your suggestion. In my where clause the date
range is specified using string literals. I need the value of created to be
a timestamp.

# http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html
stmnt = "select \
row_key, created,  cast(created as string), count,
unix_timestamp(created) as unixTimeStamp, \
unix_timestamp(created, 'yyyy-MM-dd HH:mm:ss.zz') as aedwip, \
to_utc_timestamp(created, 'gmt') as gmt \
 from \
rawTable \
 where \
 (created > '{0}') and (created <= '{1}') \
 and \
 (row_key = 'red' \
or row_key = 'blue' )".format('2016-03-12
00:30:00+', '2016-03-12 04:30:00+')

rawDF = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="json_timeseries", keyspace="notification")\
.load() 
rawDF.registerTempTable(tmpTableName)
rawDF = sqlCtx.sql(stmnt).cache()


The timestamps are still not UTC; they are in PST.

root
 |-- row_key: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- created: string (nullable = true)
 |-- count: long (nullable = true)
 |-- unixTimeStamp: long (nullable = true)
 |-- aedwip: long (nullable = true)
 |-- gmt: timestamp (nullable = true)

+-------+---------------------+-------------------+-----+-------------+----------+---------------------+
|row_key|created              |created            |count|unixTimeStamp|aedwip    |gmt                  |
+-------+---------------------+-------------------+-----+-------------+----------+---------------------+
|blue   |2016-03-12 00:30:30.0|2016-03-12 00:30:30|2    |1457771430   |1457771430|2016-03-12 00:30:30.0|
|blue   |2016-03-12 00:30:45.0|2016-03-12 00:30:45|1    |1457771445   |1457771445|2016-03-12 00:30:45.0|
|blue   |2016-03-12 00:31:00.0|2016-03-12 00:31:00|1    |1457771460   |1457771460|2016-03-12 00:31:00.0|
+-------+---------------------+-------------------+-----+-------------+----------+---------------------+

Kind regards

Andy




Re: sql timestamp timezone bug

2016-03-19 Thread Andy Davidson

For completeness: clearly Spark SQL returned a different data set.

In [4]:
rawDF.selectExpr("count(row_key) as num_samples",
"sum(count) as total",
"max(count) as max ").show()
+-----------+-----+---+
|num_samples|total|max|
+-----------+-----+---+
|       2037| 3867| 67|
+-----------+-----+---+


From:  Andrew Davidson 
Date:  Thursday, March 17, 2016 at 3:02 PM
To:  "user @spark" 
Subject:  sql timestamp timezone bug

> I am using pyspark 1.6.0 and
> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
> data
> 
> The data is originally captured by a spark streaming app and written to
> Cassandra. The value of the timestamp comes from
> 
> Rdd.foreachRDD(new VoidFunction2()
> …});
> 
> I am confident the time stamp is stored correctly in cassandra and that
> the clocks on the machines in my cluster are set correctly
> 
> I noticed that if I used Cassandra CQLSH to select a data set between two
> points in time, the row count did not match the row count I got when I did
> the same select in Spark using SQL. It appears Spark SQL assumes all
> timestamp strings are in the local time zone.
> 
> 
> Here is what I expect. (this is what is returned by CQLSH)
> cqlsh> select
>... count(row_key) as num_samples, sum(count) as total, max(count)
> as max
>... from
>... notification.json_timeseries
>... where
>... row_key in ('red', 'blue')
>... and created > '2016-03-12 00:30:00+'
>... and created <= '2016-03-12 04:30:00+'
>... allow filtering;
> 
>  num_samples | total | max
> -------------+-------+-----
>         3242 | 11277 |  17
> 
> 
> Here is my pyspark select statement. Notice the 'created' column encodes
> the timezone. I am running this on my local Mac (in the PST timezone) and
> connecting to my data center (which runs on UTC) over a VPN.
> 
> rawDF = sqlContext.read\
> .format("org.apache.spark.sql.cassandra")\
> .options(table="json_timeseries", keyspace="notification")\
> .load() 
> 
> 
> rawDF.registerTempTable(tmpTableName)
> 
> 
> 
> stmnt = "select \
> row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
> unix_timestamp(created, 'yyyy-MM-dd HH:mm:ss.z') as hack, \
> to_utc_timestamp(created, 'gmt') as gmt \
> from \
> rawTable \
> where \
> (created > '{0}') and (created <= '{1}') \
> and \
> (row_key = 'red' or row_key = 'blue') \
> )".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')
> 
> rawDF = sqlCtx.sql(stmnt).cache()
> 
> 
> 
> 
> I get different values for row count, max, …
> 
> If I convert the UTC time stamp string to my local timezone the row count
> matches the count returned by  cqlsh
> 
> # pst works, matches cassandra cqlsh
> # .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')
> 
> Am I doing something wrong in my pyspark code?
> 
> 
> Kind regards
> 
> Andy
> 
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




Re: help coercing types

2016-03-19 Thread Jacek Laskowski
Hi,

Just a side question: why do you convert DataFrame to RDD? It's like
driving backwards (possible but ineffective and dangerous at times)

P. S. I'd even go for Dataset.

Jacek
18.03.2016 5:20 PM "Bauer, Robert"  wrote:

> I have data that I pull in using a sql context and then I convert to an
> rdd.
>
>
>
> The problem is that the type in the rdd is [Any, Iterable[Any]]
>
>
>
> And I need to have the type RDD[Array[String]]   -- convert the Iterable
> to an Array.
>
>
>
> Here’s more detail:
>
>
>
> val zdata = sqlContext.read.parquet("s3://.. parquet").select('Pk,
> explode('Pg) as "P").select($"Pk", $"P.A.n")
>
>
>
> val r1data = zdata.rdd
>
>
>
> val r2data = r1data.map(t => (t(0),t(1))).groupByKey()
>
>
>
> and at this point r2data’s type is [Any, Iterable[Any]]
>
>
>
> robert
>
>
>
> --
>
> This message (including any attachments) contains confidential and/or
> privileged information. It is intended for a specific individual and
> purpose and is protected by law. If you are not the intended recipient,
> please notify the sender immediately and delete this message. Any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, is strictly prohibited.
>


Re: [discuss] making SparkEnv private in Spark 2.0

2016-03-19 Thread Mridul Muralidharan
We use it in executors to get to :
a) spark conf (for getting to hadoop config in map doing custom
writing of side-files)
b) Shuffle manager (to get shuffle reader)

Not sure if there are alternative ways to get to these.
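For reference, a rough sketch of the executor-side access pattern being described (Spark 1.x; SparkEnv.get, conf and shuffleManager are the existing public members, the surrounding task code is illustrative only):

```scala
import org.apache.spark.SparkEnv

// Inside a task, i.e. running on an executor, not on the driver.
val env = SparkEnv.get

// a) the SparkConf as seen by the executor, e.g. to rebuild a Hadoop
//    Configuration for writing side-files
val sparkConf = env.conf

// b) the shuffle manager, e.g. to obtain a shuffle reader
val shuffleManager = env.shuffleManager
```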

Regards,
Mridul

On Wed, Mar 16, 2016 at 2:52 PM, Reynold Xin  wrote:
> Any objections? Please articulate your use case. SparkEnv is a weird one
> because it was documented as "private" but not marked as so in class
> visibility.
>
>  * NOTE: This is not intended for external use. This is exposed for Shark
> and may be made private
>  *   in a future release.
>
>
> I do see Hive using it to get the config variable. That can probably be
> propagated through other means.
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Extra libs in executor classpath

2016-03-19 Thread Леонид Поляков
That makes no sense for the worker; the issue is with the executor classpath, not the
driver classpath.

Please answer the actual question, which is not in the "P.S." - that one is just a
note about the driver.

Thanks, Leonid

On Wed, Mar 16, 2016 at 6:21 PM, Ted Yu  wrote:

> For your last point, spark-submit has:
>
> if [ -z "${SPARK_HOME}" ]; then
>   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
> fi
>
> Meaning the script would determine the proper SPARK_HOME variable.
>
> FYI
>
> On Wed, Mar 16, 2016 at 4:22 AM, Леонид Поляков  wrote:
>
>> Hello, guys!
>>
>>
>>
>> I’ve been developing a kind of framework on top of spark, and my idea is
>> to bundle the framework jars and some extra configs with the spark and pass
>> it to other developers for their needs. So that devs can use this bundle
>> and run usual spark stuff but with extra flavor that framework will add.
>>
>>
>>
>> I’m trying to figure out how to properly set up the driver/executor
>> classpath, so that framework classes are always loaded when you use the
>> bundle.
>>
>> I put framework libs in /lib folder right now, but will switch to
>> something more specific later. I’m putting next spark-defaults.conf into
>> my bundle:
>>
>>
>>
>> spark.executor.extraClassPath /home/user/Apps/spark-bundled/lib/*
>>
>> spark.driver.extraClassPath lib/*
>>
>>
>>
>> And this seem to work, but I want to get rid of the absolute path from
>> spark.executor.extraClassPath and use something relative, or spark home
>> somehow, since libs are right there under /lib
>>
>> I’ve tried these settings for executor, and they do not work:
>>
>> spark.executor.extraClassPath $SPARK_HOME/lib/*
>>
>> spark.executor.extraClassPath lib/*
>>
>>
>>
>> I’ve found out that work directory for started workers is like
>> $SPARK_HOME/work/app-20160316070310-0002/0, so this works:
>>
>> spark.executor.extraClassPath ../../../lib/*
>>
>>
>>
>> But looks cheaty and not stable.
>>
>>
>>
>> Could you help me with this issue? Maybe there are some placeholders that
>> I can use in configs?
>>
>> Let me know if you need any worker/master/driver logs
>>
>>
>>
>> P.S. driver does not work if I am not in $SPARK_HOME when I execute
>> spark-submit, e.g. if I do
>>
>> cd bin
>>
>> ./spark-submit …
>>
>> Then driver classpath is relative to /bin and now lib/* or ./lib/* in
>> classpath does not work, so I need $SPARK_HOME for driver as well
>>
>>
>> Thanks, Leonid
>>
>
>


Re: The build-in indexes in ORC file does not work.

2016-03-19 Thread Mich Talebzadeh
I did some tests on Hive running on MR to get rid of Spark effects.

In an ORC table that has been partitioned, partition elimination with
predicate push down works and the query is narrowed to the partition
itself. I can see that from the number of rows within that partition.

For example, the sales table below is ORC, partitioned by year and month. For
year = 1999 and month = 8, there are 124,284 rows.

explain extended select count(1) from sales where year = 1999 and month =8;
Map 1
Map Operator Tree:
TableScan
  alias: sales
  Statistics: *Num rows: 124284* Data size: 1184497 Basic
stats: COMPLETE Column stats: NONE
  GatherStats: false
  Select Operator
Statistics: Num rows: 124284 Data size: 1184497 Basic
stats: COMPLETE Column stats: NONE
Group By Operator

That is the only time I have seen through explain plan that partition
elimination is working.


HTH






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 16 March 2016 at 20:57, Wietsma, Tristan A. <
tristan.wiet...@capitalone.com> wrote:

> Regarding bloom filters,
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-12417
>
>
>
> Sent with Good (www.good.com)
> --
> *From:* Joseph 
> *Sent:* Wednesday, March 16, 2016 9:46:25 AM
> *To:* user
> *Cc:* user; user
> *Subject:* Re: Re: The build-in indexes in ORC file does not work.
>
>
> terminal_type = 0:  260,000,000 rows, covering almost half of the whole data.
>
> terminal_type = 25066: just 3,800 rows.
>
>
>
> orc 
> tblproperties("orc.compress"="SNAPPY","orc.compress.size"="262141","orc.stripe.size"="268435456","orc.row.index.stride"="
> *10*","orc.create.index"="true","orc.bloom.filter.columns"="");
>
>
> The table "gprs" has been sorted by terminal_type.  Before the sort, I had
> another table named "gprs_orc"; I used Spark SQL to sort the data as follows
>
> (before doing this, I set hive.enforce.sorting=true):
>
> sql> INSERT INTO TABLE gprs SELECT * FROM gprs_orc sort by terminal_type ;
>
> Because the table gprs_orc has 800 files, this generates 800 tasks and also
> creates 800 files in table gprs. But I am not sure whether each file is
> sorted separately or not.
>
>
> I have tried a bloom filter, but it makes no improvement. I know about Tez,
> but have never used it; I will try it later.
>
>
> The following is my test in hive 1.2.1:
>
> 1. enable hive.optimize.index.filter and hive.optimize.ppd:
>
> select count(*) from gprs where terminal_type=25080;   will not scan data           Time taken: 353.345 seconds
> select count(*) from gprs where terminal_type=25066;   just scan a few row groups   Time taken: 354.860 seconds
> select count(*) from gprs where terminal_type=0;        scan half of the data        Time taken: 378.312 seconds
>
>
> 2. disable hive.optimize.index.filter and hive.optimize.ppd:
>
> select count(*) from gprs where terminal_type=25080;   scan all the data            Time taken: 389.700 seconds
> select count(*) from gprs where terminal_type=25066;   scan all the data            Time taken: 386.600 seconds
> select count(*) from gprs where terminal_type=0;        scan all the data            Time taken: 395.968 seconds
>
>
> The following is my environment:
>   3 nodes, 12 CPU cores per node, 48G memory free per node, 4 disks per
> node, 3 replications per block, hadoop 2.7.2, hive 1.2.1
>
>
> --
> Joseph
>
>
> *From:* Jörn Franke 
> *Date:* 2016-03-16 20:27
> *To:* Joseph 
> *CC:* user ; user 
> *Subject:* Re: The build-in indexes in ORC file does not work.
> Not sure it should work. How many rows are affected? The data is sorted?
> Have you tried with Tez? Tez has some summary statistics that tells you if
> you use push down. Maybe you need to use HiveContext.
> Perhaps a bloom filter could make sense for you as well.
>
> On 16 Mar 2016, at 12:45, Joseph  wrote:
>
> Hi,
>
> I have only one table named "gprs",  it has 560,000,000 rows,  and 57
> columns.  The block size is 256M,  total ORC file number is 800, each of
> them is about 51M.
>
> my query statement is :
> select count(*) from gprs  where  terminal_type = 25080;
> select * from gprs  where  terminal_type = 25080;
>
> In the gprs table, the "terminal_type"  column's  value is in [0, 25066]
>
> --
> Joseph
>
>
> *From:* Jörn Franke 
> *Date:* 2016-03-16 19:26
> *To:* Joseph 
> *CC:* user ; user 

Re: The error to read HDFS custom file in spark.

2016-03-19 Thread Benyi Wang
I would say change

class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
extends FileInputFormat

to

class RawDataInputFormat[LongWritable, RDRawDataRecord] extends FileInputFormat
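If it helps, here is a minimal sketch of that shape, with the Hadoop generics bound to the concrete key/value types rather than re-declared as type parameters (package and class names taken from the original post; this is a sketch, not a tested fix):

```scala
import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred._

// Sketch only: FileInputFormat and RecordReader receive the concrete
// key/value types as type arguments, so the types line up with
// sc.hadoopFile[LongWritable, RDRawDataRecord, ...].
class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord] {

  override def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter)
      : RecordReader[LongWritable, RDRawDataRecord] =
    new RawReader(split, job, reporter)
}
```

With that shape, the call site would just be sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](filePath), and RawReader would likewise extend RecordReader[LongWritable, RDRawDataRecord] directly, so its createKey/createValue return types match.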

​

On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh 
wrote:

> Hi Tony,
>
> Is
>
> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>
> One of your own packages?
>
> Sounds like it is one throwing the error
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 March 2016 at 15:21, Tony Liu  wrote:
>
>> Hi,
>>    My HDFS file is stored with custom data structures. I want to read it
>> with a SparkContext object, so I define an input format object:
>>
>> *1. code of RawDataInputFormat.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.LongWritable
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/16/16.
>>   */
>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>> FileInputFormat {
>>
>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: 
>> Reporter): RecordReader[LW, RD] = {
>> new RawReader(split, job, reporter)
>>   }
>>
>> }
>>
>> *2. code of RawReader.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/17/16.
>>   */
>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>> RecordReader[LW, RD] {
>>
>>   var reader: SequenceFile.Reader = null
>>   var currentPos: Long = 0L
>>   var length: Long = 0L
>>
>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>> this()
>> val p = (split.asInstanceOf[FileSplit]).getPath
>> reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>   }
>>
>>   override def next(key: LW, value: RD): Boolean = {
>> val flag = reader.next(key, value)
>> currentPos = reader.getPosition()
>> flag
>>   }
>>
>>   override def getProgress: Float = Math.min(1.0f, currentPos / 
>> length.toFloat)
>>
>>   override def getPos: Long = currentPos
>>
>>   override def createKey(): LongWritable = {
>> new LongWritable()
>>   }
>>
>>   override def close(): Unit = {
>> reader.close()
>>   }
>>
>>   override def createValue(): RDRawDataRecord = {
>> new RDRawDataRecord()
>>   }
>> }
>>
>> *3. code of RDRawDataRecord.scala*
>>
>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>> import java.io.DataInput;
>> import java.io.DataOutput;
>> import java.io.IOException;
>> import org.apache.commons.lang.StringUtils;
>> import org.apache.hadoop.io.Writable;
>>
>> public class RDRawDataRecord implements Writable {
>> private String smac;
>> private String dmac;
>> private int hrssi;
>> private int lrssi;
>> private long fstamp;
>> private long lstamp;
>> private long maxstamp;
>> private long minstamp;
>> private long stamp;
>>
>> public void readFields(DataInput in) throws IOException {
>> this.smac = in.readUTF();
>> this.dmac = in.readUTF();
>> this.hrssi = in.readInt();
>> this.lrssi = in.readInt();
>> this.fstamp = in.readLong();
>> this.lstamp = in.readLong();
>> this.maxstamp = in.readLong();
>> this.minstamp = in.readLong();
>> this.stamp = in.readLong();
>> }
>>
>> public void write(DataOutput out) throws IOException {
>> out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>> out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>> out.writeInt(this.hrssi);
>> out.writeInt(this.lrssi);
>> out.writeLong(this.fstamp);
>> out.writeLong(this.lstamp);
>> out.writeLong(this.maxstamp);
>> out.writeLong(this.minstamp);
>> out.writeLong(this.stamp);
>> }
>>
>> */** *
>>
>> *ignore getter setter*
>>
>> ***/*
>>
>> }
>>
>> *At last, I use this code to run*:
>>
>> val filePath = 
>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>> val conf = new SparkConf()
>> conf.setMaster("local")
>> conf.setAppName("demo")
>> val sc = new SparkContext(conf)
>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>> file.foreach(v => {
>>   println(v._2.getDmac) // Attribute of custom objects
>> })
>>
>> *I get an error, it says:*
>>
>> Error:(41, 19) type arguments 
>> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>  conform to the 

Re: Extra libs in executor classpath

2016-03-19 Thread Ted Yu
For your last point, spark-submit has:

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

Meaning the script would determine the proper SPARK_HOME variable.

FYI

On Wed, Mar 16, 2016 at 4:22 AM, Леонид Поляков  wrote:

> Hello, guys!
>
>
>
> I’ve been developing a kind of framework on top of spark, and my idea is
> to bundle the framework jars and some extra configs with the spark and pass
> it to other developers for their needs. So that devs can use this bundle
> and run usual spark stuff but with extra flavor that framework will add.
>
>
>
> I’m trying to figure out how to properly set up the driver/executor
> classpath, so that framework classes are always loaded when you use the
> bundle.
>
> I put framework libs in /lib folder right now, but will switch to
> something more specific later. I’m putting next spark-defaults.conf into
> my bundle:
>
>
>
> spark.executor.extraClassPath /home/user/Apps/spark-bundled/lib/*
>
> spark.driver.extraClassPath lib/*
>
>
>
> And this seem to work, but I want to get rid of the absolute path from
> spark.executor.extraClassPath and use something relative, or spark home
> somehow, since libs are right there under /lib
>
> I’ve tried these settings for executor, and they do not work:
>
> spark.executor.extraClassPath $SPARK_HOME/lib/*
>
> spark.executor.extraClassPath lib/*
>
>
>
> I’ve found out that work directory for started workers is like
> $SPARK_HOME/work/app-20160316070310-0002/0, so this works:
>
> spark.executor.extraClassPath ../../../lib/*
>
>
>
> But looks cheaty and not stable.
>
>
>
> Could you help me with this issue? Maybe there are some placeholders that
> I can use in configs?
>
> Let me know if you need any worker/master/driver logs
>
>
>
> P.S. driver does not work if I am not in $SPARK_HOME when I execute
> spark-submit, e.g. if I do
>
> cd bin
>
> ./spark-submit …
>
> Then driver classpath is relative to /bin and now lib/* or ./lib/* in
> classpath does not work, so I need $SPARK_HOME for driver as well
>
>
> Thanks, Leonid
>


Re: installing packages with pyspark

2016-03-19 Thread Felix Cheung
For some, like graphframes, that are Spark packages, you could also use
--packages on the command line of spark-submit or pyspark. See
http://spark.apache.org/docs/latest/submitting-applications.html

_
From: Jakob Odersky 
Sent: Thursday, March 17, 2016 6:40 PM
Subject: Re: installing packages with pyspark
To: Ajinkya Kale 
Cc:  


Hi,
regarding 1, packages are resolved locally. That means that when you
specify a package, spark-submit will resolve the dependencies and
download any jars on the local machine, before shipping* them to the
cluster. So, without a priori knowledge of dataproc clusters, it
should be no different to specify packages.

Unfortunately I can't help with 2.

--Jakob

*shipping in this case means making them available via the network

On Thu, Mar 17, 2016 at 5:36 PM, Ajinkya Kale  wrote:
> Hi all,
>
> I had couple of questions.
> 1. Is there documentation on how to add the graphframes or any other package
> for that matter on the google dataproc managed spark clusters ?
>
> 2. Is there a way to add a package to an existing pyspark context through a
> jupyter notebook ?
>
> --aj

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


help coercing types

2016-03-19 Thread Bauer, Robert
I have data that I pull in using a sql context and then I convert to an rdd.

The problem is that the type in the rdd is [Any, Iterable[Any]]

And I need to have the type RDD[Array[String]]   -- convert the Iterable to an 
Array.

Here’s more detail:

val zdata = sqlContext.read.parquet("s3://.. parquet").select('Pk, explode('Pg) 
as "P").select($"Pk", $"P.A.n")

val r1data = zdata.rdd

val r2data = r1data.map(t => (t(0),t(1))).groupByKey()

and at this point r2data’s type is [Any, Iterable[Any]]

robert




This message (including any attachments) contains confidential and/or 
privileged information. It is intended for a specific individual and purpose 
and is protected by law. If you are not the intended recipient, please notify 
the sender immediately and delete this message. Any disclosure, copying, or 
distribution of this message, or the taking of any action based on it, is 
strictly prohibited.
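For what it's worth, one way to get from [Any, Iterable[Any]] to arrays of strings is to coerce the Row fields to String before grouping. A rough sketch against the snippet above (variable names reused from it, the exact target shape and non-null fields are assumptions):

```scala
import org.apache.spark.rdd.RDD

// Coerce both Row fields to String up front so groupByKey yields
// RDD[(String, Iterable[String])] instead of RDD[(Any, Iterable[Any])].
val r2data: RDD[(String, Iterable[String])] =
  r1data
    .map(t => (t(0).toString, t(1).toString))
    .groupByKey()

// If a flat RDD[Array[String]] is wanted (key followed by its values):
val arrays: RDD[Array[String]] =
  r2data.map { case (k, vs) => (k +: vs.toSeq).toArray }
```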


Re: [Spark-1.5.2]Column renaming with withColumnRenamed has no effect

2016-03-19 Thread Ted Yu
Can you give a bit more detail ?

Release of Spark
symptom of renamed column being not recognized

Please take a look at "withColumnRenamed" test in:

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

On Thu, Mar 17, 2016 at 2:02 AM, Divya Gehlot 
wrote:

> Hi,
> I am adding a new column and renaming it at the same time, but the renaming
> doesn't have any effect.
>
> dffiltered =
>> dffiltered.unionAll(dfresult.withColumn("Col1",lit("value1").withColumn("Col2",lit("value2")).cast("int")).withColumn("Col3",lit("values3")).withColumnRenamed("Col1","Col1Rename").drop("Col1")
>
>
> Can anybody help me pointing out my mistake ?
>
> Thanks,
> Divya
>


Re: df.dtypes -> pyspark.sql.types

2016-03-19 Thread Ruslan Dautkhanov
Spark 1.5 is the latest that I have access to and where this problem
happens.

I don't see it fixed in master, but I might be wrong. Diff attached.

https://raw.githubusercontent.com/apache/spark/branch-1.5/python/pyspark/sql/types.py
https://raw.githubusercontent.com/apache/spark/d57daf1f7732a7ac54a91fe112deeda0a254f9ef/python/pyspark/sql/types.py



-- 
Ruslan Dautkhanov

On Wed, Mar 16, 2016 at 4:44 PM, Reynold Xin  wrote:

> We probably should have the alias. Is this still a problem on master
> branch?
>
> On Wed, Mar 16, 2016 at 9:40 AM, Ruslan Dautkhanov 
> wrote:
>
>> Running following:
>>
>> #fix schema for gaid which should not be Double
>>> from pyspark.sql.types import *
>>> customSchema = StructType()
>>> for (col,typ) in tsp_orig.dtypes:
>>> if col=='Agility_GAID':
>>> typ='string'
>>> customSchema.add(col,typ,True)
>>
>>
>> Getting
>>
>>   ValueError: Could not parse datatype: bigint
>>
>>
>> Looks like pyspark.sql.types doesn't know anything about bigint..
>> Should it be aliased to LongType in pyspark.sql.types?
>>
>> Thanks
>>
>>
>> On Wed, Mar 16, 2016 at 10:18 AM, Ruslan Dautkhanov > > wrote:
>>
>>> Hello,
>>>
>>> Looking at
>>>
>>> https://spark.apache.org/docs/1.5.1/api/python/_modules/pyspark/sql/types.html
>>>
>>> and can't wrap my head around how to convert string data types names to
>>> actual
>>> pyspark.sql.types data types?
>>>
>>> Does pyspark.sql.types has an interface to return StringType() for
>>> "string",
>>> IntegerType() for "integer" etc? If it doesn't exist it would be great
>>> to have such a
>>> mapping function.
>>>
>>> Thank you.
>>>
>>>
>>> ps. I have a data frame, and use its dtypes to loop through all columns
>>> to fix a few
>>> columns' data types as a workaround for SPARK-13866.
>>>
>>>
>>> --
>>> Ruslan Dautkhanov
>>>
>>
>>
>
683a684,806
> _FIXED_DECIMAL = re.compile("decimal\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)")
> 
> 
> _BRACKETS = {'(': ')', '[': ']', '{': '}'}
> 
> 
> def _parse_basic_datatype_string(s):
> if s in _all_atomic_types.keys():
> return _all_atomic_types[s]()
> elif s == "int":
> return IntegerType()
> elif _FIXED_DECIMAL.match(s):
> m = _FIXED_DECIMAL.match(s)
> return DecimalType(int(m.group(1)), int(m.group(2)))
> else:
> raise ValueError("Could not parse datatype: %s" % s)
> 
> 
> def _ignore_brackets_split(s, separator):
> """
> Splits the given string by given separator, but ignore separators inside 
> brackets pairs, e.g.
> given "a,b" and separator ",", it will return ["a", "b"], but given 
> "a, d", it will return
> ["a", "d"].
> """
> parts = []
> buf = ""
> level = 0
> for c in s:
> if c in _BRACKETS.keys():
> level += 1
> buf += c
> elif c in _BRACKETS.values():
> if level == 0:
> raise ValueError("Brackets are not correctly paired: %s" % s)
> level -= 1
> buf += c
> elif c == separator and level > 0:
> buf += c
> elif c == separator:
> parts.append(buf)
> buf = ""
> else:
> buf += c
> 
> if len(buf) == 0:
> raise ValueError("The %s cannot be the last char: %s" % (separator, 
> s))
> parts.append(buf)
> return parts
> 
> 
> def _parse_struct_fields_string(s):
> parts = _ignore_brackets_split(s, ",")
> fields = []
> for part in parts:
> name_and_type = _ignore_brackets_split(part, ":")
> if len(name_and_type) != 2:
> raise ValueError("The strcut field string format is: 
> 'field_name:field_type', " +
>  "but got: %s" % part)
> field_name = name_and_type[0].strip()
> field_type = _parse_datatype_string(name_and_type[1])
> fields.append(StructField(field_name, field_type))
> return StructType(fields)
> 
> 
> def _parse_datatype_string(s):
> """
> Parses the given data type string to a :class:`DataType`. The data type 
> string format equals
> to `DataType.simpleString`, except that top level struct type can omit 
> the `struct<>` and
> atomic types use `typeName()` as their format, e.g. use `byte` instead of 
> `tinyint` for
> ByteType. We can also use `int` as a short name for IntegerType.
> 
> >>> _parse_datatype_string("int ")
> IntegerType
> >>> _parse_datatype_string("a: byte, b: decimal(  16 , 8   ) ")
> 
> StructType(List(StructField(a,ByteType,true),StructField(b,DecimalType(16,8),true)))
> >>> _parse_datatype_string("a: array< short>")
> StructType(List(StructField(a,ArrayType(ShortType,true),true)))
> >>> _parse_datatype_string(" map ")
> MapType(StringType,StringType,true)
> 
> >>> # Error cases
> >>> _parse_datatype_string("blabla") # doctest: +IGNORE_EXCEPTION_DETAIL
> Traceback (most recent call last):

Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."

2016-03-19 Thread craigiggy
Slight update I suppose?
For some reason, sometimes it will connect and continue and the job will be
completed. But most of the time I still run into this error and the job is
killed and the application doesn't finish.

Still have no idea why this is happening. I could really use some help here.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Issue-org-apache-spark-shuffle-FetchFailedException-Failed-to-connect-to-tp26511p26531.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: installing packages with pyspark

2016-03-19 Thread Franc Carter
Thanks - I'll give that a try

cheers

On 20 March 2016 at 09:42, Felix Cheung  wrote:

> You are running pyspark in Spark client deploy mode. I have ran into the
> same error as well and I'm not sure if this is graphframes specific - the
> python process can't find the graphframes Python code when it is loaded as
> a Spark package.
>
> To workaround this, I extract the graphframes Python directory locally
> where I run pyspark into a directory called graphframes.
>
>
>
>
>
>
> On Thu, Mar 17, 2016 at 10:11 PM -0700, "Franc Carter" <
> franc.car...@gmail.com> wrote:
>
>
> I'm having trouble with that for pyspark, yarn and graphframes. I'm using:-
>
> pyspark --master yarn --packages graphframes:graphframes:0.1.0-spark1.5
>
> which starts and gives me a REPL, but when I try
>
>from graphframes import *
>
> I get
>
>   No module names graphframes
>
> without '--master yarn' it works as expected
>
> thanks
>
>
> On 18 March 2016 at 12:59, Felix Cheung  wrote:
>
> For some, like graphframes that are Spark packages, you could also use
> --packages in the command line of spark-submit or pyspark. See
> http://spark.apache.org/docs/latest/submitting-applications.html
>
> _
> From: Jakob Odersky 
> Sent: Thursday, March 17, 2016 6:40 PM
> Subject: Re: installing packages with pyspark
> To: Ajinkya Kale 
> Cc: 
>
>
>
> Hi,
> regarding 1, packages are resolved locally. That means that when you
> specify a package, spark-submit will resolve the dependencies and
> download any jars on the local machine, before shipping* them to the
> cluster. So, without a priori knowledge of dataproc clusters, it
> should be no different to specify packages.
>
> Unfortunatly I can't help with 2.
>
> --Jakob
>
> *shipping in this case means making them available via the network
>
> On Thu, Mar 17, 2016 at 5:36 PM, Ajinkya Kale 
> wrote:
> > Hi all,
> >
> > I had couple of questions.
> > 1. Is there documentation on how to add the graphframes or any other
> package
> > for that matter on the google dataproc managed spark clusters ?
> >
> > 2. Is there a way to add a package to an existing pyspark context
> through a
> > jupyter notebook ?
> >
> > --aj
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>
>
> --
> Franc
>



-- 
Franc


Re: df.dtypes -> pyspark.sql.types

2016-03-19 Thread Ruslan Dautkhanov
Running following:

#fix schema for gaid which should not be Double
> from pyspark.sql.types import *
> customSchema = StructType()
> for (col,typ) in tsp_orig.dtypes:
> if col=='Agility_GAID':
> typ='string'
> customSchema.add(col,typ,True)


Getting

  ValueError: Could not parse datatype: bigint


Looks like pyspark.sql.types doesn't know anything about bigint..
Should it be aliased to LongType in pyspark.sql.types?

Thanks


On Wed, Mar 16, 2016 at 10:18 AM, Ruslan Dautkhanov 
wrote:

> Hello,
>
> Looking at
>
> https://spark.apache.org/docs/1.5.1/api/python/_modules/pyspark/sql/types.html
>
> and can't wrap my head around how to convert string data types names to
> actual
> pyspark.sql.types data types?
>
> Does pyspark.sql.types has an interface to return StringType() for
> "string",
> IntegerType() for "integer" etc? If it doesn't exist it would be great to
> have such a
> mapping function.
>
> Thank you.
>
>
> ps. I have a data frame, and use its dtypes to loop through all columns to
> fix a few
> columns' data types as a workaround for SPARK-13866.
>
>
> --
> Ruslan Dautkhanov
>


Re: installing packages with pyspark

2016-03-19 Thread Felix Cheung
You are running pyspark in Spark client deploy mode. I have run into the same
error as well and I'm not sure if this is graphframes specific - the Python
process can't find the graphframes Python code when it is loaded as a Spark
package.
To work around this, I extract the graphframes Python directory locally, where I
run pyspark, into a directory called graphframes.






On Thu, Mar 17, 2016 at 10:11 PM -0700, "Franc Carter"  
wrote:





I'm having trouble with that for pyspark, yarn and graphframes. I'm using:-

pyspark --master yarn --packages graphframes:graphframes:0.1.0-spark1.5

which starts and gives me a REPL, but when I try

   from graphframes import *

I get

  No module names graphframes

without '--master yarn' it works as expected

thanks


On 18 March 2016 at 12:59, Felix Cheung  wrote:

> For some, like graphframes that are Spark packages, you could also use
> --packages in the command line of spark-submit or pyspark. See
> http://spark.apache.org/docs/latest/submitting-applications.html
>
> _
> From: Jakob Odersky 
> Sent: Thursday, March 17, 2016 6:40 PM
> Subject: Re: installing packages with pyspark
> To: Ajinkya Kale 
> Cc: 
>
>
>
> Hi,
> regarding 1, packages are resolved locally. That means that when you
> specify a package, spark-submit will resolve the dependencies and
> download any jars on the local machine, before shipping* them to the
> cluster. So, without a priori knowledge of dataproc clusters, it
> should be no different to specify packages.
>
> Unfortunatly I can't help with 2.
>
> --Jakob
>
> *shipping in this case means making them available via the network
>
> On Thu, Mar 17, 2016 at 5:36 PM, Ajinkya Kale 
> wrote:
> > Hi all,
> >
> > I had couple of questions.
> > 1. Is there documentation on how to add the graphframes or any other
> package
> > for that matter on the google dataproc managed spark clusters ?
> >
> > 2. Is there a way to add a package to an existing pyspark context
> through a
> > jupyter notebook ?
> >
> > --aj
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>


--
Franc


Re: Spark 2.0 Shell -csv package weirdness

2016-03-19 Thread Mich Talebzadeh
Hi Vince,

We had a similar case a while back. I tried two solutions, in both Spark on the
Hive metastore and Hive on the Spark engine.

Hive version 2
Spark as Hive engine 1.3.1

Basically

--1 Move .CSV data into HDFS:
--2 Create an external table (all columns as string)
--3 Create the ORC table (majority Int)
--4 Insert the data from the external table to the Hive ORC table
compressed as zlib

ORC seems to be a good candidate in this case, as a simple insert/select
from the external table to the ORC table takes no time. I bucketed the ORC table
and marked it as transactional in case one needs to make a correction to it (not
really needed).

The whole process was time stamped and it took 5 minutes to complete and
there were 7,009,728 rows in total.


+-------------------------+
|        starttime        |
+-------------------------+
| 19/03/2016 22:21:19.19  |
+-------------------------+

+-------------------------+
|         endtime         |
+-------------------------+
| 19/03/2016 22:26:12.12  |
+-------------------------+



This is the code. I will try spark code later with parquet

select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
StartTime;
set hive.exec.reducers.max=256;
use test;
--set hive.execution.engine=mr;
--2)
DROP TABLE IF EXISTS stg_t2;
CREATE EXTERNAL TABLE stg_t2 (
   Year string
,  Month string
,  DayofMonth string
,  DayOfWeek string
,  DepTime string
,  CRSDepTime string
,  ArrTime string
,  CRSArrTime string
,  UniqueCarrier string
,  FlightNum string
,  TailNum string
,  ActualElapsedTime string
,  CRSElapsedTime string
,  AirTime string
,  ArrDelay string
,  DepDelay string
,  Origin string
,  Dest string
,  Distance string
,  TaxiIn string
,  TaxiOut string
,  Cancelled string
,  CancellationCode string
,  Diverted string
,  CarrierDelay string
,  WeatherDelay string
,  NASDelay string
,  SecurityDelay string
,  LateAircraftDelay string
)
COMMENT 'from csv file from
http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs://rhes564:9000/data/stg2'
TBLPROPERTIES ("skip.header.line.count"="1")
;
--3)
DROP TABLE IF EXISTS t2008;
CREATE TABLE t2008 (
   Year int
,  Month int
,  DayofMonth int
,  DayOfWeek int
,  DepTime string
,  CRSDepTime string
,  ArrTime string
,  CRSArrTime string
,  UniqueCarrier string
,  FlightNum int
,  TailNum int
,  ActualElapsedTime int
,  CRSElapsedTime int
,  AirTime int
,  ArrDelay int
,  DepDelay int
,  Origin string
,  Dest string
,  Distance int
,  TaxiIn int
,  TaxiOut int
,  Cancelled string
,  CancellationCode string
,  Diverted string
,  CarrierDelay int
,  WeatherDelay int
,  NASDelay int
,  SecurityDelay int
,  LateAircraftDelay int
)
COMMENT 'from csv file from
http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB",
"transactional"="true")
;
--4) Put data in target table. do the conversion and ignore empty rows
INSERT INTO TABLE t2008
SELECT
  *
FROM
stg_t2
;
--select count(1) from t2008
;
select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS EndTime;
!exit

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 19 March 2016 at 19:18, Vincent Ohprecio  wrote:

>
> For some reason writing data from Spark shell to csv using the `csv
> package` takes almost an hour to dump to disk. Am I going crazy or did I do
> this wrong? I tried writing to Parquet first and it's fast as normal.
>
> On my MacBook Pro (16 GB, 2.2 GHz Intel Core i7, 1 TB) the machine's CPUs go
> crazy and it sounds like it's taking off like a plane ... lol
>
> Here is the code if anyone wants to experiment:
>
> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
>
> //
>
> // version 2.0.0-SNAPSHOT
>
> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.7.0_80)
>
> // http://stat-computing.org/dataexpo/2009/the-data.html
>
>
> def time[R](block: => R): R = {
>
> val t0 = System.nanoTime()
>
> val result = block// call-by-name
>
> val t1 = System.nanoTime()
>
> println("Elapsed time: " + (t1 - t0) + "ns")
>
> result
>
> }
>
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").load("/Users/employee/Downloads/2008.csv")
>
> val df_1 = df.withColumnRenamed("Year","oldYear")
>
> val df_2 =
> 

sql timestamp timezone bug

2016-03-19 Thread Andy Davidson
I am using pyspark 1.6.0 and
datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
data

The data is originally captured by a spark streaming app and written to
Cassandra. The value of the timestamp comes from

Rdd.foreachRDD(new VoidFunction2()
…});

I am confident the time stamp is stored correctly in cassandra and that
the clocks on the machines in my cluster are set correctly

I noticed that if I used Cassandra CQLSH to select a data set between two
points in time, the row count did not match the row count I got when I did
the same select in Spark using SQL. It appears Spark SQL assumes all
timestamp strings are in the local time zone.


Here is what I expect. (this is what is returned by CQLSH)
cqlsh> select
   ... count(row_key) as num_samples, sum(count) as total, max(count)
as max
   ... from
   ... notification.json_timeseries
   ... where
   ... row_key in ('red', 'blue')
   ... and created > '2016-03-12 00:30:00+'
   ... and created <= '2016-03-12 04:30:00+'
   ... allow filtering;

 num_samples | total | max
-------------+-------+-----
        3242 | 11277 |  17


Here is my pyspark select statement. Notice the 'created' column encodes
the timezone. I am running this on my local Mac (in the PST timezone) and
connecting to my data center (which runs on UTC) over a VPN.

rawDF = sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="json_timeseries", keyspace="notification")\
.load() 


rawDF.registerTempTable(tmpTableName)



stmnt = "select \
row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
unix_timestamp(created, 'yyyy-MM-dd HH:mm:ss.z') as hack, \
to_utc_timestamp(created, 'gmt') as gmt \
from \
rawTable \
where \
(created > '{0}') and (created <= '{1}') \
and \
(row_key = 'red' or row_key = 'blue') \
)".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')

rawDF = sqlCtx.sql(stmnt).cache()




I get different values for row count, max, …

If I convert the UTC time stamp string to my local timezone the row count
matches the count returned by  cqlsh

# pst works, matches cassandra cqlsh
# .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')

Am I doing something wrong in my pyspark code?


Kind regards

Andy



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DistributedLDAModel missing APIs in org.apache.spark.ml

2016-03-19 Thread cindymc
I like using the new DataFrame APIs on Spark ML, compared to using RDDs in
the older SparkMLlib.  But it seems some of the older APIs are missing.  In
particular, '*.mllib.clustering.DistributedLDAModel' had two APIs that I
need now:

topDocumentsPerTopic
topTopicsPerDocument

How can I get at the same results using the APIs on
'*.ml.clustering.DistributedLDAModel'?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DistributedLDAModel-missing-APIs-in-org-apache-spark-ml-tp26535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ClassNotFoundException in RDD.map

2016-03-19 Thread Dirceu Semighini Filho
Hi Ted, thanks for answering.
The map is just that; whatever I put inside the map throws this
ClassNotFoundException, and even a plain map(f => f) throws the exception.
What is bothering me is that when I do a take or a first it returns the
result, which makes me conclude that the previous code isn't wrong.

Kind Regards,
Dirceu

2016-03-17 12:50 GMT-03:00 Ted Yu :

> bq. $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>
> Do you mind showing more of your code involving the map() ?
>
> On Thu, Mar 17, 2016 at 8:32 AM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hello,
>> I found a strange behavior after executing a prediction with MLIB.
>> My code return an RDD[(Any,Double)] where Any is the id of my dataset,
>> which is BigDecimal, and Double is the prediction for that line.
>> When I run
>> myRdd.take(10) it returns ok
>> res16: Array[_ >: (Double, Double) <: (Any, Double)] =
>> Array((1921821857196754403.00,0.1690292052496703),
>> (454575632374427.00,0.16902820241892452),
>> (989198096568001939.00,0.16903432789699502),
>> (14284129652106187990.00,0.16903517653451386),
>> (17980228074225252497.00,0.16903151028332508),
>> (3861345958263692781.00,0.16903056986183976),
>> (17558198701997383205.00,0.1690295450319745),
>> (10651576092054552310.00,0.1690286445174418),
>> (4534494349035056215.00,0.16903303401862327),
>> (5551671513234217935.00,0.16902303368995966))
>> But when I try to run some map on it:
>> myRdd.map(_._1).take(10)
>> It throws a ClassNotFoundException:
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 72.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 72.0 (TID 1774, 172.31.23.208): java.lang.ClassNotFoundException:
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:278)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at 

Re: Can't zip RDDs with unequal numbers of partitions

2016-03-19 Thread Jiří Syrový
Unfortunately I can't share any snippet quickly as the code is generated,
but for now I can at least share the plan. (See it here -
http://pastebin.dqd.cz/RAhm/)

After I've increased spark.sql.autoBroadcastJoinThreshold to 30 from
10 it went through without any problems. With 10 it was always
failing during the "planning" phase with the Exception above.
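In case it helps others hitting this, the threshold can also be set per SQLContext at runtime as well as in spark-defaults.conf. A small sketch (the byte value below is a placeholder, not the exact figure from this thread):

```scala
// Hypothetical threshold: tables whose estimated size is below this many
// bytes are broadcast; setting it to -1 disables broadcast joins entirely.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (300L * 1024 * 1024).toString)
```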

2016-03-17 22:05 GMT+01:00 Jakob Odersky :

> Can you share a snippet that reproduces the error? What was
> spark.sql.autoBroadcastJoinThreshold before your last change?
>
> On Thu, Mar 17, 2016 at 10:03 AM, Jiří Syrový 
> wrote:
> > Hi,
> >
> > any idea what could be causing this issue? It started appearing after
> > changing parameter
> >
> > spark.sql.autoBroadcastJoinThreshold to 10
> >
> >
> > Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with
> unequal
> > numbers of partitions
> > at
> >
> org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> > org.apache.spark.rdd.PartitionCoalescer.(CoalescedRDD.scala:172)
> > at
> > org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:85)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> >
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> >
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at
> >
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> > at org.apache.spark.ShuffleDependency.(Dependency.scala:91)
> > at
> >
> org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
> > at
> >
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
> > at
> >
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
> > at
> >
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> > ... 28 more
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Error in "java.io.IOException: No input paths specified in job"

2016-03-19 Thread Sun, Rui
It complains about the file path "./examples/src/main/resources/people.json".
You can try using an absolute path instead of a relative path, and make sure the
absolute path is correct.
If that still does not work, you can prefix the path with "file://" in case the
default filesystem scheme for Hadoop is HDFS.

-Original Message-
From: tinyocean [mailto:haiyiz...@gmail.com] 
Sent: Thursday, March 17, 2016 9:22 PM
To: user@spark.apache.org
Subject: Error in "java.io.IOException: No input paths specified in job"

Hello,

I am learning sparkR by myself and have little computer background.

I am following the examples on
http://spark.apache.org/docs/latest/sparkr.html
and running 

/sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

people <- read.df(sqlContext, "./examples/src/main/resources/people.json",
"json")
head(people)/

But got
/Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
  java.io.IOException: No input paths specified in job
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfu/

Could you please tell me where I got it wrong and how to fix it?
Thanks.

Regards,
Amy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-java-io-IOException-No-input-paths-specified-in-job-tp26528.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stop spark application when the job is complete.

2016-03-19 Thread Ted Yu
Can you call sc.stop() after indexing into Elasticsearch?
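A minimal sketch of that pattern for a standalone batch job: run every action first, then stop the context in a finally block so the resources are released even if something fails (the input path and transformations below are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BatchJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("batch-job"))
    try {
      // query / transform / aggregate ... (placeholders)
      val data   = sc.textFile("hdfs:///path/to/input")
      val result = data.map(_.toUpperCase)
      // ... then run the actions, e.g. indexing into Elasticsearch:
      result.foreachPartition { partition =>
        partition.foreach(record => println(record)) // stand-in for the indexing call
      }
    } finally {
      sc.stop() // reached only after the actions above have completed
    }
  }
}
```

If sc.stop() runs before the actions (for example right after building the lazy transformations), the job is torn down before any work happens, which may be what is happening here.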

> On Mar 16, 2016, at 9:17 PM, Imre Nagi  wrote:
> 
> Hi,
> 
> I have a spark application for batch processing in standalone cluster. The 
> job is to query the database and then do some transformation, aggregation, 
> and several actions such as indexing the result into the elasticsearch.
> 
> If I dont call the sc.stop(), the spark application wont stop and take will 
> keep the resource used by the application. In the other hand, if I call the 
> sc.stop(), the spark app will be stopped before it query the database and do 
> further processing.
> 
> Can anyone help me to give best practice in stopping the spark application 
> when the job is complete?
> 
> Thanks,
> Imre

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Extra libs in executor classpath

2016-03-19 Thread Леонид Поляков
No, sadly, that's not an option.
The end users are not my team members; this is for customers, so I have to bundle
the framework and ship it.
There is more to my project than just the libs, so the end users will have to use
the bundle anyway.

On Wed, Mar 16, 2016 at 6:41 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Could you publish it as a library (to an internal repo) then you can
> simply use the “--packages" option? Also will help with versioning as you
> make changes, that way you’re not having to manually ship JARs around to
> your machines and users.
>
>
>
>
>
>
>
> *From: *Леонид Поляков 
> *Sent: *Wednesday, March 16, 2016 7:22 AM
> *To: *user@spark.apache.org
> *Subject: *Extra libs in executor classpath
>
>
>
> Hello, guys!
>
>
>
> I’ve been developing a kind of framework on top of spark, and my idea is
> to bundle the framework jars and some extra configs with the spark and pass
> it to other developers for their needs. So that devs can use this bundle
> and run usual spark stuff but with extra flavor that framework will add.
>
>
>
> I’m trying to figure out how to properly set up the driver/executor
> classpath, so that framework classes are always loaded when you use the
> bundle.
>
> I put framework libs in /lib folder right now, but will switch to
> something more specific later. I’m putting next spark-defaults.conf into
> my bundle:
>
>
>
> spark.executor.extraClassPath /home/user/Apps/spark-bundled/lib/*
>
> spark.driver.extraClassPath lib/*
>
>
>
> And this seem to work, but I want to get rid of the absolute path from
> spark.executor.extraClassPath and use something relative, or spark home
> somehow, since libs are right there under /lib
>
> I’ve tried these settings for executor, and they do not work:
>
> spark.executor.extraClassPath $SPARK_HOME/lib/*
>
> spark.executor.extraClassPath lib/*
>
>
>
> I’ve found out that work directory for started workers is like
> $SPARK_HOME/work/app-20160316070310-0002/0, so this works:
>
> spark.executor.extraClassPath ../../../lib/*
>
>
>
> But looks cheaty and not stable.
>
>
>
> Could you help me with this issue? Maybe there are some placeholders that
> I can use in configs?
>
> Let me know if you need any worker/master/driver logs
>
>
>
> P.S. driver does not work if I am not in $SPARK_HOME when I execute
> spark-submit, e.g. if I do
>
> cd bin
>
> ./spark-submit …
>
> Then driver classpath is relative to /bin and now lib/* or ./lib/* in
> classpath does not work, so I need $SPARK_HOME for driver as well
>
>
> Thanks, Leonid
>


Re: Spark configuration with 5 nodes

2016-03-19 Thread Mich Talebzadeh
Thanks Steve,

For the NN it all depends on how fast you want start-up to be. 1GB of NameNode
memory accommodates around 42TB, so if you are talking about 100GB of NN
memory then SSD may make sense to speed up the start-up. RAID 10 is the
best one can get, assuming all internal disks.

In general it is also suggested that the fsimage is copied across to an NFS-mounted
directory shared between the primary and the fail-over NameNode, in case of an issue.
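
For reference, a hypothetical hdfs-site.xml fragment for that: dfs.namenode.name.dir
accepts a comma-separated list and the NameNode writes its fsimage and edits to every
directory listed, so one entry can be a local disk and another an NFS mount (the paths
below are made up):

<property>
  <name>dfs.namenode.name.dir</name>
  <!-- one local directory plus one NFS-mounted directory; both receive fsimage and edits -->
  <value>/data/1/dfs/nn,/mnt/nfs/dfs/nn</value>
</property>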

Cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 17 March 2016 at 12:02, Steve Loughran  wrote:

>
> On 11 Mar 2016, at 16:25, Mich Talebzadeh 
> wrote:
>
> Hi Steve,
>
> My argument has always been that if one is going to use Solid State Disks
> (SSD), it makes sense to have it for NN disks start-up from fsimage etc.
> Obviously NN lives in memory. Would you also rerommend RAID10 (mirroring &
> striping) for NN disks?
>
>
> I don't have any suggestions there, sorry. That said, NN disks do need to
> be RAIDed for protection against corruption, as they don't have the
> cross-cluster replication. They matter
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 11 March 2016 at 10:42, Steve Loughran  wrote:
>
>>
>> On 10 Mar 2016, at 22:15, Ashok Kumar > > wrote:
>>
>>
>> Hi,
>>
>> We intend  to use 5 servers which will be utilized for building Bigdata
>> Hadoop data warehouse system (not using any propriety distribution like
>> Hortonworks or Cloudera or others).
>>
>>
>> I'd argue that life is if simpler with either of these, or bigtop+ambari
>> built up yourself, for the management and monitoring tools more than
>> anything else. Life is simpler if there's a web page of cluster status.
>> But: DIY teaches you the internals of how things work, which is good for
>> getting your hands dirty later on. Just start to automate things from the
>> outset, keep configs under SCM, etc. And decide whether or not you want to
>> go with Kerberos (==secure HDFS) from the outset. If you don't, put your
>> cluster on a separate isolated subnet. You ought to have the boxes on a
>> separate switch anyway if you can, just to avoid network traffic hurting
>> anyone else on the switch.
>>
>> All servers configurations are 512GB RAM, 30TB storage and 16 cores,
>> Ubuntu Linux servers. Hadoop will be installed on all the servers/nodes.
>> Server 1 will be used for NameNode plus DataNode as well. Server 2 will be
>> used for standby NameNode & DataNode. The rest of the servers will be
>> used as DataNodes..
>>
>>
>>
>> 1. Make sure you've got the HDFS/NN space allocation on the two NNs set
>> up so that HDFS blocks don't get into the way of the NN's needs (which
>> ideally should be on a separate disk with RAID turned on);
>> 2. Same for the worker nodes; temp space matters
>> 3. On a small cluster, the cost of a DN failure is more significant: a
>> bigger fraction of the data will go offline, recovery bandwidth limited to
>> the 4 remaining boxes, etc, etc. Just be aware of that: in a bigger
>> cluster, a single server is usually less traumatic. Though HDFS-599 shows
>> that even facebook can lose a cluster or two.
>>
>> Now we would like to install Spark on each servers to create Spark
>> cluster. Is that the good thing to do or we should buy additional hardware
>> for Spark (minding cost here) or simply do we require additional memory to
>> accommodate Spark as well please. In that case how much memory for each
>> Spark node would you recommend?
>>
>>
>> You should be running your compute work on the same systems as the data,
>> as its the "hadoop cluster way"; locality of data ==> performance. If you
>> were to buy more hardware, go for more store+compute, rather than just
>> compute.
>>
>> Spark likes RAM for sharing results; less RAM == more problems. but: you
>> can buy extra RAM if you need it, provided you've got space in the servers
>> to put it in. Same for storage.
>>
>> Do make sure that you have ECC memory; there are some papers from google
>> and microsoft on that topic if you want links to the details. Without ECC
>> your data will be corrupted *and you won't even know*
>>
>> -Steve
>>
>>
>>
>
>


Re: Spark configuration with 5 nodes

2016-03-19 Thread Steve Loughran

> On 17 Mar 2016, at 12:28, Mich Talebzadeh  wrote:
> 
> Thanks Steve,
> 
> For NN it all depends how fast you want a start-up. 1GB of NameNode memory 
> accommodates around 42T so if you are talking about 100GB of NN memory then 
> SSD may make sense to speed up the start-up. Raid 10 is the best one that one 
> can get  assuming all internal disks.

I wasn't really thinking of startup: in larger clusters startup time is often 
determined by how long it takes for all the datanodes to report in, and for 
HDFS to exit safe mode. But of course, the NN doesn't start listening for DN 
block reports until it's read in the FS image *and replayed the log*, so start 
time will be O(image + log-events + DNs).

> 
> In general it is also suggested that fsimage are copied across to NFS mount 
> directory between primary and fail-over in case of an issue.

yes

if you're curious, there's a 2011 paper on Y!s experience

https://www.usenix.org/system/files/login/articles/chansler_0.pdf

there are also a trace of HDFS failure events in some of the JIRAs, HDFS-599 
being the classic, as is HADOOP-572. Both of these document cascade failures in 
Facebook's HDFS clusters. Scale brings interesting problems


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



CfP 11th Workshop on Virtualization in High-Performance Cloud Computing (VHPC '16)

2016-03-19 Thread VHPC 16

CALL FOR PAPERS


11th Workshop on Virtualization in High-Performance Cloud Computing (VHPC
'16)
held in conjunction with the International Supercomputing Conference - High
Performance,
June 19-23, 2016, Frankfurt, Germany.



Date: June 23, 2016
Workshop URL: http://vhpc.org

Paper Submission Deadline: April 25, 2016


Call for Papers

Virtualization technologies constitute a key enabling factor for flexible
resource
management in modern data centers, and particularly in cloud environments.
Cloud providers need to manage complex infrastructures in a seamless
fashion to support
the highly dynamic and heterogeneous workloads and hosted applications
customers
deploy. Similarly, HPC environments have been increasingly adopting
techniques that
enable flexible management of vast computing and networking resources,
close to marginal
provisioning cost, which is unprecedented in the history of scientific and
commercial
computing.

Various virtualization technologies contribute to the overall picture in
different ways: machine
virtualization, with its capability to enable consolidation of multiple
under-utilized servers with
heterogeneous software and operating systems (OSes), and its capability to
live-migrate a
fully operating virtual machine (VM) with a very short downtime, enables
novel and dynamic
ways to manage physical servers; OS-level virtualization (i.e.,
containerization), with its
capability to isolate multiple user-space environments and to allow for
their co-existence
within the same OS kernel, promises to provide many of the advantages of
machine
virtualization with high levels of responsiveness and performance; I/O
Virtualization allows
physical NICs/HBAs to take traffic from multiple VMs or containers; network
virtualization,
with its capability to create logical network overlays that are independent
of the underlying
physical topology and IP addressing, provides the fundamental ground on top
of which
evolved network services can be realized with an unprecedented level of
dynamicity and
flexibility; the increasingly adopted paradigm of Software-Defined
Networking (SDN)
promises to extend this flexibility to the control and data planes of
network paths.


Topics of Interest

The VHPC program committee solicits original, high-quality submissions
related to
virtualization across the entire software stack with a special focus on the
intersection of HPC
and the cloud. Topics include, but are not limited to:

- Virtualization in supercomputing environments, HPC clusters, cloud HPC
and grids
- OS-level virtualization including container runtimes (Docker, rkt et al.)
- Lightweight compute node operating systems/VMMs
- Optimizations of virtual machine monitor platforms, hypervisors
- QoS and SLA in hypervisors and network virtualization
- Cloud based network and system management for SDN and NFV
- Management, deployment and monitoring of virtualized environments
- Virtual per job / on-demand clusters and cloud bursting
- Performance measurement, modelling and monitoring of virtualized/cloud
workloads
- Programming models for virtualized environments
- Virtualization in data intensive computing and Big Data processing
- Cloud reliability, fault-tolerance, high-availability and security
- Heterogeneous virtualized environments, virtualized accelerators, GPUs
and co-processors
- Optimized communication libraries/protocols in the cloud and for HPC in
the cloud
- Topology management and optimization for distributed virtualized
applications
- Adaptation of emerging HPC technologies (high performance networks, RDMA,
etc..)
- I/O and storage virtualization, virtualization aware file systems
- Job scheduling/control/policy in virtualized environments
- Checkpointing and migration of VM-based large compute jobs
- Cloud frameworks and APIs
- Energy-efficient / power-aware virtualization


The Workshop on Virtualization in High-Performance Cloud Computing (VHPC)
aims to
bring together researchers and industrial practitioners facing the
challenges
posed by virtualization in order to foster discussion, collaboration,
mutual exchange
of knowledge and experience, enabling research to ultimately provide novel
solutions for virtualized computing systems of tomorrow.

The workshop will be one day in length, composed of 20 min paper
presentations, each
followed by 10 min discussion sections, plus lightning talks that are
limited to 5 minutes.
Presentations may be accompanied by interactive demonstrations.

Important Dates

April 25, 2016 - Paper submission deadline
May 30, 2016 Acceptance notification
June 23, 2016 - Workshop Day
July 25, 2016 - Camera-ready version due


Chair

Michael Alexander (chair), TU Wien, Austria
Anastassios Nanos (co-chair), NTUA, Greece
Balazs Gerofi (co-chair), RIKEN Advanced Institute for Computational
Science, Japan


Program committee

Stergios 

Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Cody Koeninger
Sounds like you're using one of the KafkaUtils.createDirectStream
overloads that needs to talk to the brokers in order to even
construct the stream, because you aren't providing explicit per-partition offsets.
Just wrap your construction attempt in a try / catch and retry in
whatever way makes sense for you.

I'd also look into why it's failing to communicate with the brokers, though.
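
For what it's worth, a rough sketch of that retry-on-construction idea (Scala,
Spark 1.x direct-stream API; ssc, kafkaParams and topics are assumed to exist
already, and the retry count and sleep are arbitrary):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkException
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

def createStreamWithRetry(maxAttempts: Int): InputDStream[(String, String)] = {
  var attempt = 0
  var stream: Option[InputDStream[(String, String)]] = None
  while (stream.isEmpty) {
    attempt += 1
    try {
      stream = Some(KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics))
    } catch {
      // the driver could not fetch the latest offsets from the leaders; back off and retry
      case e: SparkException if attempt < maxAttempts => Thread.sleep(5000)
    }
  }
  stream.get
}

val messages = createStreamWithRetry(5)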

On Fri, Mar 18, 2016 at 1:03 PM, Bryan Jeffrey  wrote:
> Cody et. al,
>
> I am seeing a similar error.  I've increased the number of retries.  Once
> I've got a job up and running I'm seeing it retry correctly. However, I am
> having trouble getting the job started - number of retries does not seem to
> help with startup behavior.
>
> Thoughts?
>
> Regards,
>
> Bryan Jeffrey
>
> On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger  wrote:
>>
>> That's a networking error when the driver is attempting to contact
>> leaders to get the latest available offsets.
>>
>> If it's a transient error, you can look at increasing the value of
>> spark.streaming.kafka.maxRetries, see
>>
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> If it's not a transient error, you need to look at your brokers + your
>> network environment.
>>
>> On Thu, Mar 17, 2016 at 10:59 PM, Surendra , Manchikanti
>>  wrote:
>> > Hi,
>> >
>> > Can you check Kafka topic replication ? And leader information?
>> >
>> > Regards,
>> > Surendra M
>> >
>> >
>> >
>> > -- Surendra Manchikanti
>> >
>> > On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss 
>> > wrote:
>> >>
>> >> Hi,
>> >>
>> >> I have a SparkStream (with Kafka) job, after running several days, it
>> >> failed with following errors:
>> >> ERROR DirectKafkaInputDStream:
>> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
>> >>
>> >> Any idea what would be wrong? will it be SparkStreaming buffer overflow
>> >> issue?
>> >>
>> >>
>> >>
>> >> Regards
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> *** from the log ***
>> >>
>> >> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect
>> >> is
>> >> overridden to
>> >>
>> >> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream:
>> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
>> >>
>> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error:
>> >> java.nio.channels.ClosedChannelException
>> >>
>> >> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time
>> >> 1458188031800 ms
>> >>
>> >> org.apache.spark.SparkException:
>> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> >>
>> >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> >>
>> >> at scala.Option.orElse(Option.scala:257)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> >>
>> >> at
>> >>
>> >> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >>
>> >> at
>> >>
>> >> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> >>
>> >> at
>> >>
>> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >>
>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>
>> >> at
>> >>
>> >> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> >>
>> >> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> >>
>> >> at
>> >>
>> >> 

Re: Subquery performance

2016-03-19 Thread Michael Armbrust
Try running EXPLAIN on both versions of the query.

Likely, when you cache the subquery we know that it's going to be small, so we
use a broadcast join instead of shuffling the data.
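
For example (Scala; the table and column names follow the original query, and
this is only a sketch of how to compare the plans, not the exact output you'll see):

// Compare the physical plans of the cached and uncached variants:
sqlContext.sql("""
  SELECT col1, count(1)
  FROM (SELECT col2, count(1) FROM tab2 GROUP BY col2) sub
  INNER JOIN tab1 ON (col1 = col2)
  GROUP BY col1""").explain(true)

// Forcing the small, aggregated side to be broadcast via the DataFrame API:
import org.apache.spark.sql.functions.broadcast
val sub  = sqlContext.table("tab2").groupBy("col2").count()
val tab1 = sqlContext.table("tab1")
tab1.join(broadcast(sub), tab1("col1") === sub("col2"))
    .groupBy("col1").count()
    .explain(true)   // should show a broadcast join rather than a shuffled join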

On Thu, Mar 17, 2016 at 5:53 PM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Hi all,
>
>
>
> I’m running a query that looks like the following:
>
> Select col1, count(1)
>
> From (Select col2, count(1) from tab2 group by col2)
>
> Inner join tab1 on (col1=col2)
>
> Group by col1
>
>
>
> This creates a very large shuffle, 10 times the data size, as if the
> subquery was executed for each row.
>
> Anything can be done to tune to help tune this?
>
> When the subquery in persisted, it runs much faster, and the shuffle is 50
> times smaller!
>
>
>
> *Thanks,*
>
> *Younes*
>


Re: unix_timestamp() time zone problem

2016-03-19 Thread Davies Liu
Could you try to cast the timestamp to long?

Internally, timestamps are stored as microseconds in UTC; you will get
seconds in UTC if you cast one to long.
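
For example (Scala shown for illustration; rawDF is the DataFrame from your
snippet, and the equivalent SQL spelling is CAST(created AS bigint)):

// Casting a TimestampType column to long yields epoch seconds in UTC,
// independent of the session's local time zone.
import org.apache.spark.sql.functions.col
rawDF.select(col("created"), col("created").cast("long").alias("epochSeconds")).show(5)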

On Thu, Mar 17, 2016 at 1:28 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I am using python spark 1.6 and the --packages
> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10
>
> I need to convert a time stamp string into a unix epoch time stamp. The
> function unix_timestamp() function assume current time zone. How ever my
> string data is UTC and encodes the time zone as zero. I have not been able
> to find a way to get the unix time calculated correctly. simpleDateFormat
> does not have good time zone support. Any suggestions?
>
> I could write a UDF and to adjust for time zones how ever this seems like
>  a hack
>
> I tried using to_utc_timestamp(created, 'gmt’) how ever this creates a
> timestamp. I have not been able to figure out how to convert this to a unix
> time sample I.e a long representing epoch
>
> Any suggestions?
>
> stmnt = "select \
> row_key, created, count, unix_timestamp(created) as
> unixTimeStamp, \
> unix_timestamp(created, '-MM-dd HH:mm:ss.z') as etc \
>  from \
> rawTable \
>  where \
>  (created > '{0}') and (created <= '{1}') \
>  and \
>  (row_key = ‘blue' \
> or row_key = ‘red' \
> )".format('2016-03-12 00:30:00+', '2016-03-12
> 04:30:00+’)
>
>
> Sample out put
>
> root
>  |-- row_key: string (nullable = true)
>  |-- created: timestamp (nullable = true)
>  |-- count: long (nullable = true)
>  |-- unixTimeStamp: long (nullable = true)
>  |-- etc: long (nullable = true)
>
> 2016-03-12 00:30:30.0 should be 1457742630 not 1457771430
>
> +-+-+-+-+--+
> |row_key  |created|count|unixTimeStamp|utc|
> +-+-+-+-+--+
> |red|2016-03-12 00:30:30.0|2|1457771430   |1457771430|
> |red|2016-03-12 00:30:45.0|1|1457771445   |1457771445|
>
>
>
> static Column unix_timestamp(Column s)
> Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in
> seconds), using the default timezone and the default locale, return null if
> fail.
> static Column unix_timestamp(Column s, java.lang.String p)
> Convert time string with given pattern (see
> http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html)
> to Unix time stamp (in seconds), return null if fail.
>


Re: Fwd: Spark 2.0 Shell -csv package weirdness

2016-03-19 Thread Marco Mistroni
Have you tried df.saveAsParquetFile? I think that method is on the DataFrame API.
Hth
Marco
On 19 Mar 2016 7:18 pm, "Vincent Ohprecio"  wrote:

>
> For some reason writing data from Spark shell to csv using the `csv
> package` takes almost an hour to dump to disk. Am I going crazy or did I do
> this wrong? I tried writing to parquet first and its fast as normal.
>
> On my Macbook Pro 16g - 2.2 GHz Intel Core i7 -1TB the machine CPU's goes
> crazy and it sounds like its taking off like a plane ... lol
>
> Here is the code if anyone wants to experiment:
>
> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
>
> //
>
> // version 2.0.0-SNAPSHOT
>
> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.7.0_80)
>
> // http://stat-computing.org/dataexpo/2009/the-data.html
>
>
> def time[R](block: => R): R = {
>
> val t0 = System.nanoTime()
>
> val result = block// call-by-name
>
> val t1 = System.nanoTime()
>
> println("Elapsed time: " + (t1 - t0) + "ns")
>
> result
>
> }
>
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").load("/Users/employee/Downloads/2008.csv")
>
> val df_1 = df.withColumnRenamed("Year","oldYear")
>
> val df_2 =
> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>
> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
> newType:String) = {
>
>   val df_1 = df.withColumnRenamed(name, "swap")
>
>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>
> }
>
> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>
> val df_4 = convertColumn(df_2, "DepDelay", "int")
>
>
> // test write to parquet is fast
>
> df_4.select("Year",
> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>
>
> val selectedData = df_4.select("Year", "Cancelled")
>
>
>
> val howLong =
> Time(selectedData.write.format("com.databricks.spark.csv").option("header",
> "true").save("output.csv"))
>
>
> //scala> val howLong =
> time(selectedData.write.format("com.databricks.spark.csv").option("header",
> "true").save("output.csv"))
>
> //Elapsed time: 348827227ns
>
> //howLong: Unit = ()
>
> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb
>
>
>
>
>


Fwd: Spark 2.0 Shell -csv package weirdness

2016-03-19 Thread Vincent Ohprecio
For some reason writing data from Spark shell to csv using the `csv
package` takes almost an hour to dump to disk. Am I going crazy or did I do
this wrong? I tried writing to parquet first and its fast as normal.

On my Macbook Pro 16g - 2.2 GHz Intel Core i7 -1TB the machine CPU's goes
crazy and it sounds like its taking off like a plane ... lol

Here is the code if anyone wants to experiment:

// ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0

//

// version 2.0.0-SNAPSHOT

// Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java
1.7.0_80)

// http://stat-computing.org/dataexpo/2009/the-data.html


def time[R](block: => R): R = {

val t0 = System.nanoTime()

val result = block// call-by-name

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) + "ns")

result

}


val df =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("/Users/employee/Downloads/2008.csv")

val df_1 = df.withColumnRenamed("Year","oldYear")

val df_2 =
df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")

def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
newType:String) = {

  val df_1 = df.withColumnRenamed(name, "swap")

  df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")

}

val df_3 = convertColumn(df_2, "ArrDelay", "int")

val df_4 = convertColumn(df_2, "DepDelay", "int")


// test write to parquet is fast

df_4.select("Year",
"Cancelled").write.format("parquet").save("yearAndCancelled.parquet")


val selectedData = df_4.select("Year", "Cancelled")



val howLong =
time(selectedData.write.format("com.databricks.spark.csv").option("header",
"true").save("output.csv"))


//scala> val howLong =
time(selectedData.write.format("com.databricks.spark.csv").option("header",
"true").save("output.csv"))

//Elapsed time: 348827227ns

//howLong: Unit = ()

https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb


Re: installing packages with pyspark

2016-03-19 Thread Ajinkya Kale
Thanks Jakob, Felix. I am aware you can do it with --packages, but I was
wondering if there is a way to do something like "!pip install "
like I do for other packages from a Jupyter notebook for Python. But I guess
I cannot add a package once I launch the pyspark context, right?

On Thu, Mar 17, 2016 at 6:59 PM Felix Cheung 
wrote:

> For some, like graphframes that are Spark packages, you could also use
> --packages in the command line of spark-submit or pyspark. See
> http://spark.apache.org/docs/latest/submitting-applications.html
>
> _
> From: Jakob Odersky 
> Sent: Thursday, March 17, 2016 6:40 PM
> Subject: Re: installing packages with pyspark
> To: Ajinkya Kale 
> Cc: 
>
>
> Hi,
> regarding 1, packages are resolved locally. That means that when you
> specify a package, spark-submit will resolve the dependencies and
> download any jars on the local machine, before shipping* them to the
> cluster. So, without a priori knowledge of dataproc clusters, it
> should be no different to specify packages.
>
> Unfortunatly I can't help with 2.
>
> --Jakob
>
> *shipping in this case means making them available via the network
>
> On Thu, Mar 17, 2016 at 5:36 PM, Ajinkya Kale 
> wrote:
> > Hi all,
> >
> > I had couple of questions.
> > 1. Is there documentation on how to add the graphframes or any other
> package
> > for that matter on the google dataproc managed spark clusters ?
> >
> > 2. Is there a way to add a package to an existing pyspark context
> through a
> > jupyter notebook ?
> >
> > --aj
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>


Re: Checkpoint of DStream joined with RDD

2016-03-19 Thread Ted Yu
I looked at the places in SparkContext.scala where NewHadoopRDD is
constructed.
It seems the Configuration object shouldn't be null.

Which hbase release are you using (so that I can see which line the NPE
came from) ?

Thanks

On Fri, Mar 18, 2016 at 8:05 AM, Lubomir Nerad 
wrote:

> Hi,
>
> I tried to replicate the example of joining DStream with lookup RDD from
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation.
> It works fine, but when I enable checkpointing for the StreamingContext and
> let the application to recover from a previously created checkpoint, I
> always get an exception during start and the whole application fails. I
> tried various types of lookup RDD, but the result is the same.
>
> Exception in the case of HBase RDD is:
>
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
> at
> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:109)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
> at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
> at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
> at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
> at java.util.TimSort.sort(TimSort.java:216)
> at java.util.Arrays.sort(Arrays.java:1438)
> at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
> at scala.collection.AbstractSeq.sorted(Seq.scala:40)
> at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
> at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
> at
> org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at
> org.apache.spark.rdd.PairRDDFunctions.join(PairRDDFunctions.scala:650)
> at org.apache.spark.api.java.JavaPairRDD.join(JavaPairRDD.scala:546)
>
> I tried Spark 1.5.2 and 1.6.0 without success. The problem seems to be
> that RDDs use some transient fields which are not restored when they are
> recovered from checkpoint files. In case of some RDD implementations it is
> SparkContext, but it can be also implementation specific Configuration
> object, etc. I see in the sources that in the case of DStream recovery, the
> DStreamGraph takes care of restoring StreamingContext in all its DStream-s.
> But I haven't found any similar mechanism for RDDs.
>
> So my question is whether I am doing something wrong or this is a bug in
> Spark? If later, is there some workaround except for implementing a custom
> DStream which will return the same RDD every batch interval and joining at
> DStream level instead of RDD level in transform?
>
> I apologize if this has been discussed in the past and I missed it when
> looking into archive.
>
> Thanks,
> Lubo
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark streaming with akka association with remote system failure

2016-03-19 Thread David Gomez Saavedra
Solved the issue by setting up the same heartbeat interval and pauses in
both actor systems

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = DEBUG
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  log-dead-letters = on
  log-dead-letters-during-shutdown = on

  daemonic = on
  jvm-exit-on-fatal-error = off

  actor {
provider = "akka.remote.RemoteActorRefProvider"
default-dispatcher.throughput = 15
  }
  remote {
enabled-transports = ["akka.remote.netty.tcp"]
log-remote-lifecycle-events = on
require-cookie = off
secure-cookie = off
netty.tcp {
  hostname = "spark-engine"
  port = 9083
  tcp-nodelay = on
  transport-class = "akka.remote.transport.netty.NettyTransport"
  connection-timeout = 120 s
  execution-pool-size = 4
}

transport-failure-detector {
  heartbeat-interval = 4 s
  acceptable-heartbeat-pause = 16 s
}
  }
}


.set("spark.akka.heartbeat.interval", "4s")
.set("spark.akka.heartbeat.pauses", "16s")


On Tue, Mar 15, 2016 at 9:50 PM, David Gomez Saavedra 
wrote:

> hi there,
>
> I'm trying to set up a simple spark streaming app using akka actors as
> receivers. I followed the example provided and created two apps. One
> creating an actor system and another one subscribing to it. I can see the
> subscription message but few seconds later i get an error
>
> [info] 20:37:40.296 [INFO ] Slf4jLogger started
> [info] 20:37:40.466 [INFO ] Starting remoting
> [info] 20:37:40.871 [INFO ] Remoting started; listening on addresses
> :[akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.876 [INFO ] Remoting now listens on addresses:
> [akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.913 [INFO ] starting actor on
> akka://spark-engine/user/integrationActor
> [info] received subscribe from Actor[akka.tcp://
> sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036
> ]
> [info] 20:38:34.125 [INFO ] No response from remote. Handshake timed out
> or transport failure detector triggered.
> [info] 20:38:34.226 [WARN ] Association with remote system [akka.tcp://
> sparkExecutorActorSystem@172.18.0.2:6006] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> [info] received unsubscribe from Actor[akka.tcp://
> sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036
> ]
>
> I'm running the master and worker on docker. The two apps are running on
> my laptop for testing. Here's the code of both
>
> def main(args: Array[String]) {
>   val conf = new SparkConf()
> .setMaster(sparkMaster)
> .setAppName(sparkApp)
> .set("spark.logConf", "true")
> .set("spark.driver.port","7001")
> .set("spark.fileserver.port","6002")
> .set("spark.broadcast.port","6003")
> .set("spark.replClassServer.port","6004")
> .set("spark.blockManager.port","6005")
> .set("spark.executor.port","6006")
> .set("spark.akka.heartbeat.interval", "100")
> .set("spark.akka.logLifecycleEvents", "true")
> .set("spark.rpc.netty.dispatcher.numThreads","2")
> .setJars(sparkJars)
>
>
>   val ssc = new StreamingContext(conf, Seconds(5))
>
>   ssc.checkpoint("/tmp")
>
>   val tags = ssc.actorStream [Tuple2[UUID, Tuple4[Set[String], Int, Int, 
> Int]]] (Props(new 
> GifteeTagStreamingActor("akka.tcp://spark-engine@spark-engine:9083/user/integrationActor")),
>  "TagsReceiver")
>
>   tags.print()
>
>   ssc.start()
>   ssc.awaitTermination()
>
> }
>
>
>
> def main(args: Array[String]) {
>
>   val config = ConfigFactory.load()
>   val system = ActorSystem("spark-engine", config.getConfig("spark-engine"))
>
>   val integrationActor = system.actorOf(Props(new IntegrationActor()), 
> "integrationActor")
>
>   log.info("starting actor on " + integrationActor.path)
>
>   system.awaitTermination()
>
> }
>
>
> This is my config for the remote actor system to where spark subscribes
>
> spark-engine {
>
>   akka {
> loggers = ["akka.event.slf4j.Slf4jLogger"]
> loglevel = DEBUG
> logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
> log-dead-letters = 10
> log-dead-letters-during-shutdown = on
>
> actor {
>   provider = "akka.remote.RemoteActorRefProvider"
> }
> remote {
>   enabled-transports = ["akka.remote.netty.tcp"]
>   log-remote-lifecycle-events = on
>   netty.tcp {
> hostname = "spark-engine"
> port = 9083
>   }
> }
>   }
> }
>
> These are the logs from the executor
>
> 16/03/15 20:47:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 16/03/15 20:48:12 WARN ReliableDeliverySupervisor: Association with remote 
> system [akka.tcp://spark-engine@spark-engine:9083] has failed, address is now 
> gated for [5000] ms. Reason: [Disassociated]
>
>
>
> Any idea why the two actor systems get disassociated ?
>
> Thank you very much in advanced.
>
> Best
> David
>


Issue regarding removal of duplicates from RDD

2016-03-19 Thread Thamme Gowda N.
Hi,

I am facing an issue while deduplicating the keys in RDD (Code Snippet
below).
I have few Sequence Files, some of them have duplicate entries. I am trying
to drop duplicate values for each key.

Here are two methods with code snippets:

val path = "path/to/sequence/file"
val rdd1 = ctx.sequenceFile(path, classOf[Text], classOf[Content])
val rdd1Copy = ctx.sequenceFile(path, classOf[Text], classOf[Content])
// Duplicates for the sake of testing
val rdds = Array(rdd1, rdd1Copy)
val rdd = ctx.union(rdds) // club all parts with duplicates

//Method 1 : group by key followed by a map to pick unique value in each group
rdd.groupByKey()
  .map(rec => (rec._1, rec._2.iterator.next()))
  .saveAsHadoopFile(output+"-method1", classOf[Text],
classOf[Content], classOf[SequenceFileOutputFormat[Text,Content]])
// save it

//Method 2 : reduce by key and drop the duplicate values
rdd.reduceByKey((v1, v2) => v1)
  .saveAsHadoopFile(output+"-method2", classOf[Text],
classOf[Content], classOf[SequenceFileOutputFormat[Text,Content]])
// save it


Method 1 works fine, but it looks like groupByKey is expensive.

Method 2 is more interesting, but it is not working as expected.
The issue is that (1) it removes a lot more entries than it is supposed to,
and (2) it also introduces duplicates.

In my test I had 47000 unique records (duplicated by the union for the test);
method 1 got it right, while method 2 returned only 1790 records, of which only
30 are unique.
The number 30 was also the number of stages the job took to run.

Can you please help me understand why the Method 2 is not the right way to
remove duplicate values or what is wrong with the code snippet?

p.s.
Got Method2 from an accepted answer in StackOverflow:
http://stackoverflow.com/a/31656056/1506477

-
Thanks and regards
Thamme


--
*Thamme Gowda N. *
Grad Student at usc.edu
Twitter: @thammegowda  
Website: http://scf.usc.edu/~tnarayan/


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2016-03-19 Thread Sebastian Piu
Try to troubleshoot why it is happening; maybe some messages are too big to
be read from the topic? I remember getting that error and that was the cause.

On Fri, Mar 18, 2016 at 11:16 AM Ramkumar Venkataraman <
ram.the.m...@gmail.com> wrote:

> I am using Spark streaming and reading data from Kafka using
> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> smallest.
>
> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
> and my spark job crashes.
>
> I want to understand if there is a graceful way to handle this failure and
> not kill the job. I want to keep ignoring these exceptions, as some other
> partitions are fine and I am okay with data loss.
>
> Is there any way to handle this and not have my spark job crash? I have no
> option of increasing the kafka retention period.
>
> I tried to have the DStream returned by createDirectStream() wrapped in a
> Try construct, but since the exception happens in the executor, the Try
> construct didn't take effect. Do you have any ideas of how to handle this?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: sql timestamp timezone bug

2016-03-19 Thread Davies Liu
On Thu, Mar 17, 2016 at 3:02 PM, Andy Davidson
 wrote:
> I am using pyspark 1.6.0 and
> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
> data
>
> The data is originally captured by a spark streaming app and written to
> Cassandra. The value of the timestamp comes from
>
> Rdd.foreachRDD(new VoidFunction2()
> …});
>
> I am confident the time stamp is stored correctly in cassandra and that
> the clocks on the machines in my cluster are set correctly
>
> I noticed that if I used Cassandra CQLSH to select a data set between two
> points in time the row count did not match the row count I got when I did
> the same select in spark using SQL, It appears the spark sql assumes all
> timestamp strings are in the local time zone.
>
>
> Here is what I expect. (this is what is returned by CQLSH)
> cqlsh> select
>... count(row_key) as num_samples, sum(count) as total, max(count)
> as max
>... from
>... notification.json_timeseries
>... where
>... row_key in ('red', 'blue')
>... and created > '2016-03-12 00:30:00+'
>... and created <= '2016-03-12 04:30:00+'
>... allow filtering;
>
>  num_samples | total| max
> -+--+---
> 3242 |11277 |  17
>
>
> Here is my pyspark select statement. Notice the 'created' column encodes
> the timezone. I am running this on my local mac (in PST timezone) and
> connecting to my data center (which runs on UTC) over a VPN.
>
> rawDF = sqlContext.read\
> .format("org.apache.spark.sql.cassandra")\
> .options(table="json_timeseries", keyspace="notification")\
> .load()
>
>
> rawDF.registerTempTable(tmpTableName)
>
>
>
> stmnt = "select \
> row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
> unix_timestamp(created, '-MM-dd HH:mm:ss.z') as hack, \
> to_utc_timestamp(created, 'gmt') as gmt \
> from \
> rawTable \
> where \
> (created > '{0}') and (created <= '{1}') \
> and \
> (row_key = 'red' or row_key = 'blue') \
> )".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')
>
> rawDF = sqlCtx.sql(stmnt).cache()

What's the type of `created`? TimestampType?

If yes, when `created` is compared to a string it will be cast to a string
and then compared as a string, so the predicate becomes

cast(created as string) > '2016-03-12 00:30:00+'

Could you try this

sqlCtx.sql("select created, cast(created as string) from rawTable").show()



>
>
>
> I get different values for row count, max, …
>
> If I convert the UTC time stamp string to my local timezone the row count
> matches the count returned by  cqlsh
>
> # pst works, matches cassandra cqlsh
> # .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')
>
> Am I doing something wrong in my pyspark code?
>
>
> Kind regards
>
> Andy
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Ascot Moss
Hi,

I have a SparkStream (with Kafka) job, after running several days, it
failed with following errors:
ERROR DirectKafkaInputDStream:
ArrayBuffer(java.nio.channels.ClosedChannelException)

Any idea what would be wrong? will it be SparkStreaming buffer overflow
issue?



Regards






*** from the log ***

16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is
overridden to

16/03/17 12:13:51 ERROR DirectKafkaInputDStream:
ArrayBuffer(java.nio.channels.ClosedChannelException)

16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error:
java.nio.channels.ClosedChannelException

16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time
1458188031800 ms

org.apache.spark.SparkException:
ArrayBuffer(java.nio.channels.ClosedChannelException)

at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)

at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)

at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)

at scala.util.Try$.apply(Try.scala:161)

at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)

at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)

at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Exception in thread "main" org.apache.spark.SparkException:
ArrayBuffer(java.nio.channels.ClosedChannelException)

at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)

at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)

at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)

at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)

at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)

at

Re: The error to read HDFS custom file in spark.

2016-03-19 Thread Tony Liu
Sorry for the late reply. Yep, RDRawDataRecord is my object; it is defined in
another Java project (jar), and I get it with Maven. My MapReduce program also
uses it and works.

On Fri, Mar 18, 2016 at 12:48 AM, Mich Talebzadeh  wrote:

> Hi Tony,
>
> Is
>
> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>
> One of your own packages?
>
> Sounds like it is one throwing the error
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 March 2016 at 15:21, Tony Liu  wrote:
>
>> Hi,
>>My HDFS file is store with custom data structures. I want to read it
>> with SparkContext object.So I define a formatting object:
>>
>> *1. code of RawDataInputFormat.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.LongWritable
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/16/16.
>>   */
>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>> FileInputFormat {
>>
>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: 
>> Reporter): RecordReader[LW, RD] = {
>> new RawReader(split, job, reporter)
>>   }
>>
>> }
>>
>> *2. code of RawReader.scala*
>>
>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>> import org.apache.hadoop.mapred._
>>
>> /**
>>   * Created by Tony on 3/17/16.
>>   */
>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>> RecordReader[LW, RD] {
>>
>>   var reader: SequenceFile.Reader = null
>>   var currentPos: Long = 0L
>>   var length: Long = 0L
>>
>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>> this()
>> val p = (split.asInstanceOf[FileSplit]).getPath
>> reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>   }
>>
>>   override def next(key: LW, value: RD): Boolean = {
>> val flag = reader.next(key, value)
>> currentPos = reader.getPosition()
>> flag
>>   }
>>
>>   override def getProgress: Float = Math.min(1.0f, currentPos / 
>> length.toFloat)
>>
>>   override def getPos: Long = currentPos
>>
>>   override def createKey(): LongWritable = {
>> new LongWritable()
>>   }
>>
>>   override def close(): Unit = {
>> reader.close()
>>   }
>>
>>   override def createValue(): RDRawDataRecord = {
>> new RDRawDataRecord()
>>   }
>> }
>>
>> *3. code of RDRawDataRecord.scala*
>>
>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>> import java.io.DataInput;
>> import java.io.DataOutput;
>> import java.io.IOException;
>> import org.apache.commons.lang.StringUtils;
>> import org.apache.hadoop.io.Writable;
>>
>> public class RDRawDataRecord implements Writable {
>> private String smac;
>> private String dmac;
>> private int hrssi;
>> private int lrssi;
>> private long fstamp;
>> private long lstamp;
>> private long maxstamp;
>> private long minstamp;
>> private long stamp;
>>
>> public void readFields(DataInput in) throws IOException {
>> this.smac = in.readUTF();
>> this.dmac = in.readUTF();
>> this.hrssi = in.readInt();
>> this.lrssi = in.readInt();
>> this.fstamp = in.readLong();
>> this.lstamp = in.readLong();
>> this.maxstamp = in.readLong();
>> this.minstamp = in.readLong();
>> this.stamp = in.readLong();
>> }
>>
>> public void write(DataOutput out) throws IOException {
>> out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>> out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>> out.writeInt(this.hrssi);
>> out.writeInt(this.lrssi);
>> out.writeLong(this.fstamp);
>> out.writeLong(this.lstamp);
>> out.writeLong(this.maxstamp);
>> out.writeLong(this.minstamp);
>> out.writeLong(this.stamp);
>> }
>>
>> */** *
>>
>> *ignore getter setter*
>>
>> ***/*
>>
>> }
>>
>> *At last, I use this code to run*:
>>
>> val filePath = 
>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>> val conf = new SparkConf()
>> conf.setMaster("local")
>> conf.setAppName("demo")
>> val sc = new SparkContext(conf)
>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>> file.foreach(v => {
>>   println(v._2.getDmac) // Attribute of custom objects
>> })
>>
>> *I get an error, it says:*
>>
>> Error:(41, 19) type arguments 
>> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>  conform to the bounds of none of the overloaded 

Re: How to add an accumulator for a Set in Spark

2016-03-19 Thread Adrien Mogenet
Btw, here is a great article about accumulators and all their related
traps!
http://imranrashid.com/posts/Spark-Accumulators/ (I'm not the author)
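
For reference, here is a minimal sketch of a set accumulator with the Spark 1.x
Accumulable API (Scala). One trap worth stressing for the question below: tasks
can only add to it; the accumulated value can only be read reliably on the driver
after an action has finished, so executors cannot use it for a live .contains() check.

import scala.collection.mutable

// A HashSet-backed accumulable; executors add keys, the driver reads the union.
val seenKeys = sc.accumulableCollection(mutable.HashSet[String]())

sc.parallelize(Seq("a", "b", "a", "c")).foreach(k => seenKeys += k)  // add-only from tasks

println(seenKeys.value.contains("a"))  // driver-side check, after the action has run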

On 16 March 2016 at 18:24, swetha kasireddy 
wrote:

> OK. I did take a look at them. So once I have an accumulater for a
> HashSet, how can I check if a particular key is already present in the
> HashSet accumulator? I don't see any .contains method there. My requirement
> is that I need to keep accumulating the keys in the HashSet across all the
> tasks in various nodes and use it to do a check if the key is already
> present in the HashSet.
>
> On Tue, Mar 15, 2016 at 9:56 PM, pppsunil  wrote:
>
>> Have you looked at using Accumulable interface,  Take a look at Spark
>> documentation at
>> http://spark.apache.org/docs/latest/programming-guide.html#accumulators
>> it
>> gives example of how to use vector type for accumalator, which might be
>> very
>> close to what you need
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-an-accumulator-for-a-Set-in-Spark-tp26510p26514.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 

*Adrien Mogenet*
Head of Backend/Infrastructure
adrien.moge...@contentsquare.com
http://www.contentsquare.com
50, avenue Montaigne - 75008 Paris


Spark jobs running in serial mode on yarn client

2016-03-19 Thread Mich Talebzadeh
Hi,

I am testing some parallel processing of Spark applications.

I have a two node spark cluster and currently running two worker processes
on each in Yarn-client mode. The master has 12 cores and 24GB of RAM. The
worker node has 4GB of RAM and 2 cores (well an old 32 bit host). The OS on
both is RHES5 with master 64-bit and the worker 32-bit.

I submit identical jobs to master via spark-shell, one from master and the
other from the worker node

However, whichever one comes first is executed but the other job is
waiting. I know this has nothing to do with Hadoop as I can run a Spark job
and identical Hive job in parallel.

I have allocated the following resources in spark-env.sh

export SPARK_EXECUTOR_CORES=3 ##, Number of cores for the workers (Default:
1).
export SPARK_EXECUTOR_MEMORY=3G ## , Memory per Worker (e.g. 1000M, 2G)
(Default: 1G)
export SPARK_DRIVER_MEMORY=1G ## , Memory for Master (e.g. 1000M, 2G)
(Default: 512 Mb)

The problem is that the first job goes and grabs 12 cores despite the above
settings. Is this behaviour expected in yarn-client mode, or is there
something wrong here?

The Spark GUI shows the jobs. The master worker runs on address ending in
216 whereas the worker runs on 217 as shown below



Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Apache Spark Exception in thread “main” java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class

2016-03-19 Thread Josh Rosen
See the instructions in the Spark documentation:
https://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211

On Wed, Mar 16, 2016 at 7:05 PM satyajit vegesna 
wrote:

>
>
> Hi,
>
> Scala version:2.11.7(had to upgrade the scala verison to enable case
> clasess to accept more than 22 parameters.)
>
> Spark version:1.6.1.
>
> PFB pom.xml
>
> Getting below error when trying to setup spark on intellij IDE,
>
> 16/03/16 18:36:44 INFO spark.SparkContext: Running Spark version 1.6.1
> Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/collection/GenTraversableOnce$class at
> org.apache.spark.util.TimeStampedWeakValueHashMap.(TimeStampedWeakValueHashMap.scala:42)
> at org.apache.spark.SparkContext.(SparkContext.scala:298) at
> com.examples.testSparkPost$.main(testSparkPost.scala:27) at
> com.examples.testSparkPost.main(testSparkPost.scala) at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606) at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused
> by: java.lang.ClassNotFoundException:
> scala.collection.GenTraversableOnce$class at
> java.net.URLClassLoader$1.run(URLClassLoader.java:366) at
> java.net.URLClassLoader$1.run(URLClassLoader.java:355) at
> java.security.AccessController.doPrivileged(Native Method) at
> java.net.URLClassLoader.findClass(URLClassLoader.java:354) at
> java.lang.ClassLoader.loadClass(ClassLoader.java:425) at
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at
> java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 9 more
>
> pom.xml:
>
> http://maven.apache.org/POM/4.0.0; 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/maven-v4_0_0.xsd;>
> 4.0.0
> StreamProcess
> StreamProcess
> 0.0.1-SNAPSHOT
> ${project.artifactId}
> This is a boilerplate maven project to start using Spark in 
> Scala
> 2010
>
> 
> 1.6
> 1.6
> UTF-8
> 2.10
> 
> 2.11.7
> 
>
> 
> 
> 
> cloudera-repo-releases
> https://repository.cloudera.com/artifactory/repo/
> 
> 
>
> 
> src/main/scala
> src/test/scala
> 
> 
> 
> maven-assembly-plugin
> 
> 
> package
> 
> single
> 
> 
> 
> 
> 
> 
> jar-with-dependencies
> 
> 
> 
> 
> 
> net.alchim31.maven
> scala-maven-plugin
> 3.2.2
> 
> 
> 
> compile
> testCompile
> 
> 
> 
> 
> -dependencyfile
> 
> ${project.build.directory}/.scala_dependencies
> 
> 
> 
> 
> 
>
> 
> 
> maven-assembly-plugin
> 2.4.1
> 
> 
> jar-with-dependencies
> 
> 
> 
> 
> make-assembly
> package
> 
> single
> 
> 
> 
> 
> 
> 
> 
> 
> org.scala-lang
> scala-library
> ${scala.version}
> 
> 
> org.mongodb.mongo-hadoop
> mongo-hadoop-core
> 1.4.2
> 
> 
> javax.servlet
> servlet-api
> 
> 
> 
> 
> org.mongodb
> mongodb-driver
> 3.2.2
> 
> 
> javax.servlet
> servlet-api
> 
> 
> 
> 
> org.mongodb
> mongodb-driver
> 3.2.2
> 
> 
> javax.servlet
>

Re: Restarting an executor during execution causes it to lose AWS credentials (anyone seen this?)

2016-03-19 Thread Steve Loughran

On 17 Mar 2016, at 16:01, Allen George 
> wrote:

Hi guys,

I'm having a problem where respawning a failed executor during a job that 
reads/writes parquet on S3 causes subsequent tasks to fail because of missing 
AWS keys.

Setup:

I'm using Spark 1.5.2 with Hadoop 2.7 and running experiments on a simple 
standalone cluster:

1 master
2 workers

My application is co-located on the master machine, while the two workers are 
on two other machines (one worker per machine). All machines are running in 
EC2. I've configured my setup so that my application executes its task on two 
executors (one executor per worker).

Application:

My application reads and writes parquet files on S3. I set the AWS keys on the 
SparkContext by doing:

val sc = new SparkContext()
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3n.awsAccessKeyId", "SOME_KEY")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "SOME_SECRET")

At this point I'm done, and I go ahead and use "sc".

Issue:

I can read and write parquet files without a problem with this setup. *BUT* if 
an executor dies during a job and is respawned by a worker, tasks fail with the 
following error:

"Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret 
Access Key must be specified as the username or password (respectively) of a 
s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey 
properties (respectively)."

I've tried adding the AWS keys to core-site.xml, placing it in 
"/etc/hadoop-conf", and setting HADOOP_CONF_DIR in spark-env.sh on the 
master/worker machines, but that doesn't seem to help. I tried setting 
AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY in the worker environment, but 
that didn't work either. It seems that somehow the AWS keys aren't being picked 
by a newly-spawned executor. Has anyone seen this before? Is there a problem 
with my configuration that's causing this?

Thanks!
Allen

Terminal Musings: http://www.allengeorge.com/
Raft in Java: https://github.com/allengeorge/libraft/
Twitter: https://twitter.com/allenageorge/

are the AWS keys coming down as env variables?

There's a little bit of code in SparkHadoopUtil.newConfiguration() which 
sets/overrides those properties; it may be getting in the way of your explicit 
settings —certainly it will overwrite the -site.xml values
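
One workaround worth trying (offered as a sketch, not a definitive fix): put the keys on the SparkConf as spark.hadoop.* properties before the context is created. Entries with that prefix are folded into the Hadoop Configuration Spark builds from the application config, which may survive an executor respawn better than values set on sc.hadoopConfiguration at runtime. Placeholder values below:

import org.apache.spark.{SparkConf, SparkContext}

// "spark.hadoop.*" properties are copied into the Hadoop Configuration
// constructed from the application config on the driver and executors.
val conf = new SparkConf()
  .setAppName("s3n-app")
  .set("spark.hadoop.fs.s3n.awsAccessKeyId", "SOME_KEY")
  .set("spark.hadoop.fs.s3n.awsSecretAccessKey", "SOME_SECRET")
val sc = new SparkContext(conf)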


Zip File and XML parsing with Spark Streaming

2016-03-19 Thread tjb305
Hello,

I am trying to figure out how to unzip zip files in Spark Streaming. Within
each zip file will be a series of xml files which will also need parsing.

Are there libraries that work with DStream that can parse a zip or an XML
file? I have seen the Databricks XML library, but I do not think it works
with streams. I have seen some similar posts from early 2014 and wanted to
see if the situation has changed.
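
For reference, the unzip-and-parse step itself needs nothing beyond java.util.zip and scala-xml; how the bytes arrive is up to you (for example sc.binaryFiles inside foreachRDD over a stream of new paths, or a custom whole-file input format with fileStream). A rough, untested sketch:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.ZipInputStream
import scala.collection.mutable.ArrayBuffer
import scala.xml.{Elem, XML}

// Turn the bytes of one zip archive into the parsed XML documents it contains.
def unzipAndParse(zipBytes: Array[Byte]): Seq[Elem] = {
  val zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))
  val docs = ArrayBuffer.empty[Elem]
  val buf = new Array[Byte](8192)
  var entry = zis.getNextEntry
  while (entry != null) {
    if (!entry.isDirectory && entry.getName.endsWith(".xml")) {
      // Copy this entry into memory so the XML parser cannot close the zip stream.
      val out = new ByteArrayOutputStream()
      var n = zis.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = zis.read(buf) }
      docs += XML.load(new ByteArrayInputStream(out.toByteArray))
    }
    entry = zis.getNextEntry
  }
  zis.close()
  docs
}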

Thanks in advance for the help.

T



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Zip-File-and-XML-parsing-with-Spark-Streaming-tp26527.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unit test with sqlContext

2016-03-19 Thread Vikas Kawadia
If you prefer  the py.test framework, I just wrote a blog post with some
examples:

Unit testing Apache Spark with py.test
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
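
And for the ScalaTest setup discussed further down this thread, a minimal sketch of a shared fixture (names illustrative): one context per suite, always stopped in afterAll, combined with disabling parallel test execution as mentioned below:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.scalatest.{BeforeAndAfterAll, Suite}

// Mix this into suites that need Spark; one context per suite, always stopped
// in afterAll so the next suite can create its own.
trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _
  @transient var sqlContext: SQLContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf().setMaster("local[2]").setAppName(suiteName)
    sc = new SparkContext(conf)
    sqlContext = new SQLContext(sc)
  }

  override def afterAll(): Unit = {
    try {
      if (sc != null) sc.stop()
      sc = null
      sqlContext = null
    } finally {
      super.afterAll()
    }
  }
}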

On Fri, Feb 5, 2016 at 11:43 AM, Steve Annessa 
wrote:

> Thanks for all of the responses.
>
> I do have an afterAll that stops the sc.
>
> While looking over Holden's readme I noticed she mentioned "Make sure to
> disable parallel execution." That was what I was missing; I added the
> follow to my build.sbt:
>
> ```
> parallelExecution in Test := false
> ```
>
> Now all of my tests are running.
>
> I'm going to look into using the package she created.
>
> Thanks again,
>
> -- Steve
>
>
> On Thu, Feb 4, 2016 at 8:50 PM, Rishi Mishra 
> wrote:
>
>> Hi Steve,
>> Have you cleaned up your SparkContext ( sc.stop())  , in a afterAll().
>> The error suggests you are creating more than one SparkContext.
>>
>>
>> On Fri, Feb 5, 2016 at 10:04 AM, Holden Karau 
>> wrote:
>>
>>> Thanks for recommending spark-testing-base :) Just wanted to add if
>>> anyone has feature requests for Spark testing please get in touch (or add
>>> an issue on the github) :)
>>>
>>>
>>> On Thu, Feb 4, 2016 at 8:25 PM, Silvio Fiorito <
>>> silvio.fior...@granturing.com> wrote:
>>>
 Hi Steve,

 Have you looked at the spark-testing-base package by Holden? It’s
 really useful for unit testing Spark apps as it handles all the
 bootstrapping for you.

 https://github.com/holdenk/spark-testing-base

 DataFrame examples are here:
 https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala

 Thanks,
 Silvio

 From: Steve Annessa 
 Date: Thursday, February 4, 2016 at 8:36 PM
 To: "user@spark.apache.org" 
 Subject: Unit test with sqlContext

 I'm trying to unit test a function that reads in a JSON file,
 manipulates the DF and then returns a Scala Map.

 The function has signature:
 def ingest(dataLocation: String, sc: SparkContext, sqlContext:
 SQLContext)

 I've created a bootstrap spec for spark jobs that instantiates the
 Spark Context and SQLContext like so:

 @transient var sc: SparkContext = _
 @transient var sqlContext: SQLContext = _

 override def beforeAll = {
   System.clearProperty("spark.driver.port")
   System.clearProperty("spark.hostPort")

   val conf = new SparkConf()
 .setMaster(master)
 .setAppName(appName)

   sc = new SparkContext(conf)
   sqlContext = new SQLContext(sc)
 }

 When I do not include sqlContext, my tests run. Once I add the
 sqlContext I get the following errors:

 16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being
 constructed (or threw an exception in its constructor).  This may indicate
 an error, since only one SparkContext may be running in this JVM (see
 SPARK-2243). The other SparkContext was created at:
 org.apache.spark.SparkContext.(SparkContext.scala:81)

 16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
 akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is
 not unique!

 and finally:

 [info] IngestSpec:
 [info] Exception encountered when attempting to run a suite with class
 name: com.company.package.IngestSpec *** ABORTED ***
 [info]   akka.actor.InvalidActorNameException: actor name
 [ExecutorEndpoint] is not unique!


 What do I need to do to get a sqlContext through my tests?

 Thanks,

 -- Steve

>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>


best practices: running multi user jupyter notebook server

2016-03-19 Thread Andy Davidson
We are considering deploying a notebook server for use by two kinds of users

1. Interactive dashboards
   1. i.e. forms that allow users to select data sets and visualizations
   2. review real-time graphs of data captured by our Spark streams
2. General notebooks for data scientists

My concern is that interactive Spark jobs can consume a lot of cluster
resources and many users may be sloppy/lazy, i.e. just kill their browsers
instead of shutting down their notebooks cleanly.

What are best practices?


Kind regards

Andy




Fwd: Apache Spark Exception in thread “main” java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class

2016-03-19 Thread satyajit vegesna
Hi,

Scala version:2.11.7(had to upgrade the scala verison to enable case
clasess to accept more than 22 parameters.)

Spark version:1.6.1.

PFB pom.xml

Getting below error when trying to setup spark on intellij IDE,

16/03/16 18:36:44 INFO spark.SparkContext: Running Spark version 1.6.1
Exception in thread "main" java.lang.NoClassDefFoundError:
scala/collection/GenTraversableOnce$class at
org.apache.spark.util.TimeStampedWeakValueHashMap.(TimeStampedWeakValueHashMap.scala:42)
at org.apache.spark.SparkContext.(SparkContext.scala:298) at
com.examples.testSparkPost$.main(testSparkPost.scala:27) at
com.examples.testSparkPost.main(testSparkPost.scala) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606) at
com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused
by: java.lang.ClassNotFoundException:
scala.collection.GenTraversableOnce$class at
java.net.URLClassLoader$1.run(URLClassLoader.java:366) at
java.net.URLClassLoader$1.run(URLClassLoader.java:355) at
java.security.AccessController.doPrivileged(Native Method) at
java.net.URLClassLoader.findClass(URLClassLoader.java:354) at
java.lang.ClassLoader.loadClass(ClassLoader.java:425) at
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at
java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 9 more

pom.xml:

http://maven.apache.org/POM/4.0.0;
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd;>
4.0.0
StreamProcess
StreamProcess
0.0.1-SNAPSHOT
${project.artifactId}
This is a boilerplate maven project to start using
Spark in Scala
2010


1.6
1.6
UTF-8
2.10

2.11.7





cloudera-repo-releases
https://repository.cloudera.com/artifactory/repo/




src/main/scala
src/test/scala



maven-assembly-plugin


package

single





jar-with-dependencies





net.alchim31.maven
scala-maven-plugin
3.2.2



compile
testCompile




-dependencyfile

${project.build.directory}/.scala_dependencies








maven-assembly-plugin
2.4.1


jar-with-dependencies




make-assembly
package

single








org.scala-lang
scala-library
${scala.version}


org.mongodb.mongo-hadoop
mongo-hadoop-core
1.4.2


javax.servlet
servlet-api




org.mongodb
mongodb-driver
3.2.2


javax.servlet
servlet-api




org.mongodb
mongodb-driver
3.2.2


javax.servlet
servlet-api




org.apache.spark
spark-streaming_2.10
1.6.1


org.apache.spark
spark-core_2.10
1.6.1


org.apache.spark
spark-sql_2.10
1.6.1


org.apache.hadoop
hadoop-hdfs
2.6.0


org.apache.hadoop
hadoop-auth
2.6.0


org.apache.hadoop

df.dtypes -> pyspark.sql.types

2016-03-19 Thread Ruslan Dautkhanov
Hello,

Looking at
https://spark.apache.org/docs/1.5.1/api/python/_modules/pyspark/sql/types.html

and can't wrap my head around how to convert string data types names to
actual
pyspark.sql.types data types?

Does pyspark.sql.types has an interface to return StringType() for "string",
IntegerType() for "integer" etc? If it doesn't exist it would be great to
have such a
mapping function.

Thank you.


ps. I have a data frame, and use its dtypes to loop through all columns to
fix a few
columns' data types as a workaround for SPARK-13866.


-- 
Ruslan Dautkhanov


Joins in Spark

2016-03-19 Thread Stuti Awasthi
Hi All,

I have to join 2 files, both not very big (a few MBs only), but the result can be 
huge, say 500 GB to TBs of data. I have tried using Spark's join() 
function, but I'm noticing that the join executes on only 1 or 2 nodes at 
most. Since I have a cluster of 5 nodes, I tried to pass 
"join(otherDataset, [numTasks])" with numTasks=10, but again what I noticed is that 
9 of the tasks finish instantly and only 1 executor processes all 
the data.

I searched on the internet and found that we can use a broadcast variable to send the data 
from one file to all nodes and then use a map function to do the join. In this way 
I should be able to run multiple tasks on different executors.
Now my question is: since Spark provides the join functionality, I had 
assumed that it would handle the data parallelism automatically. Does Spark 
provide some functionality which I can use directly for the join rather than 
implementing a map-side join using broadcast on my own? Any other better way is 
also welcome.

I assume that this might be a very common problem for all, and I am looking out for 
suggestions.
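
For anyone searching the archive later, a minimal sketch of the broadcast (map-side) join described above; file paths, delimiters and key positions are illustrative, and it assumes the smaller side fits comfortably in memory:

// Collect and broadcast one small file as a lookup table keyed by column 0.
val lookup: Map[String, Seq[String]] =
  sc.textFile("hdfs:///data/fileA.csv")
    .map(_.split(","))
    .map(a => (a(0), a(1)))
    .groupByKey()
    .mapValues(_.toSeq)
    .collect()
    .toMap

val lookupBc = sc.broadcast(lookup)

// Keep the other file distributed and repartition it so the join (and the large
// output it produces) is spread over many tasks instead of 1-2 executors.
val joined = sc.textFile("hdfs:///data/fileB.csv")
  .repartition(50)
  .map(_.split(","))
  .flatMap { b =>
    lookupBc.value.getOrElse(b(0), Seq.empty)      // all matches for this key
      .map(aVal => (b(0), (aVal, b(1))))
  }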

Thanks 
Stuti Awasthi



::DISCLAIMER::


The contents of this e-mail and any attachment(s) are confidential and intended 
for the named recipient(s) only.
E-mail transmission is not guaranteed to be secure or error-free as information 
could be intercepted, corrupted,
lost, destroyed, arrive late or incomplete, or may contain viruses in 
transmission. The e mail and its contents
(with or without referred errors) shall therefore not attach any liability on 
the originator or HCL or its affiliates.
Views or opinions, if any, presented in this email are solely those of the 
author and may not necessarily reflect the
views or opinions of HCL or its affiliates. Any form of reproduction, 
dissemination, copying, disclosure, modification,
distribution and / or publication of this message without the prior written 
consent of authorized representative of
HCL is strictly prohibited. If you have received this email in error please 
delete it and notify the sender immediately.
Before opening any email and/or attachments, please check them for viruses and 
other defects.




Extra libs in executor classpath

2016-03-19 Thread Леонид Поляков
Hello, guys!



I’ve been developing a kind of framework on top of spark, and my idea is to
bundle the framework jars and some extra configs with the spark and pass it
to other developers for their needs. So that devs can use this bundle and
run usual spark stuff but with extra flavor that framework will add.



I’m trying to figure out how to properly set up the driver/executor
classpath, so that framework classes are always loaded when you use the
bundle.

I put framework libs in /lib folder right now, but will switch to something
more specific later. I’m putting next spark-defaults.conf into my bundle:



spark.executor.extraClassPath /home/user/Apps/spark-bundled/lib/*

spark.driver.extraClassPath lib/*



And this seem to work, but I want to get rid of the absolute path from
spark.executor.extraClassPath and use something relative, or spark home
somehow, since libs are right there under /lib

I’ve tried these settings for executor, and they do not work:

spark.executor.extraClassPath $SPARK_HOME/lib/*

spark.executor.extraClassPath lib/*



I’ve found out that work directory for started workers is like
$SPARK_HOME/work/app-20160316070310-0002/0, so this works:

spark.executor.extraClassPath ../../../lib/*



But looks cheaty and not stable.



Could you help me with this issue? Maybe there are some placeholders that I
can use in configs?

Let me know if you need any worker/master/driver logs



P.S. driver does not work if I am not in $SPARK_HOME when I execute
spark-submit, e.g. if I do

cd bin

./spark-submit …

Then driver classpath is relative to /bin and now lib/* or ./lib/* in
classpath does not work, so I need $SPARK_HOME for driver as well


Thanks, Leonid


Re: The error to read HDFS custom file in spark.

2016-03-19 Thread Jakob Odersky
Doesn't FileInputFormat require type parameters? Like so:

class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
extends FileInputFormat[LW, RD]

I haven't verified this but it could be related to the compile error
you're getting.
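
To make that concrete, an untested sketch of the non-generic variant. It reuses the RDRawDataRecord and RawReader classes from the thread below, and assumes RawReader is likewise declared against the concrete types:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred.{FileInputFormat, InputSplit, JobConf, RecordReader, Reporter}

// Fixing the key/value types here keeps them aligned with the call site:
//   sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](filePath)
class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord] {
  override def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter)
      : RecordReader[LongWritable, RDRawDataRecord] =
    new RawReader(split, job, reporter)   // RawReader declared as RecordReader[LongWritable, RDRawDataRecord]
}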

On Thu, Mar 17, 2016 at 9:53 AM, Benyi Wang  wrote:
> I would say change
>
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends
> FileInputFormat
>
> to
>
> class RawDataInputFormat[LongWritable, RDRawDataRecord] extends
> FileInputFormat
>
>
> On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh 
> wrote:
>>
>> Hi Tony,
>>
>> Is
>>
>> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> One of your own packages?
>>
>> Sounds like it is one throwing the error
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 17 March 2016 at 15:21, Tony Liu  wrote:
>>>
>>> Hi,
>>>My HDFS file is store with custom data structures. I want to read it
>>> with SparkContext object.So I define a formatting object:
>>>
>>> 1. code of RawDataInputFormat.scala
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/16/16.
>>>   */
>>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
>>> extends FileInputFormat {
>>>
>>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter:
>>> Reporter): RecordReader[LW, RD] = {
>>> new RawReader(split, job, reporter)
>>>   }
>>>
>>> }
>>>
>>> 2. code of RawReader.scala
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/17/16.
>>>   */
>>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends
>>> RecordReader[LW, RD] {
>>>
>>>   var reader: SequenceFile.Reader = null
>>>   var currentPos: Long = 0L
>>>   var length: Long = 0L
>>>
>>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>>> this()
>>> val p = (split.asInstanceOf[FileSplit]).getPath
>>> reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>>   }
>>>
>>>   override def next(key: LW, value: RD): Boolean = {
>>> val flag = reader.next(key, value)
>>> currentPos = reader.getPosition()
>>> flag
>>>   }
>>>
>>>   override def getProgress: Float = Math.min(1.0f, currentPos /
>>> length.toFloat)
>>>
>>>   override def getPos: Long = currentPos
>>>
>>>   override def createKey(): LongWritable = {
>>> new LongWritable()
>>>   }
>>>
>>>   override def close(): Unit = {
>>> reader.close()
>>>   }
>>>
>>>   override def createValue(): RDRawDataRecord = {
>>> new RDRawDataRecord()
>>>   }
>>> }
>>>
>>> 3. code of RDRawDataRecord.scala
>>>
>>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import org.apache.commons.lang.StringUtils;
>>> import org.apache.hadoop.io.Writable;
>>>
>>> public class RDRawDataRecord implements Writable {
>>> private String smac;
>>> private String dmac;
>>> private int hrssi;
>>> private int lrssi;
>>> private long fstamp;
>>> private long lstamp;
>>> private long maxstamp;
>>> private long minstamp;
>>> private long stamp;
>>>
>>> public void readFields(DataInput in) throws IOException {
>>> this.smac = in.readUTF();
>>> this.dmac = in.readUTF();
>>> this.hrssi = in.readInt();
>>> this.lrssi = in.readInt();
>>> this.fstamp = in.readLong();
>>> this.lstamp = in.readLong();
>>> this.maxstamp = in.readLong();
>>> this.minstamp = in.readLong();
>>> this.stamp = in.readLong();
>>> }
>>>
>>> public void write(DataOutput out) throws IOException {
>>> out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>>> out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>>> out.writeInt(this.hrssi);
>>> out.writeInt(this.lrssi);
>>> out.writeLong(this.fstamp);
>>> out.writeLong(this.lstamp);
>>> out.writeLong(this.maxstamp);
>>> out.writeLong(this.minstamp);
>>> out.writeLong(this.stamp);
>>> }
>>>
>>> /**
>>>
>>> ignore getter setter
>>>
>>> **/
>>>
>>> }
>>>
>>> At last, I use this code to run:
>>>
>>> val filePath =
>>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>>> val conf = new SparkConf()
>>> conf.setMaster("local")
>>> conf.setAppName("demo")
>>> val sc = new SparkContext(conf)
>>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
>>> RawDataInputFormat[LongWritable, 

Improving Spark Scheduler Delay

2016-03-19 Thread Prabhu Joseph
Hi All,

When running concurrent Spark jobs (with a huge number of tasks) on the same Spark
context, there is high scheduler delay. We have the FAIR scheduling policy set,
and we also tried a different pool for each job, but still no
improvement. What are the tuning options to reduce scheduler delay?
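
For reference, a small sketch of how the pools are usually wired up when several jobs are submitted concurrently from one SparkContext (pool names illustrative; assumes spark.scheduler.mode=FAIR and sc already in scope). It will not remove scheduler delay by itself, but it makes sure each concurrent job really lands in its own pool:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// The pool property is thread-local, so it must be set in the thread that
// triggers the action, not once on the driver's main thread.
def runInPool(poolName: String)(job: => Unit): Future[Unit] = Future {
  sc.setLocalProperty("spark.scheduler.pool", poolName)
  try job
  finally sc.setLocalProperty("spark.scheduler.pool", null)
}

val f1 = runInPool("etl")       { sc.textFile("hdfs:///a").count() }
val f2 = runInPool("reporting") { sc.textFile("hdfs:///b").count() }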

Thanks,
Prabhu Joseph


Re: Checkpoint of DStream joined with RDD

2016-03-19 Thread Lubomir Nerad

The HBase version is 1.0.1.1.

Thanks,
Lubo
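
For anyone else hitting the same NPE: one workaround that is sometimes suggested is to rebuild the lookup RDD lazily from whichever SparkContext is live, inside transform, so nothing recovered from the checkpoint is reused directly. A rough sketch, with illustrative paths, assuming the stream carries (key, value) pairs:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Lazily (re)create the lookup RDD so that after checkpoint recovery we never
// reuse an RDD whose transient fields (context, Configuration, ...) were lost.
object LookupRDD {
  @volatile private var instance: RDD[(String, String)] = null
  def getInstance(sc: SparkContext): RDD[(String, String)] = {
    if (instance == null) synchronized {
      if (instance == null) {
        instance = sc.textFile("hdfs:///lookup/data").map { line =>
          val parts = line.split(",", 2)
          (parts(0), parts(1))
        }.cache()
      }
    }
    instance
  }
}

val joined = stream.transform { rdd =>
  rdd.join(LookupRDD.getInstance(rdd.sparkContext))
}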

On 18.3.2016 17:29, Ted Yu wrote:
I looked at the places in SparkContext.scala where NewHadoopRDD is 
constrcuted.

It seems the Configuration object shouldn't be null.

Which hbase release are you using (so that I can see which line the 
NPE came from) ?


Thanks

On Fri, Mar 18, 2016 at 8:05 AM, Lubomir Nerad 
> wrote:


Hi,

I tried to replicate the example of joining DStream with lookup
RDD from

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation.
It works fine, but when I enable checkpointing for the
StreamingContext and let the application to recover from a
previously created checkpoint, I always get an exception during
start and the whole application fails. I tried various types of
lookup RDD, but the result is the same.

Exception in the case of HBase RDD is:

Exception in thread "main" java.lang.NullPointerException
at

org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
at
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:109)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at

org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at

org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
at
org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
at java.util.TimSort.sort(TimSort.java:216)
at java.util.Arrays.sort(Arrays.java:1438)
at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
at scala.collection.AbstractSeq.sorted(Seq.scala:40)
at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
at
org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
at

org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
at

org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
at

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at
org.apache.spark.rdd.PairRDDFunctions.join(PairRDDFunctions.scala:650)
at
org.apache.spark.api.java.JavaPairRDD.join(JavaPairRDD.scala:546)

I tried Spark 1.5.2 and 1.6.0 without success. The problem seems
to be that RDDs use some transient fields which are not restored
when they are recovered from checkpoint files. In case of some RDD
implementations it is SparkContext, but it can be also
implementation specific Configuration object, etc. I see in the
sources that in the case of DStream recovery, the DStreamGraph
takes care of restoring StreamingContext in all its DStream-s. But
I haven't found any similar mechanism for RDDs.

So my question is whether I am doing something wrong or this is a
bug in Spark? If later, is there some workaround except for
implementing a custom DStream which will return the same RDD every
batch interval and joining at DStream level instead of RDD level
in transform?

I apologize if this has been discussed in the past and I missed it
when looking into archive.

Thanks,
Lubo


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







Re: spark shuffle service on yarn

2016-03-19 Thread Steve Loughran

On 19 Mar 2016, at 02:25, Koert Kuipers 
> wrote:

spark on yarn is nice because i can bring my own spark. i am worried that the 
shuffle service forces me to use some "sanctioned" spark version that is 
officially "installed" on the cluster.

so... can i safely install the spark 1.3 shuffle service on yarn and use it 
with other 1.x versions of spark?


That's an interesting question. I don't know the answer there. Normally I'd 
just say "use the latest, hope they got compatibility right". Spark 1.6.1 fixed 
some classpath issues related to shuffle JAR and hadoop namenode dependencies.

There's a YARN JIRA on isolating nodemanager plugins, first with classpath, 
ideally with forked processes (for failure resilience and isolation of native 
libs). I guess a goal there should be "supports different versions of the same 
app"; that may need app changes doo


Re: Error using collectAsMap() in scala

2016-03-19 Thread Ted Yu
It is defined in:
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
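
In other words, collectAsMap is only defined for RDDs of key/value pairs (that is what PairRDDFunctions enriches), so the RDD[String] needs to be mapped to tuples first. A sketch, assuming the key is in the first CSV column and the value in the second:

import org.apache.spark.rdd.RDD

val dict: RDD[(String, String)] = sc.textFile("path/to/csv/file")
  .map(_.split(","))
  .map(cols => (cols(0), cols(1)))        // RDD of pairs, so collectAsMap applies

val dictBroadcast = sc.broadcast(dict.collectAsMap())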

On Thu, Mar 17, 2016 at 8:55 PM, Shishir Anshuman  wrote:

> I am using following code snippet in scala:
>
>
> *val dict: RDD[String] = sc.textFile("path/to/csv/file")*
> *val dict_broadcast=sc.broadcast(dict.collectAsMap())*
>
> On compiling It generates this error:
>
> *scala:42: value collectAsMap is not a member of
> org.apache.spark.rdd.RDD[String]*
>
>
> *val dict_broadcast=sc.broadcast(dict.collectAsMap())
> ^*
>


Setting up spark to run on two nodes

2016-03-19 Thread Ashok Kumar
Experts.
Please share your valued advice.
I have Spark 1.5.2 set up as standalone for now, and I have started the master 
as below:
start-master.sh

I have also modified the conf/slaves file to have
# A Spark Worker will be started on each of the machines listed below.
localhost
workerhost

On localhost I start the slave as follows:
start-slave.sh spark://localhost:7077

Questions.
If I want a worker process to be started not only on localhost but also on workerhost:
1) Do I just need to run start-slave.sh on localhost, and it will start the 
worker process on the other node -> workerhost?
2) Do I have to run start-slave.sh spark://workerhost:7077 as well, locally on workerhost?
3) On the GUI http://localhost:4040/environment/ I do not see any reference to a worker process 
running on workerhost.
Appreciate any help on how to go about starting the master on localhost and 
starting two workers, one on localhost and the other on workerhost.
Thanking you


Re: [Error] : dynamically union All + adding new column

2016-03-19 Thread Ted Yu
bq. .drop("Col9")

Could it be due to the above ?

On Wed, Mar 16, 2016 at 7:29 PM, Divya Gehlot 
wrote:

> Hi,
> I am dynamically doing union all and adding new column too
>
> val dfresult =
>> dfAcStamp.select("Col1","Col1","Col3","Col4","Col5","Col6","col7","col8","col9")
>> val schemaL = dfresult.schema
>> var dffiltered = sqlContext.createDataFrame(sc.emptyRDD[Row], schemaL)
>> for ((key,values) <- lcrMap) {
>> if(values(4) != null){
>>  println("Condition="+values(4))
>>  val renameRepId = values(0)+"REP_ID"
>>  dffiltered.printSchema
>> dfresult.printSchema
>>  dffiltered =
>> dffiltered.unionAll(dfresult.withColumn(renameRepId,lit(values(3))).drop("Col9").select("Col1","Col1","Col3","Col4","Col5","Col6","Col7","Col8","Col9").where(values(4))).distinct()
>
>
>> }
>> }
>
>
>
> when I am printing the schema
> dfresult
> root
>  |-- Col1: date (nullable = true)
>  |-- Col2: date (nullable = true)
>  |-- Col3: string (nullable = false)
>  |-- Col4: string (nullable = false)
>  |-- Col5: string (nullable = false)
>  |-- Col6: string (nullable = true)
>  |-- Col7: string (nullable = true)
>  |-- Col8: string (nullable = true)
>  |-- Col9: null (nullable = true)
>
>
> dffiltered Schema
> root
>  |-- Col1: date (nullable = true)
>  |-- Col2: date (nullable = true)
>  |-- Col3: string (nullable = false)
>  |-- Col4: string (nullable = false)
>  |-- Col5: string (nullable = false)
>  |-- Col6: string (nullable = true)
>  |-- Col7: string (nullable = true)
>  |-- Col8: string (nullable = true)
>  |-- Col9: null (nullable = true)
>
>
> As It is priting the same schema but when I am doing UnionAll its giving
> me below error
> org.apache.spark.sql.AnalysisException: Union can only be performed on
> tables with the same number of columns, but the left table has 9 columns
> and the right has 8;
>
> Could somebody help me in pointing out my mistake  .
>
>
> Thanks,
>
>
>


Re: Incomplete data when reading from S3

2016-03-19 Thread DB Tsai
You need to use wholeTextFiles to read the whole file at once. Otherwise,
it can be split.

DB Tsai - Sent From My Phone
On Mar 17, 2016 12:45 AM, "Blaž Šnuderl"  wrote:

> Hi.
>
> We have json data stored in S3 (json record per line). When reading the
> data from s3 using the following code we started noticing json decode
> errors.
>
> sc.textFile(paths).map(json.loads)
>
>
> After a bit more investigation we noticed an incomplete line, basically
> the line was
>
>> {"key": "value", "key2":  <- notice the line abruptly ends with no json
>> close tag etc
>
>
> It is not an issue with our data and it doesn't happen very often, but it
> makes us very scared since it means spark could be dropping data.
>
> We are using spark 1.5.1. Any ideas why this happens and possible fixes?
>
> Regards,
> Blaž Šnuderl
>


Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Bryan Jeffrey
Cody et. al,

I am seeing a similar error.  I've increased the number of retries.  Once
I've got a job up and running I'm seeing it retry correctly. However, I am
having trouble getting the job started - number of retries does not seem to
help with startup behavior.

Thoughts?

Regards,

Bryan Jeffrey

On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger  wrote:

> That's a networking error when the driver is attempting to contact
> leaders to get the latest available offsets.
>
> If it's a transient error, you can look at increasing the value of
> spark.streaming.kafka.maxRetries, see
>
> http://spark.apache.org/docs/latest/configuration.html
>
> If it's not a transient error, you need to look at your brokers + your
> network environment.
>
> On Thu, Mar 17, 2016 at 10:59 PM, Surendra , Manchikanti
>  wrote:
> > Hi,
> >
> > Can you check Kafka topic replication ? And leader information?
> >
> > Regards,
> > Surendra M
> >
> >
> >
> > -- Surendra Manchikanti
> >
> > On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss 
> wrote:
> >>
> >> Hi,
> >>
> >> I have a SparkStream (with Kafka) job, after running several days, it
> >> failed with following errors:
> >> ERROR DirectKafkaInputDStream:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> Any idea what would be wrong? will it be SparkStreaming buffer overflow
> >> issue?
> >>
> >>
> >>
> >> Regards
> >>
> >>
> >>
> >>
> >>
> >>
> >> *** from the log ***
> >>
> >> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect
> is
> >> overridden to
> >>
> >> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error:
> >> java.nio.channels.ClosedChannelException
> >>
> >> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time
> >> 1458188031800 ms
> >>
> >> org.apache.spark.SparkException:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> at
> >>
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
> >>
> >> at
> >>
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>
> >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> >>
> >> at scala.Option.orElse(Option.scala:257)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> >>
> >> at
> >>
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> >>
> >> at
> >>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>
> >> at
> >>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>
> >> at
> >>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>
> >> at
> >>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>
> >> at
> >>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>
> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>
> >> at
> >>
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >>
> >> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >>
> >> at
> >>
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> >>
> >> at
> >>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
> >>
> >> at
> >>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
> >>
> >> at scala.util.Try$.apply(Try.scala:161)
> >>
> >> at
> >>
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
> >>
> >> at
> >> org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
> >>
> >> at
> >>
> 

Re: Unit testing framework for Spark Jobs?

2016-03-19 Thread Vikas Kawadia
I just wrote a blog post on Unit testing Apache Spark with py.test
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

If you prefer using the py.test framework, then it might be useful.

-vikas

On Wed, Mar 2, 2016 at 10:59 AM, radoburansky 
wrote:

> I am sure you have googled this:
> https://github.com/holdenk/spark-testing-base
>
> On Wed, Mar 2, 2016 at 6:54 PM, SRK [via Apache Spark User List] <[hidden
> email] > wrote:
>
>> Hi,
>>
>> What is a good unit testing framework for Spark batch/streaming jobs? I
>> have core spark, spark sql with dataframes and streaming api getting used.
>> Any good framework to cover unit tests for these APIs?
>>
>> Thanks!
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
>>
>
>
> --
> View this message in context: Re: Unit testing framework for Spark Jobs?
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Apache Beam Spark runner

2016-03-19 Thread Sela, Amit
Hi all,

The Apache Beam Spark runner is now available at: 
https://github.com/apache/incubator-beam/tree/master/runners/spark Check it out!
The Apache Beam (http://beam.incubator.apache.org/) project is a unified model 
for building data pipelines using Google’s Dataflow programming model, and now 
it supports Spark as well!

Take it for a ride on your Spark cluster!

Thanks,
Amit




Re: installing packages with pyspark

2016-03-19 Thread Jakob Odersky
Hi,
regarding 1, packages are resolved locally. That means that when you
specify a package, spark-submit will resolve the dependencies and
download any jars on the local machine, before shipping* them to the
cluster. So, without a priori knowledge of dataproc clusters, it
should be no different to specify packages.

Unfortunately I can't help with 2.

--Jakob

*shipping in this case means making them available via the network

On Thu, Mar 17, 2016 at 5:36 PM, Ajinkya Kale  wrote:
> Hi all,
>
> I had couple of questions.
> 1. Is there documentation on how to add the graphframes or any other package
> for that matter on the google dataproc managed spark clusters ?
>
> 2. Is there a way to add a package to an existing pyspark context through a
> jupyter notebook ?
>
> --aj

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: The build-in indexes in ORC file does not work.

2016-03-19 Thread Jörn Franke
How much data are you querying? What is the query? How selective is it supposed 
to be? What is the block size?
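
For reference, the settings being discussed below would typically be applied from Spark SQL as in this sketch (assumes sc and a DataFrame df are already in scope; names and paths are illustrative):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Enable ORC predicate push-down so file/stripe/row-group statistics can be used.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

// Writing the data sorted by the filter column keeps the min/max statistics selective.
df.sort("eventDate").write.orc("hdfs:///warehouse/events_orc")

// With push-down enabled, stripes whose statistics exclude the predicate can be skipped.
val hits = sqlContext.read.orc("hdfs:///warehouse/events_orc")
  .filter("eventDate >= '2016-03-01' and eventDate < '2016-03-02'")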

> On 16 Mar 2016, at 11:23, Joseph  wrote:
> 
> Hi all,
> 
> I have known that ORC provides three level of indexes within each file, file 
> level, stripe level, and row level. 
> The file and stripe level statistics are in the file footer so that they are 
> easy to access to determine if the rest of the file needs to be read at all. 
> Row level indexes include both column statistics for each row group and 
> position for seeking to the start of the row group. 
> 
> The following is my understanding:
> 1. The file and stripe level indexes are forcibly generated, we can not 
> control them.
> 2. The row level indexes can be configured by "orc.create.index"(whether to 
> create row indexes) and "orc.row.index.stride"(number of rows between index 
> entries).
> 3. Each Index has statistics of min, max for each column, so sort data by the 
> filter column will bring better performance.
> 4. To use any one of the three level of indexes,we should enable predicate 
> push-down by setting spark.sql.orc.filterPushdown=true (in sparkSQL) or 
> hive.optimize.ppd=true (in hive).
> 
> But I found the  build-in indexes in ORC files did not work both in spark 
> 1.5.2 and hive 1.2.1:
> First, when the query statement with where clause did't match any record (the 
> filter column had a value beyond the range of data),  the performance when 
> enabled  predicate push-down was almost the same with when disabled predicate 
> push-down.  I think, when the filter column has a value beyond the range of 
> data, all of the orc files will not be scanned if use file level indexes,  so 
> the performance should improve obviously.
> 
> The second, when enabled "orc.create.index" and sorted data by filter column 
> and where clause can only match a few records, the performance when enabled  
> predicate push-down was almost the same with when disabled predicate 
> push-down. 
> 
> The third, when enabled  predicate push-down and "orc.create.index", the 
> performance when  filter column had a value beyond the range of data was 
> almost the same with when filter column had a value covering almost the whole 
> data. 
> 
> So,  has anyone used ORC's build-in indexes before (especially in spark SQL)? 
>  What's my issue?
> 
> Thanks!
> 
> Joseph


Stop spark application when the job is complete.

2016-03-19 Thread Imre Nagi
Hi,

I have a Spark application for batch processing on a standalone cluster. The
job is to query the database and then do some transformations, aggregations,
and several actions such as indexing the result into Elasticsearch.

If I don't call sc.stop(), the Spark application won't stop and will keep
holding the resources used by the application. On the other hand, if I call
sc.stop(), the Spark app is stopped before it queries the database and does
further processing.

Can anyone suggest the best practice for stopping a Spark application
when the job is complete?
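
For reference, the usual shape is to make every output a blocking action and stop the context in a finally block; by the time the last action returns, the work is done and it is safe to release the resources. Connection details below are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("batch-job"))
try {
  val sqlContext = new SQLContext(sc)
  val rows = sqlContext.read.jdbc("jdbc:postgresql://db:5432/app", "events", new java.util.Properties())
  val aggregated = rows.groupBy("user_id").count()
  aggregated.write.json("hdfs:///output/user_counts")   // blocks until the job completes
} finally {
  sc.stop()   // only reached after the actions above have finished (or failed)
}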

Thanks,
Imre


Setting up log4j2/logback with Spark 1.6.0

2016-03-19 Thread Yuval.Itzchakov
I've been trying to get log4j2 and logback to play nicely with Spark
1.6.0 so I can properly offload my logs to a remote server.

I've attempted the following things:

1. Setting logback/log4j2 on the class path for both the driver and worker
nodes
2. Passing -Dlog4j.configurationFile= and -Dlogback.configuration= flags to
-extraJavaOptions

log4j2 used to work on Spark 1.5.2. After we've upgraded, the default
logging framework defers to log4j 1.2. Even when I get Spark to work with
logback, it doesn't find my logback.xml file located in
%APP_DIRECTORY%/classes/logback.xml. 

I'm always seeing Spark defer to this:

"Reading configuration from URL
jar:file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1.jar!/org/apache/spark/log4j-defaults.properties"

Has anyone had similar issues with this?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-log4j2-logback-with-Spark-1-6-0-tp26518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [discuss] making SparkEnv private in Spark 2.0

2016-03-19 Thread Reynold Xin
On Wed, Mar 16, 2016 at 3:29 PM, Mridul Muralidharan 
wrote:

> b) Shuffle manager (to get shuffle reader)
>

What's the use case for shuffle manager/reader? This seems like using super
internal APIs in applications.


Re: Apache Spark Exception in thread “main” java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class

2016-03-19 Thread Josh Rosen
Err, whoops, looks like this is a user app and not building Spark itself,
so you'll have to change your deps to use the 2.11 versions of Spark.
e.g. spark-streaming_2.10 -> spark-streaming_2.11.

On Wed, Mar 16, 2016 at 7:07 PM Josh Rosen  wrote:

> See the instructions in the Spark documentation:
> https://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
>
> On Wed, Mar 16, 2016 at 7:05 PM satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>>
>>
>> Hi,
>>
>> Scala version:2.11.7(had to upgrade the scala verison to enable case
>> clasess to accept more than 22 parameters.)
>>
>> Spark version:1.6.1.
>>
>> PFB pom.xml
>>
>> Getting below error when trying to setup spark on intellij IDE,
>>
>> 16/03/16 18:36:44 INFO spark.SparkContext: Running Spark version 1.6.1
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> scala/collection/GenTraversableOnce$class at
>> org.apache.spark.util.TimeStampedWeakValueHashMap.(TimeStampedWeakValueHashMap.scala:42)
>> at org.apache.spark.SparkContext.(SparkContext.scala:298) at
>> com.examples.testSparkPost$.main(testSparkPost.scala:27) at
>> com.examples.testSparkPost.main(testSparkPost.scala) at
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606) at
>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused
>> by: java.lang.ClassNotFoundException:
>> scala.collection.GenTraversableOnce$class at
>> java.net.URLClassLoader$1.run(URLClassLoader.java:366) at
>> java.net.URLClassLoader$1.run(URLClassLoader.java:355) at
>> java.security.AccessController.doPrivileged(Native Method) at
>> java.net.URLClassLoader.findClass(URLClassLoader.java:354) at
>> java.lang.ClassLoader.loadClass(ClassLoader.java:425) at
>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at
>> java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 9 more
>>
>> pom.xml:
>>
>> http://maven.apache.org/POM/4.0.0; 
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/maven-v4_0_0.xsd;>
>> 4.0.0
>> StreamProcess
>> StreamProcess
>> 0.0.1-SNAPSHOT
>> ${project.artifactId}
>> This is a boilerplate maven project to start using Spark in 
>> Scala
>> 2010
>>
>> 
>> 1.6
>> 1.6
>> UTF-8
>> 2.10
>> 
>> 2.11.7
>> 
>>
>> 
>> 
>> 
>> cloudera-repo-releases
>> https://repository.cloudera.com/artifactory/repo/
>> 
>> 
>>
>> 
>> src/main/scala
>> src/test/scala
>> 
>> 
>> 
>> maven-assembly-plugin
>> 
>> 
>> package
>> 
>> single
>> 
>> 
>> 
>> 
>> 
>> 
>> jar-with-dependencies
>> 
>> 
>> 
>> 
>> 
>> net.alchim31.maven
>> scala-maven-plugin
>> 3.2.2
>> 
>> 
>> 
>> compile
>> testCompile
>> 
>> 
>> 
>> 
>> -dependencyfile
>> 
>> ${project.build.directory}/.scala_dependencies
>> 
>> 
>> 
>> 
>> 
>>
>> 
>> 
>> maven-assembly-plugin
>> 2.4.1
>> 
>> 
>> jar-with-dependencies
>> 
>> 
>> 
>> 
>> make-assembly
>> package
>> 
>> single
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> org.scala-lang
>> scala-library
>> ${scala.version}
>> 
>> 
>> org.mongodb.mongo-hadoop
>> mongo-hadoop-core
>> 1.4.2
>> 
>> 
>> javax.servlet
>> servlet-api
>> 
>> 
>> 

Re: Joins in Spark

2016-03-19 Thread Rishi Mishra
My suspicion is that your input files have only a few partitions, hence a small
number of tasks are started. Can you provide some more details, such as how you
load the files and how the result size gets to around 500 GB?

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Mar 17, 2016 at 12:12 PM, Stuti Awasthi 
wrote:

> Hi All,
>
>
>
> I have to join 2 files both not very big say few MBs only but the result
> can be huge say generating 500GBs to TBs of data.  Now I have tried using
> spark Join() function but Im noticing that join is executing on only 1 or 2
> nodes at the max. Since I have a cluster size of 5 nodes , I tried to pass “
> join(*otherDataset*, [*numTasks*])” as numTasks=10 but again what I
> noticed that all the 9 tasks are finished instantly and only 1 executor is
> processing all the data.
>
>
>
> I searched on internet and got that we can use Broadcast variable to send
> data from 1 file to all nodes and then use map function to do the join. In
> this way I should be able to run multiple task on different executors.
>
> Now my question is , since Spark is providing the Join functionality, I
> have assumed that it will handle the data parallelism automatically. Now is
> Spark provide some functionality which I can directly use for join rather
> than implementing Mapside join using Broadcast on my own or any other
> better way is also welcome.
>
>
>
> I assume that this might be very common problem for all and looking out
> for suggestions.
>
>
>
> Thanks 
>
> Stuti Awasthi
>
>
>
>
>
>


How to gracefully handle Kafka OffsetOutOfRangeException

2016-03-19 Thread Ramkumar Venkataraman
I am using Spark streaming and reading data from Kafka using
KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
smallest.

But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
and my spark job crashes.

I want to understand if there is a graceful way to handle this failure and
not kill the job. I want to keep ignoring these exceptions, as some other
partitions are fine and I am okay with data loss.

Is there any way to handle this and not have my spark job crash? I have no
option of increasing the kafka retention period. 

I tried to have the DStream returned by createDirectStream() wrapped in a
Try construct, but since the exception happens in the executor, the Try
construct didn't take effect. Do you have any ideas of how to handle this?
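
One pattern that is sometimes used (a sketch only, and it accepts losing the affected batch): trigger the output action yourself inside foreachRDD and catch the failure there, so a batch whose offsets are out of range is skipped instead of killing the StreamingContext. This only helps when the exception surfaces as a failed batch job; `stream` and `process` below are placeholders for the direct stream and the per-record work:

import org.apache.spark.SparkException

stream.foreachRDD { rdd =>
  try {
    // Any action here runs the Kafka fetch for this batch; if a partition's
    // offsets have already been aged out, the job for this batch fails.
    rdd.foreachPartition { records =>
      records.foreach(process)
    }
  } catch {
    case e: SparkException =>
      // Log and move on; the StreamingContext keeps running and the next
      // batch is scheduled as usual.
      println(s"Skipping batch: ${e.getMessage}")
  }
}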



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Cody Koeninger
That's a networking error when the driver is attempting to contact
leaders to get the latest available offsets.

If it's a transient error, you can look at increasing the value of
spark.streaming.kafka.maxRetries, see

http://spark.apache.org/docs/latest/configuration.html

If it's not a transient error, you need to look at your brokers + your
network environment.

On Thu, Mar 17, 2016 at 10:59 PM, Surendra , Manchikanti
 wrote:
> Hi,
>
> Can you check Kafka topic replication ? And leader information?
>
> Regards,
> Surendra M
>
>
>
> -- Surendra Manchikanti
>
> On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss  wrote:
>>
>> Hi,
>>
>> I have a SparkStream (with Kafka) job, after running several days, it
>> failed with following errors:
>> ERROR DirectKafkaInputDStream:
>> ArrayBuffer(java.nio.channels.ClosedChannelException)
>>
>> Any idea what would be wrong? will it be SparkStreaming buffer overflow
>> issue?
>>
>>
>>
>> Regards
>>
>>
>>
>>
>>
>>
>> *** from the log ***
>>
>> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is
>> overridden to
>>
>> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream:
>> ArrayBuffer(java.nio.channels.ClosedChannelException)
>>
>> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error:
>> java.nio.channels.ClosedChannelException
>>
>> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time
>> 1458188031800 ms
>>
>> org.apache.spark.SparkException:
>> ArrayBuffer(java.nio.channels.ClosedChannelException)
>>
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>>
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>
>> at scala.Option.orElse(Option.scala:257)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>>
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>
>> at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>
>> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>
>> at
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>>
>> at scala.util.Try$.apply(Try.scala:161)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>>
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> ArrayBuffer(java.nio.channels.ClosedChannelException)
>>
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>>
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>>
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>>
>> at
>> 

CfP 11th Workshop on Virtualization in High-Performance Cloud Computing (VHPC '16)

2016-03-19 Thread VHPC 16

CALL FOR PAPERS


11th Workshop on Virtualization in High-Performance Cloud Computing (VHPC
'16)
held in conjunction with the International Supercomputing Conference - High
Performance,
June 19-23, 2016, Frankfurt, Germany.



Date: June 23, 2016
Workshop URL: http://vhpc.org

Paper Submission Deadline: April 25, 2016


Call for Papers

Virtualization technologies constitute a key enabling factor for flexible
resource
management in modern data centers, and particularly in cloud environments.
Cloud providers need to manage complex infrastructures in a seamless
fashion to support
the highly dynamic and heterogeneous workloads and hosted applications
customers
deploy. Similarly, HPC environments have been increasingly adopting
techniques that
enable flexible management of vast computing and networking resources,
close to marginal
provisioning cost, which is unprecedented in the history of scientific and
commercial
computing.

Various virtualization technologies contribute to the overall picture in
different ways: machine
virtualization, with its capability to enable consolidation of multiple
under-utilized servers with
heterogeneous software and operating systems (OSes), and its capability to
live-migrate a
fully operating virtual machine (VM) with a very short downtime, enables
novel and dynamic
ways to manage physical servers; OS-level virtualization (i.e.,
containerization), with its
capability to isolate multiple user­-space environments and to allow for
their co-existence
within the same OS kernel, promises to provide many of the advantages of
machine
virtualization with high levels of responsiveness and performance; I/O
Virtualization allows
physical NICs/HBAs to take traffic from multiple VMs or containers; network
virtualization,
with its capability to create logical network overlays that are independent
of the underlying
physical topology and IP addressing, provides the fundamental ground on top
of which
evolved network services can be realized with an unprecedented level of
dynamicity and
flexibility; the increasingly adopted paradigm of Software-Defined
Networking (SDN)
promises to extend this flexibility to the control and data planes of
network paths.


Topics of Interest

The VHPC program committee solicits original, high-quality submissions
related to
virtualization across the entire software stack with a special focus on the
intersection of HPC
and the cloud. Topics include, but are not limited to:

- Virtualization in supercomputing environments, HPC clusters, cloud HPC
and grids
- OS-level virtualization including container runtimes (Docker, rkt et al.)
- Lightweight compute node operating systems/VMMs
- Optimizations of virtual machine monitor platforms, hypervisors
- QoS and SLA in hypervisors and network virtualization
- Cloud based network and system management for SDN and NFV
- Management, deployment and monitoring of virtualized environments
- Virtual per job / on-demand clusters and cloud bursting
- Performance measurement, modelling and monitoring of virtualized/cloud
workloads
- Programming models for virtualized environments
- Virtualization in data intensive computing and Big Data processing
- Cloud reliability, fault-tolerance, high-availability and security
- Heterogeneous virtualized environments, virtualized accelerators, GPUs
and co-processors
- Optimized communication libraries/protocols in the cloud and for HPC in
the cloud
- Topology management and optimization for distributed virtualized
applications
- Adaptation of emerging HPC technologies (high performance networks, RDMA,
etc..)
- I/O and storage virtualization, virtualization aware file systems
- Job scheduling/control/policy in virtualized environments
- Checkpointing and migration of VM-based large compute jobs
- Cloud frameworks and APIs
- Energy-efficient / power-aware virtualization


The Workshop on Virtualization in High-Performance Cloud Computing (VHPC)
aims to
bring together researchers and industrial practitioners facing the
challenges
posed by virtualization in order to foster discussion, collaboration,
mutual exchange
of knowledge and experience, enabling research to ultimately provide novel
solutions for virtualized computing systems of tomorrow.

The workshop will be one day in length, composed of 20 min paper
presentations, each
followed by 10 min discussion sections, plus lightning talks that are
limited to 5 minutes.
Presentations may be accompanied by interactive demonstrations.

Important Dates

April 25, 2016 - Paper submission deadline
May 30, 2016 - Acceptance notification
June 23, 2016 - Workshop Day
July 25, 2016 - Camera-ready version due


Chair

Michael Alexander (chair), TU Wien, Austria
Anastassios Nanos (co-chair), NTUA, Greece
Balazs Gerofi (co-chair), RIKEN Advanced Institute for Computational
Science, Japan


Program committee

Stergios 

Re: Spark configuration with 5 nodes

2016-03-19 Thread Steve Loughran

On 11 Mar 2016, at 16:25, Mich Talebzadeh 
> wrote:

Hi Steve,

My argument has always been that if one is going to use Solid State Disks 
(SSD), it makes sense to use them for the NN disks, to speed start-up from fsimage etc. 
Obviously the NN lives in memory. Would you also recommend RAID10 (mirroring & 
striping) for NN disks?


I don't have any suggestions there, sorry. That said, NN disks do need to be 
RAIDed for protection against corruption, as they don't have the cross-cluster 
replication. They matter

Thanks





Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 11 March 2016 at 10:42, Steve Loughran 
> wrote:

On 10 Mar 2016, at 22:15, Ashok Kumar 
> wrote:


Hi,

We intend to use 5 servers which will be utilized for building a Bigdata Hadoop 
data warehouse system (not using any proprietary distribution like Hortonworks or 
Cloudera or others).

I'd argue that life is simpler with either of these, or bigtop+ambari built 
up yourself, for the management and monitoring tools more than anything else. 
Life is simpler if there's a web page of cluster status. But: DIY teaches you 
the internals of how things work, which is good for getting your hands dirty 
later on. Just start to automate things from the outset, keep configs under 
SCM, etc. And decide whether or not you want to go with Kerberos (==secure 
HDFS) from the outset. If you don't, put your cluster on a separate isolated 
subnet. You ought to have the boxes on a separate switch anyway if you can, 
just to avoid network traffic hurting anyone else on the switch.

All server configurations are 512GB RAM, 30TB storage and 16 cores, Ubuntu 
Linux servers. Hadoop will be installed on all the servers/nodes. Server 1 will 
be used for NameNode plus DataNode as well. Server 2 will be used for standby 
NameNode & DataNode. The rest of the servers will be used as DataNodes.


1. Make sure you've got the HDFS/NN space allocation on the two NNs set up so 
that HDFS blocks don't get into the way of the NN's needs (which ideally should 
be on a separate disk with RAID turned on);
2. Same for the worker nodes; temp space matters
3. On a small cluster, the cost of a DN failure is more significant: a bigger 
fraction of the data will go offline, recovery bandwidth limited to the 4 
remaining boxes, etc, etc. Just be aware of that: in a bigger cluster, a single 
server is usually less traumatic. Though HDFS-599 shows that even facebook can 
lose a cluster or two.

Now we would like to install Spark on each servers to create Spark cluster. Is 
that the good thing to do or we should buy additional hardware for Spark 
(minding cost here) or simply do we require additional memory to accommodate 
Spark as well please. In that case how much memory for each Spark node would 
you recommend?


You should be running your compute work on the same systems as the data, as it's 
the "hadoop cluster way"; locality of data ==> performance. If you were to buy 
more hardware, go for more store+compute, rather than just compute.

Spark likes RAM for sharing results; less RAM == more problems. but: you can 
buy extra RAM if you need it, provided you've got space in the servers to put 
it in. Same for storage.

Do make sure that you have ECC memory; there are some papers from google and 
microsoft on that topic if you want links to the details. Without ECC your data 
will be corrupted *and you won't even know*

-Steve






Re: Checkpoint of DStream joined with RDD

2016-03-19 Thread Ted Yu
This is the line where NPE came from:

if (conf.get(SCAN) != null) {

So Configuration instance was null.

On Fri, Mar 18, 2016 at 9:58 AM, Lubomir Nerad 
wrote:

> The HBase version is 1.0.1.1.
>
> Thanks,
> Lubo
>
>
> On 18.3.2016 17:29, Ted Yu wrote:
>
> I looked at the places in SparkContext.scala where NewHadoopRDD is
> constrcuted.
> It seems the Configuration object shouldn't be null.
>
> Which hbase release are you using (so that I can see which line the NPE
> came from) ?
>
> Thanks
>
> On Fri, Mar 18, 2016 at 8:05 AM, Lubomir Nerad 
> wrote:
>
>> Hi,
>>
>> I tried to replicate the example of joining DStream with lookup RDD from
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation.
>> It works fine, but when I enable checkpointing for the StreamingContext and
>> let the application to recover from a previously created checkpoint, I
>> always get an exception during start and the whole application fails. I
>> tried various types of lookup RDD, but the result is the same.
>>
>> Exception in the case of HBase RDD is:
>>
>> Exception in thread "main" java.lang.NullPointerException
>> at
>> org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
>> at
>> org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:109)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
>> at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
>> at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
>> at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
>> at java.util.TimSort.sort(TimSort.java:216)
>> at java.util.Arrays.sort(Arrays.java:1438)
>> at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
>> at scala.collection.AbstractSeq.sorted(Seq.scala:40)
>> at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
>> at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
>> at
>> org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>> at
>> org.apache.spark.rdd.PairRDDFunctions.join(PairRDDFunctions.scala:650)
>> at org.apache.spark.api.java.JavaPairRDD.join(JavaPairRDD.scala:546)
>>
>> I tried Spark 1.5.2 and 1.6.0 without success. The problem seems to be
>> that RDDs use some transient fields which are not restored when they are
>> recovered from checkpoint files. In case of some RDD implementations it is
>> SparkContext, but it can be also implementation specific Configuration
>> object, etc. I see in the sources that in the case of DStream recovery, the
>> DStreamGraph takes care of restoring StreamingContext in all its DStream-s.
>> But I haven't found any similar mechanism for RDDs.
>>
>> So my question is whether I am doing something wrong or this is a bug in
>> Spark? If later, is there some workaround except for implementing a custom
>> DStream which will return the same RDD every batch interval and joining at
>> DStream level instead of RDD level in transform?
>>
>> I apologize if this has been discussed in the past and I missed it when
>> looking into archive.
>>
>> Thanks,
>> Lubo
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


What is the most efficient and scalable way to get all the recommendation results from ALS model ?

2016-03-19 Thread Hiroyuki Yamada
Hi,

I'm testing Collaborative Filtering with Milib.
Making a model by ALS.trainImplicit (or train) seems scalable as far as I
have tested,
but I'm wondering how I can get all the recommendation results efficiently.

The predictAll method can get all the results,
but it needs the whole user-product matrix in memory as an input.
So if there are 1 million users and 1 million products, then the number of
elements is too large (1 million x 1 million)
and the amount of memory to hold them is more than a few TB even when the
element size is only 4B,
which is not a realistic size of memory even now.

# (1,000,000 * 1,000,000) * 4 / 1000/1000/1000/1000 => nearly equals 4TB

We can, of course, use the predict method per user,
but, as far as I have tried, it is very slow to get 1 million users' results.

Do I miss something ?
Are there any other better ways to get all the recommendation results in
scalable and efficient way ?

Best regards,
Hiro
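
A minimal sketch of the predictAll approach described above, assuming an already
trained MatrixFactorizationModel named model and RDDs of user and product ids
(all names illustrative); the cartesian product is exactly the part that does not
fit at 1 million x 1 million:

import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

def scoreAll(model: MatrixFactorizationModel,
             users: RDD[Int], products: RDD[Int]): RDD[Rating] = {
  // 10^6 x 10^6 = 10^12 candidate pairs -- this is what exhausts memory/storage
  val candidates = users.cartesian(products)
  model.predict(candidates)   // RDD[Rating(user, product, score)]
}

If your Spark version is 1.4 or later, MatrixFactorizationModel also exposes
recommendProductsForUsers(k), which computes a distributed top-k per user and may
be a more scalable route than either predictAll or per-user predict.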


RE: Subquery performance

2016-03-19 Thread Younes Naguib
Any way to cache the subquery or force a broadcast join without persisting it?

y
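
One hedged way, assuming Spark 1.5+ and an existing sqlContext (table and column
names below are illustrative): mark the aggregated side with the broadcast hint so
the planner chooses a broadcast join without persisting anything.

import org.apache.spark.sql.functions.{broadcast, col}

// Hint that the aggregated subquery is small enough to ship to every executor,
// so the join avoids shuffling the large side.
val small  = sqlContext.sql("select col2, count(1) as cnt from tab2 group by col2")
val joined = sqlContext.table("tab1")
  .join(broadcast(small), col("col1") === col("col2"))
  .groupBy("col1").count()

The automatic decision is also governed by spark.sql.autoBroadcastJoinThreshold,
which can be raised if the aggregated side is known to be small.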

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: March-17-16 8:59 PM
To: Younes Naguib
Cc: user@spark.apache.org
Subject: Re: Subquery performance

Try running EXPLAIN on both version of the query.

Likely when you cache the subquery we know that it's going to be small, so we use a 
broadcast join instead of shuffling the data.

On Thu, Mar 17, 2016 at 5:53 PM, Younes Naguib 
> wrote:
Hi all,

I’m running a query that looks like the following:
Select col1, count(1)
From (Select col2, count(1) from tab2 group by col2)
Inner join tab1 on (col1=col2)
Group by col1

This creates a very large shuffle, 10 times the data size, as if the subquery 
was executed for each row.
Anything can be done to tune to help tune this?
When the subquery in persisted, it runs much faster, and the shuffle is 50 
times smaller!

Thanks,
Younes



[Spark-1.5.2]Column renaming with withColumnRenamed has no effect

2016-03-19 Thread Divya Gehlot
Hi,
I am adding a new column and renaming it at the same time, but the renaming
doesn't have any effect.

dffiltered =
> dffiltered.unionAll(dfresult.withColumn("Col1",lit("value1").withColumn("Col2",lit("value2")).cast("int")).withColumn("Col3",lit("values3")).withColumnRenamed("Col1","Col1Rename").drop("Col1")


Can anybody help me pointing out my mistake ?

Thanks,
Divya
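
Two things worth checking, shown in a minimal sketch below (values are
illustrative): withColumnRenamed returns a new DataFrame rather than renaming
anything in place, and unionAll matches columns by position and keeps the
left-hand DataFrame's column names, so a rename applied only to the right-hand
side will not appear in the union result.

import org.apache.spark.sql.functions.lit

// withColumnRenamed returns a *new* DataFrame; keep using its return value.
val renamed = dfresult
  .withColumn("Col1", lit("value1"))
  .withColumn("Col2", lit("value2").cast("int"))
  .withColumnRenamed("Col1", "Col1Rename")

renamed.printSchema()   // shows Col1Rename

// dffiltered.unionAll(renamed) would still report dffiltered's column names,
// because the union output takes the names of the left-hand side.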


Re: Spark configuration with 5 nodes

2016-03-19 Thread Mich Talebzadeh
Thank you for info Steve.

I always believed (IMO) that there is an optimal position where one can
plot the projected NN memory (assuming 1GB--> 40TB of data) to the number
of nodes. For example heuristically how many nodes would be sufficient for
1PB of storage with nodes each having  512GB of memory, 50TB of storage and
32 cores? That will require 25GB of RAM for NN with 20 DN in the cluster.
but then one can halve that number of nodes to 10 and increase the storage
to 100TB on each. So the question is the optimal balance between storage
and nodes: would one go for more DNs with less storage each, or fewer DNs
with more storage in each DN? The proponent may argue that more nodes
provide better MPP but at what cost to the operation, start-up and
maintenance?

Cheers


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 18 March 2016 at 11:42, Steve Loughran  wrote:

>
> > On 17 Mar 2016, at 12:28, Mich Talebzadeh 
> wrote:
> >
> > Thanks Steve,
> >
> > For NN it all depends how fast you want a start-up. 1GB of NameNode
> memory accommodates around 42T so if you are talking about 100GB of NN
> memory then SSD may make sense to speed up the start-up. RAID 10 is the
> best one can get, assuming all internal disks.
>
> I wasn't really thinking of startup: in larger clusters startup time is
> often determined by how long it takes for all the datanodes to report in,
> and for HDFS to exit safe mode. But of course, the NN doesn't start
> listening for DN block reports until it's read in the FS image *and
> replayed the log*, so start time will be O(image+ log-events + DNs)
>
> >
> > In general it is also suggested that fsimage are copied across to NFS
> mount directory between primary and fail-over in case of an issue.
>
> yes
>
> if you're curious, there's a 2011 paper on Y!s experience
>
> https://www.usenix.org/system/files/login/articles/chansler_0.pdf
>
> there are also a trace of HDFS failure events in some of the JIRAs,
> HDFS-599 being the classic, as is HADOOP-572. Both of these document
> cascade failures in Facebook's HDFS clusters. Scale brings interesting
> problems
>
>


Re: best way to do deep learning on spark ?

2016-03-19 Thread charles li
Hi, Alexander,

that's awesome, and when will that feature be released ? Since I want to
know the opportunity cost between waiting for that release and use caffe or
tensorFlow ?

great thanks again

On Thu, Mar 17, 2016 at 10:32 AM, Ulanov, Alexander <
alexander.ula...@hpe.com> wrote:

> Hi Charles,
>
>
>
> There is an implementation of multilayer perceptron in Spark (since 1.5):
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>
>
>
> Other features such as autoencoder, convolutional layers, etc. are
> currently under development. Please refer to
> https://issues.apache.org/jira/browse/SPARK-5575
>
>
>
> Best regards, Alexander
>
>
>
> *From:* charles li [mailto:charles.up...@gmail.com]
> *Sent:* Wednesday, March 16, 2016 7:01 PM
> *To:* user 
> *Subject:* best way to do deep learning on spark ?
>
>
>
>
>
> Hi, guys, I'm new to MLlib on spark, after reading the document, it seems
> that MLlib does not support deep learning, I want to know is there any way
> to implement deep learning on spark ?
>
>
>
> *Do I must use 3-party package like caffe or tensorflow ?*
>
>
>
> or
>
>
>
> *Does deep learning module list in the MLlib development plan?*
>
>
>
>
> great thanks
>
>
>
> --
>
> *--*
>
> a spark lover, a quant, a developer and a good man.
>
>
>
> http://github.com/litaotao
>



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


RE: The build-in indexes in ORC file does not work.

2016-03-19 Thread Wietsma, Tristan A.
Regarding bloom filters, 
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-12417



Sent with Good (www.good.com)

From: Joseph 
Sent: Wednesday, March 16, 2016 9:46:25 AM
To: user
Cc: user; user
Subject: Re: Re: The build-in indexes in ORC file does not work.

terminal_type =0,  260,000,000 rows,  almost cover half of the whole data.
terminal_type =25066, just 3800 rows.

orc 
tblproperties("orc.compress"="SNAPPY","orc.compress.size"="262141","orc.stripe.size"="268435456","orc.row.index.stride"="10","orc.create.index"="true","orc.bloom.filter.columns"="");

The table "gprs" has sorted by terminal_type.  Before sort, I have another 
table named "gprs_orc", I use sparkSQL to sort the data as follows:
(before do this, I set  hive.enforce.sorting=true)
sql> INSERT INTO TABLE gprs SELECT * FROM gprs_orc sort by terminal_type ;
Because the table gprs_orc has 800 files, so generate 800 Tasks, and create 800 
files also in table gprs. But I am not sure whether each file be sorted 
separately or not.

I have tried a bloom filter, but it made no improvement. I know about Tez, but 
have never used it; I will try it later.

The following is my test in hive 1.2.1:
1. enable hive.optimize.index.filter and hive.optimize.ppd:
   select count(*) from gprs where terminal_type=25080;   will not scan data,            Time taken: 353.345 seconds
   select count(*) from gprs where terminal_type=25066;   scans just a few row groups,   Time taken: 354.860 seconds
   select count(*) from gprs where terminal_type=0;       scans half of the data,        Time taken: 378.312 seconds

2. disable hive.optimize.index.filter and hive.optimize.ppd:
   select count(*) from gprs where terminal_type=25080;   scans all the data,            Time taken: 389.700 seconds
   select count(*) from gprs where terminal_type=25066;   scans all the data,            Time taken: 386.600 seconds
   select count(*) from gprs where terminal_type=0;       scans all the data,            Time taken: 395.968 seconds

The following is my environment:
  3 nodes, 12 cpu cores per node, 48G memory free per node, 4 disks per node, 
3 replications per block, hadoop 2.7.2, hive 1.2.1



Joseph

From: Jörn Franke
Date: 2016-03-16 20:27
To: Joseph
CC: user; user
Subject: Re: The build-in indexes in ORC file does not work.
Not sure it should work. How many rows are affected? The data is sorted?
Have you tried with Tez? Tez has some summary statistics that tells you if you 
use push down. Maybe you need to use HiveContext.
Perhaps a bloom filter could make sense for you as well.

On 16 Mar 2016, at 12:45, Joseph > 
wrote:

Hi,

I have only one table named "gprs",  it has 560,000,000 rows,  and 57 columns.  
The block size is 256M,  total ORC file number is 800, each of them is about 
51M.

my query statement is :
select count(*) from gprs  where  terminal_type = 25080;
select * from gprs  where  terminal_type = 25080;

In the gprs table, the "terminal_type"  column's  value is in [0, 25066]


Joseph

From: Jörn Franke
Date: 2016-03-16 19:26
To: Joseph
CC: user; user
Subject: Re: The build-in indexes in ORC file does not work.
How much data are you querying? What is the query? How selective it is supposed 
to be? What is the block size?

On 16 Mar 2016, at 11:23, Joseph > 
wrote:

Hi all,

I have learned that ORC provides three levels of indexes within each file: file 
level, stripe level, and row level.
The file and stripe level statistics are in the file footer so that they are 
easy to access to determine if the rest of the file needs to be read at all.
Row level indexes include both column statistics for each row group and 
position for seeking to the start of the row group.

The following is my understanding:
1. The file and stripe level indexes are forcibly generated, we can not control 
them.
2. The row level indexes can be configured by "orc.create.index"(whether to 
create row indexes) and "orc.row.index.stride"(number of rows between index 
entries).
3. Each Index has statistics of min, max for each column, so sort data by the 
filter column will bring better performance.
4. To use any one of the three level of indexes,we should enable predicate 
push-down by setting spark.sql.orc.filterPushdown=true (in sparkSQL) or 
hive.optimize.ppd=true (in hive).
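
(A minimal sketch of point 4 on the Spark SQL side, assuming a HiveContext named
sqlContext and the gprs table described above; shown only to illustrate where the
setting goes:)

// Enable ORC predicate push-down before reading, then filter on the indexed column.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val gprs = sqlContext.table("gprs")
gprs.filter(gprs("terminal_type") === 25080).count()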

But I found the built-in indexes in ORC files did not work, in either spark 1.5.2 
or hive 1.2.1:
First, when the query statement with a where clause didn't match any record (the 
filter column had a value 

Re: best way to do deep learning on spark ?

2016-03-19 Thread Daniel Darabos
On Thu, Mar 17, 2016 at 3:51 AM, charles li  wrote:

> Hi, Alexander,
>
> that's awesome, and when will that feature be released ? Since I want to
> know the opportunity cost between waiting for that release and use caffe or
> tensorFlow ?
>

I don't expect MLlib will be able to compete with major players like Caffe
or TensorFlow, at least not in the short term.

Check out https://github.com/amplab/SparkNet. It's from AMPLab (like
Spark), and it runs Caffe or TensorFlow on Spark. I think it's the state of
the art for deep learning on Spark.

great thanks again
>
> On Thu, Mar 17, 2016 at 10:32 AM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
>> Hi Charles,
>>
>>
>>
>> There is an implementation of multilayer perceptron in Spark (since 1.5):
>>
>>
>> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>>
>>
>>
>> Other features such as autoencoder, convolutional layers, etc. are
>> currently under development. Please refer to
>> https://issues.apache.org/jira/browse/SPARK-5575
>>
>>
>>
>> Best regards, Alexander
>>
>>
>>
>> *From:* charles li [mailto:charles.up...@gmail.com]
>> *Sent:* Wednesday, March 16, 2016 7:01 PM
>> *To:* user 
>> *Subject:* best way to do deep learning on spark ?
>>
>>
>>
>>
>>
>> Hi, guys, I'm new to MLlib on spark, after reading the document, it seems
>> that MLlib does not support deep learning, I want to know is there any way
>> to implement deep learning on spark ?
>>
>>
>>
>> *Do I must use 3-party package like caffe or tensorflow ?*
>>
>>
>>
>> or
>>
>>
>>
>> *Does deep learning module list in the MLlib development plan?*
>>
>>
>>
>>
>> great thanks
>>
>>
>>
>> --
>>
>> *--*
>>
>> a spark lover, a quant, a developer and a good man.
>>
>>
>>
>> http://github.com/litaotao
>>
>
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


Rename Several Aggregated Columns

2016-03-19 Thread Andres.Fernandez
Good morning. I have a dataframe and would like to group by two fields, and 
perform a sum aggregation on more than 500 fields, though I would like to keep 
the same name for the 500 fields (instead of sum(Field)). I do have the 
field names in an array. Could anybody help with this question please?
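
A hedged sketch of one way to do this, assuming the 500 field names are in an
array named fieldNames, the grouping keys are key1 and key2, and the DataFrame is
df (all names illustrative):

import org.apache.spark.sql.functions.{col, sum}

// One aggregation expression per field, aliased back to its original name,
// so the output columns are not called sum(field).
val aggExprs = fieldNames.map(f => sum(col(f)).alias(f))
val result   = df.groupBy("key1", "key2").agg(aggExprs.head, aggExprs.tail: _*)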



best way to do deep learning on spark ?

2016-03-19 Thread charles li
Hi, guys, I'm new to MLlib on spark, after reading the document, it seems
that MLlib does not support deep learning, I want to know is there any way
to implement deep learning on spark ?

*Do I must use 3-party package like caffe or tensorflow ?*

or

*Does deep learning module list in the MLlib development plan?*


great thanks

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2016-03-19 Thread Cody Koeninger
Is that happening only at startup, or during processing?  If that's
happening during normal operation of the stream, you don't have enough
resources to process the stream in time.

There's not a clean way to deal with that situation, because it's a
violation of preconditions.  If you want to modify the code to do what
makes sense for you, start looking at handleFetchErr in KafkaRDD.scala
  Recompiling that package isn't a big deal, because it's not a part
of the core spark deployment, so you'll only have to change your job,
not the deployed version of spark.



On Fri, Mar 18, 2016 at 6:16 AM, Ramkumar Venkataraman
 wrote:
> I am using Spark streaming and reading data from Kafka using
> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> smallest.
>
> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
> and my spark job crashes.
>
> I want to understand if there is a graceful way to handle this failure and
> not kill the job. I want to keep ignoring these exceptions, as some other
> partitions are fine and I am okay with data loss.
>
> Is there any way to handle this and not have my spark job crash? I have no
> option of increasing the kafka retention period.
>
> I tried to have the DStream returned by createDirectStream() wrapped in a
> Try construct, but since the exception happens in the executor, the Try
> construct didn't take effect. Do you have any ideas of how to handle this?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetOutOfRangeException-tp26534.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



unix_timestamp() time zone problem

2016-03-19 Thread Andy Davidson
I am using python spark 1.6 and the --packages
datastax:spark-cassandra-connector:1.6.0-M1-s_2.10

I need to convert a time stamp string into a unix epoch time stamp. The
unix_timestamp() function assumes the current time zone. However my
string data is UTC and encodes the time zone as zero. I have not been able
to find a way to get the unix time calculated correctly. simpleDateFormat
does not have good time zone support. Any suggestions?

I could write a UDF to adjust for time zones, however this seems like a
hack.

I tried using to_utc_timestamp(created, 'gmt') however this creates a
timestamp. I have not been able to figure out how to convert this to a unix
time stamp, i.e. a long representing the epoch.

Any suggestions?

stmnt = "select \
row_key, created, count, unix_timestamp(created) as
unixTimeStamp, \
unix_timestamp(created, '-MM-dd HH:mm:ss.z') as etc \
 from \
rawTable \
 where \
 (created > '{0}') and (created <= '{1}') \
 and \
 (row_key = Œblue' \
or row_key = Œred' \
)".format('2016-03-12 00:30:00+', '2016-03-12
04:30:00+¹)


Sample out put

root
 |-- row_key: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- count: long (nullable = true)
 |-- unixTimeStamp: long (nullable = true)
 |-- etc: long (nullable = true)

2016-03-12 00:30:30.0 should be 1457742630 not 1457771430

+-+-+-+-+--+
|row_key  |created|count|unixTimeStamp|utc|
+-+-+-+-+--+
|red|2016-03-12 00:30:30.0|2|1457771430   |1457771430|
|red|2016-03-12 00:30:45.0|1|1457771445   |1457771445|


static Column unix_timestamp(Column s)
    Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp
    (in seconds), using the default timezone and the default locale; returns null
    if it fails.
static Column unix_timestamp(Column s, java.lang.String p)
    Converts time string with given pattern (see
    http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html)
    to Unix time stamp (in seconds); returns null if it fails.




[discuss] making SparkEnv private in Spark 2.0

2016-03-19 Thread Reynold Xin
Any objections? Please articulate your use case. SparkEnv is a weird one
because it was documented as "private" but not marked as so in class
visibility.

 * NOTE: This is not intended for external use. This is exposed for Shark
and may be made private
 *   in a future release.


I do see Hive using it to get the config variable. That can probably be
propagated through other means.


Re: Unit testing framework for Spark Jobs?

2016-03-19 Thread Lars Albertsson
I would recommend against writing unit tests for Spark programs, and
instead focus on integration tests of jobs or pipelines of several
jobs. You can still use a unit test framework to execute them. Perhaps
this is what you meant.

You can use any of the popular unit test frameworks to drive your
tests, e.g. JUnit, Scalatest, Specs2. I prefer Scalatest, since it
gives you choice of TDD vs BDD, and it is also well integrated with
IntelliJ.

I would also recommend against using testing frameworks tied to a
processing technology, such as Spark Testing Base. Although it does
seem well crafted, and makes it easy to get started with testing,
there are drawbacks:

1. I/O routines are not tested. Bundled test frameworks typically do
not materialise datasets on storage, but pass them directly in memory.
(I have not verified this for Spark Testing Base, but it looks so.)
I/O routines are therefore not exercised, and they often hide bugs,
e.g. related to serialisation.

2. You create a strong coupling between processing technology and your
tests. If you decide to change processing technology (which can happen
soon in this fast paced world...), you need to rewrite your tests.
Therefore, during a migration process, the tests cannot detect bugs
introduced in migration, and help you migrate fast.

I recommend that you instead materialise input datasets on local disk,
run your Spark job, which writes output datasets to local disk, read
output from disk, and verify the results. You can still use Spark
routines to read and write input and output datasets. A Spark context
is expensive to create, so for speed, I would recommend reusing the
Spark context between input generation, running the job, and reading
output.

This is easy to set up, so you don't need a dedicated framework for
it. Just put your common boilerplate in a shared test trait or base
class.
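
A minimal sketch of such a shared base setup, assuming ScalaTest and a job exposed
behind a hypothetical MyJob.run(sc, inputPath, outputPath) entry point (all names
illustrative):

import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

class MyJobSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  private var sc: SparkContext = _   // reused across tests; a context is expensive to create

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("myjob-test"))
  }
  override def afterAll(): Unit = sc.stop()

  "MyJob" should "aggregate the sample input" in {
    val in  = Files.createTempDirectory("myjob-in").toString + "/input"
    val out = Files.createTempDirectory("myjob-out").toString + "/output"
    sc.parallelize(Seq("a,1", "a,2", "b,5")).saveAsTextFile(in)   // materialise input on disk
    MyJob.run(sc, in, out)                                        // hypothetical job entry point
    sc.textFile(out).collect().toSet shouldBe Set("a,3", "b,5")   // read output back and verify
  }
}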

In the future, when you want to replace your Spark job with something
shinier, you can still use the old tests, and only replace the part
that runs your job, giving you some protection from regression bugs.


Testing Spark Streaming applications is a different beast, and you can
probably not reuse much from your batch testing.

For testing streaming applications, I recommend that you run your
application inside a unit test framework, e.g, Scalatest, and have the
test setup create a fixture that includes your input and output
components. For example, if your streaming application consumes from
Kafka and updates tables in Cassandra, spin up single node instances
of Kafka and Cassandra on your local machine, and connect your
application to them. Then feed input to a Kafka topic, and wait for
the result to appear in Cassandra.

With this setup, your application still runs in Scalatest, the tests
run without custom setup in maven/sbt/gradle, and you can easily run
and debug inside IntelliJ.

Docker is suitable for spinning up external components. If you use
Kafka, the Docker image spotify/kafka is useful, since it bundles
Zookeeper.

When waiting for output to appear, don't sleep for a long time and
then check, since it will slow down your tests. Instead enter a loop
where you poll for the results and sleep for a few milliseconds in
between, with a long timeout (~30s) before the test fails with a
timeout.

This poll and sleep strategy both makes tests quick in successful
cases, but still robust to occasional delays. The strategy does not
work if you want to test for absence, e.g. ensure that a particular
message is filtered. You can work around it by adding another message
afterwards and polling for its effect before testing for absence of
the first. Be aware that messages can be processed out of order in
Spark Streaming depending on partitioning, however.
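
A small sketch of the poll-and-sleep helper described above (names are
illustrative; ScalaTest's Eventually trait offers a similar utility):

// Poll for a result every few milliseconds, failing only after a long timeout,
// so tests are fast on success but tolerant of occasional delays.
def pollUntil[T](timeoutMs: Long = 30000L, intervalMs: Long = 50L)(poll: => Option[T]): T = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    poll match {
      case Some(result) => return result
      case None         => Thread.sleep(intervalMs)
    }
  }
  throw new AssertionError(s"no result after $timeoutMs ms")
}

// e.g. val row = pollUntil() { readRowFromCassandra(key) }   // readRowFromCassandra is hypothetical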


I have tested Spark applications with both strategies described above,
and it is straightforward to set up. Let me know if you want
clarifications or assistance.

Regards,



Lars Albertsson
Data engineering consultant
www.mapflat.com
+46 70 7687109


On Wed, Mar 2, 2016 at 6:54 PM, SRK  wrote:
> Hi,
>
> What is a good unit testing framework for Spark batch/streaming jobs? I have
> core spark, spark sql with dataframes and streaming api getting used. Any
> good framework to cover unit tests for these APIs?
>
> Thanks!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Subquery performance

2016-03-19 Thread Younes Naguib
Hi all,

I'm running a query that looks like the following:
Select col1, count(1)
From (Select col2, count(1) from tab2 group by col2)
Inner join tab1 on (col1=col2)
Group by col1

This creates a very large shuffle, 10 times the data size, as if the subquery 
was executed for each row.
Anything can be done to tune to help tune this?
When the subquery in persisted, it runs much faster, and the shuffle is 50 
times smaller!

Thanks,
Younes


Re: installing packages with pyspark

2016-03-19 Thread Franc Carter
I'm having trouble with that for pyspark, yarn and graphframes. I'm using:-

pyspark --master yarn --packages graphframes:graphframes:0.1.0-spark1.5

which starts and gives me a REPL, but when I try

   from graphframes import *

I get

  No module named graphframes

without '--master yarn' it works as expected

thanks


On 18 March 2016 at 12:59, Felix Cheung  wrote:

> For some, like graphframes that are Spark packages, you could also use
> --packages in the command line of spark-submit or pyspark. See
> http://spark.apache.org/docs/latest/submitting-applications.html
>
> _
> From: Jakob Odersky 
> Sent: Thursday, March 17, 2016 6:40 PM
> Subject: Re: installing packages with pyspark
> To: Ajinkya Kale 
> Cc: 
>
>
>
> Hi,
> regarding 1, packages are resolved locally. That means that when you
> specify a package, spark-submit will resolve the dependencies and
> download any jars on the local machine, before shipping* them to the
> cluster. So, without a priori knowledge of dataproc clusters, it
> should be no different to specify packages.
>
> Unfortunatly I can't help with 2.
>
> --Jakob
>
> *shipping in this case means making them available via the network
>
> On Thu, Mar 17, 2016 at 5:36 PM, Ajinkya Kale 
> wrote:
> > Hi all,
> >
> > I had couple of questions.
> > 1. Is there documentation on how to add the graphframes or any other
> package
> > for that matter on the google dataproc managed spark clusters ?
> >
> > 2. Is there a way to add a package to an existing pyspark context
> through a
> > jupyter notebook ?
> >
> > --aj
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>


-- 
Franc


Re: How to add an accumulator for a Set in Spark

2016-03-19 Thread swetha kasireddy
OK. I did take a look at them. So once I have an accumulator for a HashSet,
how can I check if a particular key is already present in the HashSet
accumulator? I don't see any .contains method there. My requirement is that
I need to keep accumulating the keys in the HashSet across all the tasks in
various nodes and use it to do a check if the key is already present in the
HashSet.
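
A minimal sketch of the Accumulable approach (Spark 1.x API), with the caveat that
the accumulated value is only readable on the driver via .value after an action
has completed; tasks can add keys, but they cannot reliably test membership while
the job is running:

import org.apache.spark.AccumulableParam
import scala.collection.mutable.HashSet

object HashSetParam extends AccumulableParam[HashSet[String], String] {
  def addAccumulator(set: HashSet[String], key: String): HashSet[String] = { set += key; set }
  def addInPlace(s1: HashSet[String], s2: HashSet[String]): HashSet[String] = { s1 ++= s2; s1 }
  def zero(initial: HashSet[String]): HashSet[String] = HashSet.empty[String]
}

// Driver side:
// val acc = sc.accumulable(HashSet.empty[String])(HashSetParam)
// rdd.foreach(k => acc += k)
// acc.value.contains("someKey")   // only meaningful on the driver, after the action

If the tasks themselves need to check membership, the usual pattern is to compute
the set up front and ship it to the executors as a broadcast variable instead of
an accumulator.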

On Tue, Mar 15, 2016 at 9:56 PM, pppsunil  wrote:

> Have you looked at using Accumulable interface,  Take a look at Spark
> documentation at
> http://spark.apache.org/docs/latest/programming-guide.html#accumulators it
> gives example of how to use vector type for accumalator, which might be
> very
> close to what you need
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-an-accumulator-for-a-Set-in-Spark-tp26510p26514.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Checkpoint of DStream joined with RDD

2016-03-19 Thread Lubomir Nerad

Hi,

I tried to replicate the example of joining DStream with lookup RDD from 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation. 
It works fine, but when I enable checkpointing for the StreamingContext 
and let the application to recover from a previously created checkpoint, 
I always get an exception during start and the whole application fails. 
I tried various types of lookup RDD, but the result is the same.


Exception in the case of HBase RDD is:

Exception in thread "main" java.lang.NullPointerException
at 
org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
at 
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:109)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
at org.apache.spark.Partitioner$$anonfun$2.apply(Partitioner.scala:58)
at scala.math.Ordering$$anon$5.compare(Ordering.scala:122)
at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
at java.util.TimSort.sort(TimSort.java:216)
at java.util.Arrays.sort(Arrays.java:1438)
at scala.collection.SeqLike$class.sorted(SeqLike.scala:615)
at scala.collection.AbstractSeq.sorted(Seq.scala:40)
at scala.collection.SeqLike$class.sortBy(SeqLike.scala:594)
at scala.collection.AbstractSeq.sortBy(Seq.scala:40)
at 
org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$join$2.apply(PairRDDFunctions.scala:651)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at 
org.apache.spark.rdd.PairRDDFunctions.join(PairRDDFunctions.scala:650)

at org.apache.spark.api.java.JavaPairRDD.join(JavaPairRDD.scala:546)

I tried Spark 1.5.2 and 1.6.0 without success. The problem seems to be 
that RDDs use some transient fields which are not restored when they are 
recovered from checkpoint files. In case of some RDD implementations it 
is SparkContext, but it can be also implementation specific 
Configuration object, etc. I see in the sources that in the case of 
DStream recovery, the DStreamGraph takes care of restoring 
StreamingContext in all its DStream-s. But I haven't found any similar 
mechanism for RDDs.


So my question is whether I am doing something wrong or this is a bug in 
Spark? If the latter, is there some workaround other than implementing a 
custom DStream which will return the same RDD every batch interval and 
joining at DStream level instead of RDD level in transform?
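
(For what it's worth, a custom DStream of that kind already exists as
ConstantInputDStream; a hedged sketch of the DStream-level join workaround
described above, with illustrative names -- whether it survives checkpoint
recovery would still need testing, since the wrapped RDD has to be
reconstructable:)

import org.apache.spark.streaming.dstream.ConstantInputDStream

// Wrap the lookup RDD so it is returned every batch, then join at DStream level.
val lookupStream = new ConstantInputDStream(ssc, lookupRdd)   // lookupRdd: RDD[(K, V)]
val joined = eventStream.join(lookupStream)                   // eventStream: DStream[(K, W)]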


I apologize if this has been discussed in the past and I missed it when 
looking into archive.


Thanks,
Lubo


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Error] : dynamically union All + adding new column

2016-03-19 Thread Ted Yu
It turned out that Col1 appeared twice in the select :-)

> On Mar 16, 2016, at 7:29 PM, Divya Gehlot  wrote:
> 
> Hi,
> I am dynamically doing union all and adding new column too 
> 
>> val dfresult = 
>> dfAcStamp.select("Col1","Col1","Col3","Col4","Col5","Col6","col7","col8","col9")
>> val schemaL = dfresult.schema 
>> var dffiltered = sqlContext.createDataFrame(sc.emptyRDD[Row], schemaL)
>> for ((key,values) <- lcrMap) {
>> if(values(4) != null){
>>  println("Condition="+values(4))
>>  val renameRepId = values(0)+"REP_ID"
>>  dffiltered.printSchema
>> dfresult.printSchema
>>  dffiltered = 
>> dffiltered.unionAll(dfresult.withColumn(renameRepId,lit(values(3))).drop("Col9").select("Col1","Col1","Col3","Col4","Col5","Col6","Col7","Col8","Col9").where(values(4))).distinct()
>>  
>> }
>> }
> 
> 
> when I am printing the schema 
> dfresult 
> root
>  |-- Col1: date (nullable = true)
>  |-- Col2: date (nullable = true)
>  |-- Col3: string (nullable = false)
>  |-- Col4: string (nullable = false)
>  |-- Col5: string (nullable = false)
>  |-- Col6: string (nullable = true)
>  |-- Col7: string (nullable = true)
>  |-- Col8: string (nullable = true)
>  |-- Col9: null (nullable = true)
> 
> 
> dffiltered Schema
> root
>  |-- Col1: date (nullable = true)
>  |-- Col2: date (nullable = true)
>  |-- Col3: string (nullable = false)
>  |-- Col4: string (nullable = false)
>  |-- Col5: string (nullable = false)
>  |-- Col6: string (nullable = true)
>  |-- Col7: string (nullable = true)
>  |-- Col8: string (nullable = true)
>  |-- Col9: null (nullable = true)
> 
> 
> It is printing the same schema, but when I am doing UnionAll it's giving me 
> the below error 
> org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
> with the same number of columns, but the left table has 9 columns and the 
> right has 8;
> 
> Could somebody help me in pointing out my mistake  .
> 
> 
> Thanks,
> 
> 


Re: Get Pair of Topic and Message from Kafka + Spark Streaming

2016-03-19 Thread Cody Koeninger
There's 1 topic per partition, so you're probably better off dealing
with topics that way rather than at the individual message level.

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

Look at the discussion of "HasOffsetRanges"

If you really want to attach a topic to each message, look at the
constructor that allows you to pass a messageHandler argument.  That
gives you per-item access to everything in message and metadata,
including the topic.
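
A hedged sketch of the messageHandler variant (Spark 1.x direct API, String
keys/values for brevity; ssc, kafkaParams and fromOffsets: Map[TopicAndPartition,
Long] are assumed to exist and come from your own offset bookkeeping):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Per-message handler: keep the (topic, message) pair instead of just the value.
val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, handler)   // DStream[(topic, message)]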

On Wed, Mar 16, 2016 at 3:37 AM, Imre Nagi  wrote:
> Hi,
>
> I'm just trying to process the data that come from the kafka source in my
> spark streaming application. What I want to do is get the pair of topic and
> message in a tuple from the message stream.
>
> Here is my streams:
>
>>  val streams = KafkaUtils.createDirectStream[String, Array[Byte],
>> StringDecoder, DefaultDecoder](ssc,kafkaParameter,
>>   Array["topic1", "topic2])
>
>
> I have done several things, but still failed when i did some transformations
> from the streams to the pair of topic and message. I hope somebody can help
> me here.
>
> Thanks,
> Imre

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: installing packages with pyspark

2016-03-19 Thread Jakob Odersky
> But I guess I cannot add a package once i launch the pyspark context right ?

Correct. Potentially, if you really really wanted to, you could maybe
(with lots of pain) load packages dynamically with some class-loader
black magic, but Spark does not provide that functionality.

On Thu, Mar 17, 2016 at 7:20 PM, Ajinkya Kale  wrote:
> Thanks Jakob, Felix. I am aware you can do it with --packages but i was
> wondering if there is a way to do something like "!pip install "
> like i do for other packages from jupyter notebook for python. But I guess I
> cannot add a package once i launch the pyspark context right ?
>
> On Thu, Mar 17, 2016 at 6:59 PM Felix Cheung 
> wrote:
>>
>> For some, like graphframes that are Spark packages, you could also use
>> --packages in the command line of spark-submit or pyspark. See
>> http://spark.apache.org/docs/latest/submitting-applications.html
>>
>> _
>> From: Jakob Odersky 
>> Sent: Thursday, March 17, 2016 6:40 PM
>> Subject: Re: installing packages with pyspark
>> To: Ajinkya Kale 
>> Cc: 
>>
>>
>> Hi,
>> regarding 1, packages are resolved locally. That means that when you
>> specify a package, spark-submit will resolve the dependencies and
>> download any jars on the local machine, before shipping* them to the
>> cluster. So, without a priori knowledge of dataproc clusters, it
>> should be no different to specify packages.
>>
>> Unfortunatly I can't help with 2.
>>
>> --Jakob
>>
>> *shipping in this case means making them available via the network
>>
>> On Thu, Mar 17, 2016 at 5:36 PM, Ajinkya Kale 
>> wrote:
>> > Hi all,
>> >
>> > I had couple of questions.
>> > 1. Is there documentation on how to add the graphframes or any other
>> > package
>> > for that matter on the google dataproc managed spark clusters ?
>> >
>> > 2. Is there a way to add a package to an existing pyspark context
>> > through a
>> > jupyter notebook ?
>> >
>> > --aj
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Rename Several Aggregated Columns

2016-03-19 Thread Sunitha Kambhampati

One way is to rename the columns using the toDF

For eg:

val df = Seq((1, 2),(1,4),(2,3) ).toDF("a","b")
df.printSchema()

val renamedf = df.groupBy('a).agg(sum('b)).toDF("mycola", "mycolb")
renamedf.printSchema()
Best regards,
Sunitha

> On Mar 18, 2016, at 9:10 AM, andres.fernan...@wellsfargo.com wrote:
> 
> Good morning. I have a dataframe and would like to group by on two fields, 
> and perform a sum aggregation on more than 500 fields, though I would like to 
> keep the same name for the 500 hundred fields (instead of sum(Field)). I do 
> have the field names in an array. Could anybody help with this question 
> please?
> 



Re: HP customer support @ www.globalpccure.com/Support/Support-for-HP.aspx

2016-03-19 Thread nsalian
Please refrain from posting such messages on this email thread.
This is specific to the Spark ecosystem and not an avenue to advertise an
entity/company.

Thank you.



-
Neelesh S. Salian
Cloudera
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HP-customer-support-www-globalpccure-com-Support-Support-for-HP-aspx-tp26521p26522.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ClassNotFoundException in RDD.map

2016-03-19 Thread Ted Yu
bq. $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1

Do you mind showing more of your code involving the map() ?

On Thu, Mar 17, 2016 at 8:32 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hello,
> I found a strange behavior after executing a prediction with MLIB.
> My code returns an RDD[(Any,Double)] where Any is the id of my dataset,
> which is a BigDecimal, and Double is the prediction for that line.
> When I run
> myRdd.take(10) it returns ok
> res16: Array[_ >: (Double, Double) <: (Any, Double)] =
> Array((1921821857196754403.00,0.1690292052496703),
> (454575632374427.00,0.16902820241892452),
> (989198096568001939.00,0.16903432789699502),
> (14284129652106187990.00,0.16903517653451386),
> (17980228074225252497.00,0.16903151028332508),
> (3861345958263692781.00,0.16903056986183976),
> (17558198701997383205.00,0.1690295450319745),
> (10651576092054552310.00,0.1690286445174418),
> (4534494349035056215.00,0.16903303401862327),
> (5551671513234217935.00,0.16902303368995966))
> But when I try to run some map on it:
> myRdd.map(_._1).take(10)
> It throws a ClassNotFoundException:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 72.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 72.0 (TID 1774, 172.31.23.208): java.lang.ClassNotFoundException:
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:278)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at
> 

[Error] : dynamically union All + adding new column

2016-03-19 Thread Divya Gehlot
Hi,
I am dynamically doing union all and adding a new column too

val dfresult =
> dfAcStamp.select("Col1","Col1","Col3","Col4","Col5","Col6","col7","col8","col9")
> val schemaL = dfresult.schema
> var dffiltered = sqlContext.createDataFrame(sc.emptyRDD[Row], schemaL)
> for ((key,values) <- lcrMap) {
> if(values(4) != null){
>  println("Condition="+values(4))
>  val renameRepId = values(0)+"REP_ID"
>  dffiltered.printSchema
> dfresult.printSchema
>  dffiltered =
> dffiltered.unionAll(dfresult.withColumn(renameRepId,lit(values(3))).drop("Col9").select("Col1","Col1","Col3","Col4","Col5","Col6","Col7","Col8","Col9").where(values(4))).distinct()


> }
> }



when I am printing the schema
dfresult
root
 |-- Col1: date (nullable = true)
 |-- Col2: date (nullable = true)
 |-- Col3: string (nullable = false)
 |-- Col4: string (nullable = false)
 |-- Col5: string (nullable = false)
 |-- Col6: string (nullable = true)
 |-- Col7: string (nullable = true)
 |-- Col8: string (nullable = true)
 |-- Col9: null (nullable = true)


dffiltered Schema
root
 |-- Col1: date (nullable = true)
 |-- Col2: date (nullable = true)
 |-- Col3: string (nullable = false)
 |-- Col4: string (nullable = false)
 |-- Col5: string (nullable = false)
 |-- Col6: string (nullable = true)
 |-- Col7: string (nullable = true)
 |-- Col8: string (nullable = true)
 |-- Col9: null (nullable = true)


It is printing the same schema, but when I am doing UnionAll it's giving me the
below error:
org.apache.spark.sql.AnalysisException: Union can only be performed on
tables with the same number of columns, but the left table has 9 columns
and the right has 8;

Could somebody help me in pointing out my mistake?


Thanks,


  1   2   >