Re: slow SQL query with cached dataset

2016-04-25 Thread Jörn Franke
I do not know your data, but it looks like you have too many partitions for
such a small data set.
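
For example, a minimal Scala sketch (not from the thread; the quoted snippet below looks like PySpark, and the partition count of 8 is only an illustration): reduce the number of partitions before caching so the query does not have to touch thousands of tiny cached blocks.

val data = sqlContext
  .sql("SELECT * FROM raw WHERE (dt_year=2015 OR dt_year=2016)")
  .coalesce(8)              // a handful of partitions is plenty for ~9 GB on one host
data.cache()
data.registerTempTable("data")
data.count()                // materialize the cache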

> On 26 Apr 2016, at 00:47, Imran Akbar  wrote:
> 
> Hi,
> 
> I'm running a simple query like this through Spark SQL:
> 
> sqlContext.sql("SELECT MIN(age) FROM data WHERE country = 'GBR' AND 
> dt_year=2015 AND dt_month BETWEEN 1 AND 11 AND product IN ('cereal')").show()
> 
> which takes 3 minutes to run against an in-memory cache of 9 GB of data.
> 
> The data was 100% cached in memory before I ran the query (see screenshot 1).
> The data was cached like this:
> data = sqlContext.sql("SELECT * FROM raw WHERE (dt_year=2015 OR 
> dt_year=2016)")
> data.cache()
> data.registerTempTable("data")
> and then I ran an action query to load the data into the cache.
> 
> I see lots of rows of logs like this:
> 16/04/25 22:39:11 INFO MemoryStore: Block rdd_13136_2856 stored as values in 
> memory (estimated size 2.5 MB, free 9.7 GB)
> 16/04/25 22:39:11 INFO BlockManager: Found block rdd_13136_2856 locally
> 16/04/25 22:39:11 INFO MemoryStore: 6 blocks selected for dropping
> 16/04/25 22:39:11 INFO BlockManager: Dropping block rdd_13136_3866 from memory
> 
> Screenshot 2 shows the job page of the longest job.
> 
> The data was partitioned in Parquet by month, country, and product before I 
> cached it.
> 
> Any ideas what the issue could be?  This is running on localhost.
> 
> regards,
> imran
> 
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: reduceByKey as Action or Transformation

2016-04-25 Thread Sumedh Wale

  
  
On Monday 25 April 2016 11:28 PM, Weiping Qu wrote:

> Dear Ted,
>
> You are right. ReduceByKey is a transformation. My fault.
> I would rephrase my question using the following code snippet.
>
> object ScalaApp {
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("ScalaApp").setMaster("local")
>     val sc = new SparkContext(conf)
>     //val textFile: RDD[String] =
>     val file = sc.textFile("/home/usr/test.dat")
>     val output = file.flatMap(line => line.split(" "))
>       .map(word => (word, 1))
>       .reduceByKey(_ + _)
>
>     output.persist()
>     output.count()
>     output.collect()
>   }
> }
>
> It's a simple code snippet.
> I realize that the first action count() would trigger the execution
> based on HadoopRDD and MapPartitionsRDD, and the count will take the
> ShuffleRDD produced by reduceByKey as its input.

The count() will trigger both the execution as well as the
persistence of the output RDD (as each partition is iterated).

> The second action collect() just performs the collect
> over the same ShuffleRDD.

It will use the persisted ShuffleRDD blocks.

> I think the re-calculation will also be carried out
> over the ShuffleRDD instead of re-executing the preceding HadoopRDD and
> MapPartitionsRDD in case one partition of the persisted output is
> missing.
> Am I right?


Since it is a partition of persisted ShuffleRDD that is missing, the
partition will have to be recreated from the base HadoopRDD. To
avoid it, one can checkpoint the ShuffleRDD if required.
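
For readers who want to try this, a minimal sketch of persisting and checkpointing the reduced RDD (the checkpoint directory is only an example path, not from the thread):

sc.setCheckpointDir("/tmp/spark-checkpoints")   // example path

val output = sc.textFile("/home/usr/test.dat")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

output.persist()      // keep the shuffled result in memory
output.checkpoint()   // also materialize it to reliable storage at the next job
output.count()        // first action: computes, persists and checkpoints the RDD
output.collect()      // later actions / recomputations no longer go back to the text file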

 
> Thanks and Regards,
> Weiping
  
  
  
  


regards
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)
  


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cant join same dataframe twice ?

2016-04-25 Thread Ted Yu
Can you show us the structure of df2 and df3 ?

Thanks

On Mon, Apr 25, 2016 at 8:23 PM, Divya Gehlot 
wrote:

> Hi,
> I am using Spark 1.5.2 .
> I have a use case where I need to join the same dataframe twice on two
> different columns.
> I am getting an error about missing columns.
>
> For instance,
> val df1 = df2.join(df3, "Column1")
> The line below throws the missing-columns error:
> val df4 = df1.join(df3, "Column2")
>
> Is this a bug or a valid scenario?
>
>
>
>
> Thanks,
> Divya
>


Cant join same dataframe twice ?

2016-04-25 Thread Divya Gehlot
Hi,
I am using Spark 1.5.2 .
I have a use case where I need to join the same dataframe twice on two
different columns.
I am getting an error about missing columns.

For instance,
val df1 = df2.join(df3, "Column1")
The line below throws the missing-columns error:
val df4 = df1.join(df3, "Column2")

Is this a bug or a valid scenario?




Thanks,
Divya
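
The thread does not show the schemas of df2/df3, but a workaround often suggested for re-joining the same DataFrame in 1.5.x is to give the second use of df3 a fresh alias and join on qualified columns. A sketch only, assuming "Column2" resolves unambiguously on df1:

import org.apache.spark.sql.functions.col

val df3b = df3.as("d3b")                         // fresh alias for the second use of df3
val df1  = df2.join(df3, "Column1")
val df4  = df1.join(df3b, df1("Column2") === col("d3b.Column2"))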


Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-25 Thread Praveen Devarao
Cool!! Thanks for the clarification Mike.

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:   Michael Armbrust 
To: Praveen Devarao/India/IBM@IBMIN
Cc: Reynold Xin , "d...@spark.apache.org" 
, user 
Date:   25/04/2016 10:59 pm
Subject:Re: Do transformation functions on RDD invoke a Job 
[sc.runJob]?



Spark SQL's query planner has always delayed building the RDD, so has 
never needed to eagerly calculate the range boundaries (since Spark 1.0).
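
A small illustration of the difference (a sketch, not from the thread): the RDD sort runs a sampling job as soon as the sorted RDD is built, while the DataFrame sort stays lazy until an action runs.

// RDD API: building the sorted RDD already runs a small sampling job
val sortedRdd = sc.parallelize(1 to 1000000)
  .map(i => (i % 100, i))
  .sortByKey()              // RangePartitioner samples the data here

// DataFrame API: orderBy only builds a logical plan, nothing runs yet
val sortedDf = sqlContext.range(0, 1000000).orderBy("id")
sortedDf.show()             // the job starts here, at the action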

On Mon, Apr 25, 2016 at 2:04 AM, Praveen Devarao  
wrote:
Thanks Reynold for the reason as to why sortBykey invokes a Job

When you say "DataFrame/Dataset does not have this issue", is it right to
assume you are referring to Spark 2.0, or does the Spark 1.6 DataFrame already
have this built in?

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:Reynold Xin 
To:Praveen Devarao/India/IBM@IBMIN
Cc:"d...@spark.apache.org" , user <
user@spark.apache.org>
Date:25/04/2016 11:26 am
Subject:Re: Do transformation functions on RDD invoke a Job 
[sc.runJob]?




Usually no - but sortByKey does because it needs the range boundary to be 
built in order to have the RDD. It is a long standing problem that's 
unfortunately very difficult to solve without breaking the RDD API.

In DataFrame/Dataset we don't have this issue though.


On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao  
wrote:
Hi,

I have a streaming program with the block as below [ref: 
https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
]

1  val lines = messages.map(_._2)
2  val hashTags = lines.flatMap(status => status.split(" ")
       .filter(_.startsWith("#")))

3  val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
3a     .map { case (topic, count) => (count, topic) }
3b     .transform(_.sortByKey(false))

4a topCounts60.foreachRDD( rdd => {
4b     val topList = rdd.take( 10 )
   })

This batch is triggering 2 jobs: one at line 3b (sortByKey) and
the other at line 4b (rdd.take). I agree that there is a Job triggered on line
4b, as take() is an action on an RDD, while on line 3b sortByKey is just a
transformation function which as per the docs is lazily evaluated... but I see
that this line uses a RangePartitioner, and RangePartitioner on
initialization invokes a method called sketch() that invokes collect(),
triggering a Job.

My question: Is it expected that sortByKey will invoke a Job...if 
yes, why is sortByKey listed as a transformation and not action. Are there 
any other functions like this that invoke a Job, though they are 
transformations and not actions?

I am on Spark 1.6

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"









Issues with Long Running Streaming Application

2016-04-25 Thread Bryan Jeffrey
Hello.

I have a long-running streaming application. It is consuming a large amount
of data from Kafka (on the order of 25K messages / second in two-minute
batches).  The job reads the data, makes some decisions on what to save, and
writes the selected data into Cassandra.  The job is very stable - each
batch takes around 25 seconds to process, and every 30 minutes new training
data is read from Cassandra which increases batch time to around 1 minute.

The issue I'm seeing is that the job is stable for around 5-7 hours, after
which it takes an increasingly long time to compute each batch.  The
executor memory used (cached RDDs) remains around the same level, no
operation takes more than a single batch into account, the write time to
Cassandra does not vary significantly - everything just suddenly seems to
take a longer period of time to compute.

Initially I was seeing issues with instability in a shorter time horizon.
To address these issues I took the following steps:

1. Explicitly expired RDDs via 'unpersist' once they were no longer
required.
2. Turned on the G1 GC via -XX:+UseG1GC (see the configuration sketch after this list)
3. Enabled Kryo serialization
4. Removed several long-running aggregate operations (reduceByKeyAndWindow,
updateStateByKey) from this job.
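
For reference, a sketch of how steps 2 and 3 are typically configured (the values here are examples, not the poster's actual settings):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("StreamingJob")                       // example name
  // step 3: Kryo serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // step 2: run the executors with the G1 garbage collector
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")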

The result is a job that appears completely stable for hours at a time.
The OS does not appear to have any odd tasks run, Cassandra
compaction/cleanup/repair is not responsible for the delay.   Has anyone
seen similar behavior?  Any thoughts?

Regards,

Bryan Jeffrey


Re: XML Data Source for Spark

2016-04-25 Thread Hyukjin Kwon
Hi Janan,

Sorry, I was sleeping. I guess you sent an email to me first and then asked on
the mailing list because I was not answering.

I just tested this to double-check and could produce the same exception
below:

java.lang.NoSuchMethodError:
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.databricks.spark.xml.XmlOptions.(XmlOptions.scala:26)
at com.databricks.spark.xml.XmlOptions$.apply(XmlOptions.scala:48)
at
com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:58)
at
com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:44)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)



Try the artifact compiled for the matching Scala version. There are two, as below:

Scala 2.10

groupId: com.databricks
artifactId: spark-xml_2.10
version: 0.3.3

Scala 2.11

groupId: com.databricks
artifactId: spark-xml_2.11
version: 0.3.3
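
For example, with sbt the %% operator appends the project's Scala binary version, so the artifact automatically matches the build (a sketch; the spark-sql coordinates are shown only for context):

// build.sbt (sketch)
scalaVersion := "2.10.6"      // must match the Scala version of your Spark build

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
  "com.databricks"   %% "spark-xml" % "0.3.3"
)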



2016-04-26 6:57 GMT+09:00 Michael Armbrust :

> You are using a version of the library that was compiled for a different
> version of Scala than the version of Spark that you are using.  Make sure
> that they match up.
>
> On Mon, Apr 25, 2016 at 5:19 PM, Mohamed ismail <
> mismai...@yahoo.com.invalid> wrote:
>
>> here is an example with code.
>> http://stackoverflow.com/questions/33078221/xml-processing-in-spark
>>
>> I haven't tried.
>>
>>
>> On Monday, April 25, 2016 1:06 PM, Jinan Alhajjaj <
>> j.r.alhaj...@hotmail.com> wrote:
>>
>>
>> Hi All,
>> I am trying to use XML data source that is used for parsing and querying
>> XML data with Apache Spark, for Spark SQL and data frames.I am using Apache
>> spark version 1.6.1 and I am using Java as a programming language.
>> I wrote this sample code :
>> SparkConf conf = new SparkConf().setAppName("parsing").setMaster("local");
>>
>> JavaSparkContext sc = new JavaSparkContext(conf);
>>
>> SQLContext sqlContext = new SQLContext(sc);
>>
>> DataFrame df =
>> sqlContext.read().format("com.databricks.spark.xml").option("rowtag",
>> "page").load("file.xml");
>>
>> When I run this code I faced a problem which is
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
>> at com.databricks.spark.xml.XmlOptions.(XmlOptions.scala:26)
>> at com.databricks.spark.xml.XmlOptions$.apply(XmlOptions.scala:48)
>> at
>> com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:58)
>> at
>> com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:44)
>> at
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
>> at datbricxml.parsing.main(parsing.java:16).
>> Please, I need to solve this error for my senior project ASAP.
>>
>>
>>
>>
>


Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-25 Thread Mich Talebzadeh
thanks I sorted this out.
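
For anyone else hitting the "not found: value OffsetRange" compile error quoted below, the usual cause is a missing import; a minimal sketch, assuming the Spark 1.6 Kafka 0.8 integration and the sc/kafkaParams from the quoted code:

import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder

// OffsetRange lives in org.apache.spark.streaming.kafka
val offsetRanges = Array(OffsetRange("newtopic", 0, 110, 220))
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)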

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 25 April 2016 at 15:20, Cody Koeninger  wrote:

> Show the full relevant code including imports.
>
> On Fri, Apr 22, 2016 at 4:46 PM, Mich Talebzadeh
>  wrote:
> > Hi Cody,
> >
> > This is my first attempt on using offset ranges (this may not mean much
> in
> > my context at the moment)
> >
> > val ssc = new StreamingContext(conf, Seconds(10))
> > ssc.checkpoint("checkpoint")
> > val kafkaParams = Map[String, String]("bootstrap.servers" ->
> > "rhes564:9092",
> > "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" ->
> > "rhes564:2181", "group.id" -> "StreamTest" )
> > val topics = Set("newtopic", "newtopic")
> > val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,
> > StringDecoder](ssc, kafkaParams, topics)
> > dstream.cache()
> > val lines = dstream.map(_._2)
> > val showResults = lines.filter(_.contains("statement
> cache")).flatMap(line
> > => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> > // Define the offset ranges to read in the batch job. Just one offset
> range
> > val offsetRanges = Array(
> >   OffsetRange("newtopic", 0, 110, 220)
> > )
> > // Create the RDD based on the offset ranges
> > val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> > StringDecoder](sc, kafkaParams, offsetRanges)
> >
> >
> > This comes back with error
> >
> > [info] Compiling 1 Scala source to
> > /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> > [error]
> >
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
> > not found: value OffsetRange
> > [error]   OffsetRange("newtopic", 0, 110, 220),
> > [error]   ^
> > [error] one error found
> > [error] (compile:compileIncremental) Compilation failed
> >
> > Any ideas will be appreciated
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> >
> >
> > On 22 April 2016 at 22:04, Cody Koeninger  wrote:
> >>
> >> Spark streaming as it exists today is always microbatch.
> >>
> >> You can certainly filter messages using spark streaming.
> >>
> >>
> >> On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
> >>  wrote:
> >> > yep actually using createDirectStream sounds a better way of doing it.
> >> > Am I
> >> > correct that createDirectStream was introduced to overcome
> >> > micro-batching
> >> > limitations?
> >> >
> >> > In a nutshell I want to pickup all the messages and keep signal
> >> > according to
> >> > pre-built criteria (say indicating a buy signal) and ignore the
> >> > pedestals
> >> >
> >> > Thanks
> >> >
> >> > Dr Mich Talebzadeh
> >> >
> >> >
> >> >
> >> > LinkedIn
> >> >
> >> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >
> >> >
> >> >
> >> > http://talebzadehmich.wordpress.com
> >> >
> >> >
> >> >
> >> >
> >> > On 22 April 2016 at 21:56, Cody Koeninger  wrote:
> >> >>
> >> >> You can still do sliding windows with createDirectStream, just do
> your
> >> >> map / extraction of fields before the window.
> >> >>
> >> >> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
> >> >>  wrote:
> >> >> > Hi Cody,
> >> >> >
> >> >> > I want to use sliding windows for Complex Event Processing
> >> >> > micro-batching
> >> >> >
> >> >> > Dr Mich Talebzadeh
> >> >> >
> >> >> >
> >> >> >
> >> >> > LinkedIn
> >> >> >
> >> >> >
> >> >> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> >> >
> >> >> >
> >> >> >
> >> >> > http://talebzadehmich.wordpress.com
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > On 22 April 2016 at 21:51, Cody Koeninger 
> wrote:
> >> >> >>
> >> >> >> Why are you wanting to convert?
> >> >> >>
> >> >> >> As far as doing the conversion, createStream doesn't take the same
> >> >> >> arguments, look at the docs.
> >> >> >>
> >> >> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
> >> >> >>  wrote:
> >> >> >> > Hi,
> >> >> >> >
> >> >> >> > What is the best way of converting this program of that uses
> >> >> >> > KafkaUtils.createDirectStream to Sliding window using
> >> >> >> >
> >> >> >> > val dstream = KafkaUtils.createDirectStream[String, String,
> >> >> >> > StringDecoder,
> >> >> >> > StringDecoder](ssc, kafkaParams, topic)
> >> >> >> >
> >> >> >> > to
> >> >> >> >
> >> >> >> > val dstream = KafkaUtils.createStream[String, String,
> >> >> >> > StringDecoder,
> >> >> >> > StringDecoder](ssc, kafkaParams, topic)
> >> >> >> >
> >> >> >> >
> >> >> >> > The program below 

Re: slow SQL query with cached dataset

2016-04-25 Thread Mich Talebzadeh
Are you sure it is not spilling to disk?

How many rows are cached in your result set, i.e. the output of sqlContext.sql("SELECT *
FROM raw WHERE (dt_year=2015 OR dt_year=2016)")?
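
A quick way to check both points (a Scala sketch; 'data' is the cached DataFrame from the quoted message, which itself looks like PySpark):

// number of partitions the query will scan; thousands of tiny cached blocks
// add a lot of scheduling overhead for a ~9 GB dataset
println(data.rdd.partitions.length)

// Note: DataFrame.cache() uses the MEMORY_AND_DISK storage level by default, so
// blocks that do not fit in memory spill to disk; the "Storage" tab of the web UI
// shows how much of the cached table actually sits on disk.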

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 25 April 2016 at 23:47, Imran Akbar  wrote:

> Hi,
>
> I'm running a simple query like this through Spark SQL:
>
> sqlContext.sql("SELECT MIN(age) FROM data WHERE country = 'GBR' AND
> dt_year=2015 AND dt_month BETWEEN 1 AND 11 AND product IN
> ('cereal')").show()
>
> which takes 3 minutes to run against an in-memory cache of 9 GB of data.
>
> The data was 100% cached in memory before I ran the query (see screenshot
> 1).
> The data was cached like this:
> data = sqlContext.sql("SELECT * FROM raw WHERE (dt_year=2015 OR
> dt_year=2016)")
> data.cache()
> data.registerTempTable("data")
> and then I ran an action query to load the data into the cache.
>
> I see lots of rows of logs like this:
> 16/04/25 22:39:11 INFO MemoryStore: Block rdd_13136_2856 stored as values
> in memory (estimated size 2.5 MB, free 9.7 GB)
> 16/04/25 22:39:11 INFO BlockManager: Found block rdd_13136_2856 locally
> 16/04/25 22:39:11 INFO MemoryStore: 6 blocks selected for dropping
> 16/04/25 22:39:11 INFO BlockManager: Dropping block rdd_13136_3866 from
> memory
>
> Screenshot 2 shows the job page of the longest job.
>
> The data was partitioned in Parquet by month, country, and product before
> I cached it.
>
> Any ideas what the issue could be?  This is running on localhost.
>
> regards,
> imran
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Hive Metastore issues with Spark 1.6.1

2016-04-25 Thread Mich Talebzadeh
Hi,


How is it doing that? Is it running a schema script against your metastore?
HiveContext uses native Hive code, which is what allows Spark to use the Hive
SQL dialect.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 25 April 2016 at 22:03, Pradeepkumar Konda <
pradeepkumar.ko...@getbase.com> wrote:

> I have installed Spark 1.6.1 and trying to connect to a Hive metastore
> 0.14.0 version.
> This was working fine on Spark 1.4.1. I am pointing to same meta store
> from 1.6.1 and then getting  connectivity issues.
>
> I read over some online threads and added below 2 lines to default spark
> conf xml
>
> *spark.sql.hive.metastore.version 0.14.0*
> *spark.sql.hive.metastore.jars maven*
>
> then I get this error -
>
> "pyspark.sql.utils.IllegalArgumentException: u'Builtin jars can only be
> used when hive execution version == hive metastore version. Execution:
> 1.2.1 != Metastore: 0.14.0. Specify a vaild path to the correct hive jars
> using $HIVE_METASTORE_JARS or change spark.sql.hive.metastore.version to
> 1.2.1.'"
>
> Without these lines I get the below error
>
> "Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke
> the "BONECP" plugin to create a ConnectionPool gave an error : The
> specified datastore driver ("org.mariadb.jdbc.Driver") was not found in the
> CLASSPATH. Please check your CLASSPATH specification, and the name of the
> driver."
>
> Then I commented out the 2 newly added lines (i.e. the metastore version and
> jars) in spark-defaults.conf and ran the pyspark command with additional jars as -
>
> *pyspark --jars /usr/lib/hive/lib/mariadb-connector-java.jar*
>
> After pyspark, I am trying to work with Hive Context -
> *Now this is updating my Hive Metastore to 1.2.0  (which should be 0.14.0)*
>
> It works fine as expected now, connecting to Hive meta store, notebooks
> etc. but my big concern is why running PySpark Hive Context is updating Hive
> Metastore version?
>
> Thanks!
>


Re: XML Data Source for Spark

2016-04-25 Thread Michael Armbrust
You are using a version of the library that was compiled for a different
version of Scala than the version of Spark that you are using.  Make sure
that they match up.

On Mon, Apr 25, 2016 at 5:19 PM, Mohamed ismail  wrote:

> here is an example with code.
> http://stackoverflow.com/questions/33078221/xml-processing-in-spark
>
> I haven't tried.
>
>
> On Monday, April 25, 2016 1:06 PM, Jinan Alhajjaj <
> j.r.alhaj...@hotmail.com> wrote:
>
>
> Hi All,
> I am trying to use XML data source that is used for parsing and querying
> XML data with Apache Spark, for Spark SQL and data frames.I am using Apache
> spark version 1.6.1 and I am using Java as a programming language.
> I wrote this sample code :
> SparkConf conf = new SparkConf().setAppName("parsing").setMaster("local");
>
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> SQLContext sqlContext = new SQLContext(sc);
>
> DataFrame df =
> sqlContext.read().format("com.databricks.spark.xml").option("rowtag",
> "page").load("file.xml");
>
> When I run this code I faced a problem which is
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
> at com.databricks.spark.xml.XmlOptions.(XmlOptions.scala:26)
> at com.databricks.spark.xml.XmlOptions$.apply(XmlOptions.scala:48)
> at
> com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:58)
> at
> com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:44)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
> at datbricxml.parsing.main(parsing.java:16).
> Please, I need to solve this error for my senior project ASAP.
>
>
>
>


Re: XML Data Source for Spark

2016-04-25 Thread Mohamed ismail
here is an example with code. 
http://stackoverflow.com/questions/33078221/xml-processing-in-spark
I haven't tried. 

On Monday, April 25, 2016 1:06 PM, Jinan Alhajjaj 
 wrote:
 

Hi All,
I am trying to use XML data source that is used for parsing and querying XML
data with Apache Spark, for Spark SQL and data frames. I am using Apache Spark
version 1.6.1 and I am using Java as a programming language. I wrote this
sample code:

SparkConf conf = new SparkConf().setAppName("parsing").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df =
sqlContext.read().format("com.databricks.spark.xml").option("rowtag",
"page").load("file.xml");

When I run this code I faced a problem which is:

Exception in thread "main" java.lang.NoSuchMethodError:
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.databricks.spark.xml.XmlOptions.(XmlOptions.scala:26)
at com.databricks.spark.xml.XmlOptions$.apply(XmlOptions.scala:48)
at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:58)
at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:44)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
at datbricxml.parsing.main(parsing.java:16).

Please, I need to solve this error for my senior project ASAP.
  

  

Hive Metastore issues with Spark 1.6.1

2016-04-25 Thread Pradeepkumar Konda
I have installed Spark 1.6.1 and trying to connect to a Hive metastore
0.14.0 version.
This was working fine on Spark 1.4.1. I am pointing to same meta store from
1.6.1 and then getting  connectivity issues.

I read over some online threads and added below 2 lines to default spark
conf xml

*spark.sql.hive.metastore.version 0.14.0*
*spark.sql.hive.metastore.jars maven*

then I get this error -

"pyspark.sql.utils.IllegalArgumentException: u'Builtin jars can only be
used when hive execution version == hive metastore version. Execution:
1.2.1 != Metastore: 0.14.0. Specify a vaild path to the correct hive jars
using $HIVE_METASTORE_JARS or change spark.sql.hive.metastore.version to
1.2.1.'"

Without these lines I get the below error

"Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke
the "BONECP" plugin to create a ConnectionPool gave an error : The
specified datastore driver ("org.mariadb.jdbc.Driver") was not found in the
CLASSPATH. Please check your CLASSPATH specification, and the name of the
driver."

Then I commented out the 2 newly added lines (i.e. the metastore version and
jars) in spark-defaults.conf and ran the pyspark command with additional jars as -

*pyspark --jars /usr/lib/hive/lib/mariadb-connector-java.jar*

After pyspark, I am trying to work with Hive Context -
*Now this is updating my Hive Metastore to 1.2.0  (which should be 0.14.0)*

It works fine as expected now, connecting to Hive meta store, notebooks
etc. but my big concern is why running PySpark Hive Context is updating Hive
Metastore version?

Thanks!
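
For reference, a spark-defaults.conf sketch that points spark.sql.hive.metastore.jars at a locally installed Hive 0.14 client instead of "maven" (the paths below are examples only; the classpath must include the Hive client jars and a matching Hadoop client):

spark.sql.hive.metastore.version  0.14.0
# instead of "maven": a classpath pointing at a local Hive 0.14 install (example paths)
spark.sql.hive.metastore.jars     /opt/hive-0.14.0/lib/*:/opt/hadoop-2.6.0/share/hadoop/common/lib/*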


XML Data Source for Spark

2016-04-25 Thread Jinan Alhajjaj
Hi All,
I am trying to use XML data source that is used for parsing and querying XML
data with Apache Spark, for Spark SQL and data frames. I am using Apache Spark
version 1.6.1 and I am using Java as a programming language. I wrote this
sample code:

SparkConf conf = new SparkConf().setAppName("parsing").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df =
sqlContext.read().format("com.databricks.spark.xml").option("rowtag",
"page").load("file.xml");

When I run this code I faced a problem which is:

Exception in thread "main" java.lang.NoSuchMethodError:
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.databricks.spark.xml.XmlOptions.(XmlOptions.scala:26)
at com.databricks.spark.xml.XmlOptions$.apply(XmlOptions.scala:48)
at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:58)
at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:44)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
at datbricxml.parsing.main(parsing.java:16).

Please, I need to solve this error for my senior project ASAP.
  

JoinWithCassandraTable over individual queries

2016-04-25 Thread vaibhavrtk
Hi
I have an RDD with elements as tuple ((key1,key2),value) where (key1,key2)
is the partitioning key in my Cassandra table.
Now for each such element I have to do a read from the Cassandra table. My
Cassandra table and Spark cluster are on different nodes and can't be
co-located.
Right now I am doing individual queries using session.execute("..."). Should
I prefer joinWithCassandraTable over individual queries? Do I get some
performance benefit?

As I understand, joinWithCassandraTable is ultimately going to perform
queries for each partitioning key (or primary key, not sure).

Regards 
Vaibhav
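
For reference, a sketch of the connector's join API (this assumes the DataStax Spark Cassandra Connector; keyspace, table, key types and column names are placeholders):

import com.datastax.spark.connector._

// placeholder RDD keyed by the Cassandra partition key (types/values are examples)
val keyed  = sc.parallelize(Seq(((1, "a"), 42), ((2, "b"), 17)))
val joined = keyed.keys.joinWithCassandraTable("my_keyspace", "my_table")
// depending on the schema you may need to name the join columns explicitly, e.g.
// .on(SomeColumns("key1", "key2"))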



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JoinWithCassandraTable-over-individual-queries-tp26833.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Call Spark package API from R

2016-04-25 Thread Jörn Franke
You can call any Java/Scala library from R using the rJava package.

> On 25 Apr 2016, at 19:16, ankur.jain  wrote:
> 
> Hello Team,
> 
> Is there any way to call spark code (scala/python) from R?
> I want to use Cloudera spark-ts api with SparkR, if anyone had used that
> please let me know.
> 
> Thank
> Ankur
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Call-Spark-package-API-from-R-tp26831.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DataFrame group and agg

2016-04-25 Thread Andrés Ivaldi
Hello,
Does anyone know whether this is on purpose or whether it's a bug?
In
https://github.com/apache/spark/blob/2f1d0320c97f064556fa1cf98d4e30d2ab2fe661/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala

the def agg has many implementations; two of them are:
Line 136:
  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
    agg((aggExpr +: aggExprs).toMap)
  }

Line 155:
  def agg(exprs: Map[String, String]): DataFrame = {
    toDF(exprs.map { case (colName, expr) =>
      strToExpr(expr)(df(colName).expr)
    }.toSeq)
  }

So this allows me to do something like .agg( "col1"->"sum", "col2"->"max" )

But if I want to apply two different agg functions to the same column, the
method at line 136 builds a Map, so something like "col"->"sum", "col"->"max"
will end up as just "col"->"max".


I think this signature would work:

  def agg(exprs: Seq[(String, String)]): DataFrame = {
    toDF(exprs.map { case (colName, expr) =>
      strToExpr(expr)(df(colName).expr)
    })
  }
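
For what it's worth, the Column-based agg overload already allows several aggregates over the same column today; a sketch:

import org.apache.spark.sql.functions.{max, sum}

// two different aggregates over the same column in one pass
df.groupBy("key").agg(sum("col1").as("col1_sum"), max("col1").as("col1_max"))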

Regards.


Re: reduceByKey as Action or Transformation

2016-04-25 Thread Weiping Qu

Dear Ted,

You are right. ReduceByKey is transformation. My fault.
I would rephrase my question using following code snippet.
import org.apache.spark.{SparkConf, SparkContext}

object ScalaApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaApp").setMaster("local")
    val sc = new SparkContext(conf)
    //val textFile: RDD[String] =
    val file = sc.textFile("/home/usr/test.dat")
    val output = file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    output.persist()
    output.count()
    output.collect()
  }
}

It's a simple code snippet.
I realize that the first action count() would trigger the execution
based on HadoopRDD and MapPartitionsRDD, and the count will take the
ShuffleRDD produced by reduceByKey as its input.

The second action collect() just performs the collect over the same ShuffleRDD.
I think the re-calculation will also be carried out over the ShuffleRDD
instead of re-executing the preceding HadoopRDD and MapPartitionsRDD in case
one partition of the persisted output is missing.

Am I right?

Thanks and Regards,
Weiping

On 25.04.2016 17:46, Ted Yu wrote:

Can you show snippet of your code which demonstrates what you observed ?

Thansk

On Mon, Apr 25, 2016 at 8:38 AM, Weiping Qu > wrote:


Thanks.
I read that from the specification.
I thought the way people distinguish actions and transformations
depends on whether they are lazily executed or not.
As far as I saw from my codes, the reduceByKey will be executed
without any operations in the Action category.
Please correct me if I am wrong.

Thanks,
Regards,
Weiping

On 25.04.2016 17:20, Chadha Pooja wrote:

Reduce By Key is a Transformation


http://spark.apache.org/docs/latest/programming-guide.html#transformations

Thanks

_

Pooja Chadha
Senior Architect
THE BOSTON CONSULTING GROUP
Mobile +1 617 794 3862 


_




-Original Message-
From: Weiping Qu [mailto:q...@informatik.uni-kl.de]
Sent: Monday, April 25, 2016 11:05 AM
To: u...@spark.incubator.apache.org

Subject: reduceByKey as Action or Transformation

Hi,

I'd like just to verify that whether reduceByKey is
transformation or
actions.
As written in RDD papers, spark flow will not be triggered only if
actions are reached.
I tried and saw that the my flow will be executed once there is a
reduceByKey while it is categorized into transformations in
Spark 1.6.1
specification.

Thanks and Regards,
Weiping

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org



__
The Boston Consulting Group, Inc.
  This e-mail message may contain confidential and/or
privileged information.
If you are not an addressee or otherwise authorized to receive
this message,
you should not use, copy, disclose or take any action based on
this e-mail or
any information contained in the message. If you have received
this material
in error, please advise the sender immediately by reply e-mail
and delete this
message. Thank you.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org







Re: Defining case class within main method throws "No TypeTag available for Accounts"

2016-04-25 Thread Mich Talebzadeh
BTW, is this documented? It seems to be a potential issue.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 25 April 2016 at 18:47, Mich Talebzadeh 
wrote:

> cheers
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 25 April 2016 at 18:35, Michael Armbrust 
> wrote:
>
>> When you define a class inside of a method, it implicitly has a pointer
>> to the outer scope of the method.  Spark doesn't have access to this scope,
>> so this makes it hard (impossible?) for us to construct new instances of
>> that class.
>>
>> So, define your classes that you plan to use with Spark at the top level.
>>
>> On Mon, Apr 25, 2016 at 9:36 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I notice that when building with sbt, if I define my case class *outside of
>>> the main method* as below, it works
>>>
>>>
>>> case class Accounts( TransactionDate: String, TransactionType: String,
>>> Description: String, Value: Double, Balance: Double, AccountName: String,
>>> AccountNumber : String)
>>>
>>> object Import_nw_10124772 {
>>>   def main(args: Array[String]) {
>>>   val conf = new SparkConf().
>>>setAppName("Import_nw_10124772").
>>>setMaster("local[12]").
>>>set("spark.driver.allowMultipleContexts", "true").
>>>set("spark.hadoop.validateOutputSpecs", "false")
>>>   val sc = new SparkContext(conf)
>>>   // Create sqlContext based on HiveContext
>>>   val sqlContext = new HiveContext(sc)
>>>   import sqlContext.implicits._
>>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>   println ("\nStarted at"); sqlContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>>   //
>>>   // Get a DF first based on Databricks CSV libraries ignore column
>>> heading because of column called "Type"
>>>   //
>>>   val df =
>>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>>> "true").option("header",
>>> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
>>>   //df.printSchema
>>>   //
>>>val a = df.filter(col("Date") > "").map(p =>
>>> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
>>>
>>>
>>> However, if I put that case class with the main method, it throws "No
>>> TypeTag available for Accounts" error
>>>
>>> Apparently, when the case class is defined inside the method in which it is
>>> used, it is not fully defined at that point.
>>>
>>> Is this a bug within Spark?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>>
>


Re: Defining case class within main method throws "No TypeTag available for Accounts"

2016-04-25 Thread Mich Talebzadeh
cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 25 April 2016 at 18:35, Michael Armbrust  wrote:

> When you define a class inside of a method, it implicitly has a pointer to
> the outer scope of the method.  Spark doesn't have access to this scope, so
> this makes it hard (impossible?) for us to construct new instances of that
> class.
>
> So, define your classes that you plan to use with Spark at the top level.
>
> On Mon, Apr 25, 2016 at 9:36 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I notice that when building with sbt, if I define my case class *outside of
>> the main method* as below, it works
>>
>>
>> case class Accounts( TransactionDate: String, TransactionType: String,
>> Description: String, Value: Double, Balance: Double, AccountName: String,
>> AccountNumber : String)
>>
>> object Import_nw_10124772 {
>>   def main(args: Array[String]) {
>>   val conf = new SparkConf().
>>setAppName("Import_nw_10124772").
>>setMaster("local[12]").
>>set("spark.driver.allowMultipleContexts", "true").
>>set("spark.hadoop.validateOutputSpecs", "false")
>>   val sc = new SparkContext(conf)
>>   // Create sqlContext based on HiveContext
>>   val sqlContext = new HiveContext(sc)
>>   import sqlContext.implicits._
>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>   println ("\nStarted at"); sqlContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>>   //
>>   // Get a DF first based on Databricks CSV libraries ignore column
>> heading because of column called "Type"
>>   //
>>   val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header",
>> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
>>   //df.printSchema
>>   //
>>val a = df.filter(col("Date") > "").map(p =>
>> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
>>
>>
>> However, if I put that case class with the main method, it throws "No
>> TypeTag available for Accounts" error
>>
>> Apparently, when the case class is defined inside the method in which it is
>> used, it is not fully defined at that point.
>>
>> Is this a bug within Spark?
>>
>> Thanks
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


Re: Defining case class within main method throws "No TypeTag available for Accounts"

2016-04-25 Thread Ashok Kumar
Thanks Michael. As I gather, for now it is a feature.

On Monday, 25 April 2016, 18:36, Michael Armbrust  
wrote:
 

 When you define a class inside of a method, it implicitly has a pointer to the 
outer scope of the method.  Spark doesn't have access to this scope, so this 
makes it hard (impossible?) for us to construct new instances of that class.
So, define your classes that you plan to use with Spark at the top level.
On Mon, Apr 25, 2016 at 9:36 AM, Mich Talebzadeh  
wrote:

Hi,
I notice that when building with sbt, if I define my case class outside of the
main method as below, it works

case class Accounts( TransactionDate: String, TransactionType: String, 
Description: String, Value: Double, Balance: Double, AccountName: String, 
AccountNumber : String)

object Import_nw_10124772 {
  def main(args: Array[String]) {
  val conf = new SparkConf().
   setAppName("Import_nw_10124772").
   setMaster("local[12]").
   set("spark.driver.allowMultipleContexts", "true").
   set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  println ("\nStarted at"); sqlContext.sql("SELECT 
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') 
").collect.foreach(println)
  //
  // Get a DF first based on Databricks CSV libraries ignore column heading 
because of column called "Type"
  //
  val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
  //df.printSchema
  //
   val a = df.filter(col("Date") > "").map(p => 
Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))

However, if I put that case class with the main method, it throws "No TypeTag 
available for Accounts" error
Apparently, when the case class is defined inside the method in which it is
used, it is not fully defined at that point.
Is this a bug within Spark?
Thanks



Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com 



  

Re: Defining case class within main method throws "No TypeTag available for Accounts"

2016-04-25 Thread Michael Armbrust
When you define a class inside of a method, it implicitly has a pointer to
the outer scope of the method.  Spark doesn't have access to this scope, so
this makes it hard (impossible?) for us to construct new instances of that
class.

So, define your classes that you plan to use with Spark at the top level.
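
A minimal illustration (a sketch, not from the thread; names are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// works: the case class is declared at the top level of the file
case class Account(id: Int, name: String)

object Demo {
  def main(args: Array[String]): Unit = {
    // moving the case class declaration in here instead makes the toDF below
    // fail to compile with "No TypeTag available for Account"
    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    sc.parallelize(Seq(Account(1, "a"), Account(2, "b"))).toDF().show()
  }
}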

On Mon, Apr 25, 2016 at 9:36 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> I notice that when building with sbt, if I define my case class *outside of
> the main method* as below, it works
>
>
> case class Accounts( TransactionDate: String, TransactionType: String,
> Description: String, Value: Double, Balance: Double, AccountName: String,
> AccountNumber : String)
>
> object Import_nw_10124772 {
>   def main(args: Array[String]) {
>   val conf = new SparkConf().
>setAppName("Import_nw_10124772").
>setMaster("local[12]").
>set("spark.driver.allowMultipleContexts", "true").
>set("spark.hadoop.validateOutputSpecs", "false")
>   val sc = new SparkContext(conf)
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   println ("\nStarted at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
>   //
>   // Get a DF first based on Databricks CSV libraries ignore column
> heading because of column called "Type"
>   //
>   val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header",
> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
>   //df.printSchema
>   //
>val a = df.filter(col("Date") > "").map(p =>
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
>
>
> However, if I put that case class with the main method, it throws "No
> TypeTag available for Accounts" error
>
> Apparently, when the case class is defined inside the method in which it is
> used, it is not fully defined at that point.
>
> Is this a bug within Spark?
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: Dataset aggregateByKey equivalent

2016-04-25 Thread Lee Becker
On Sat, Apr 23, 2016 at 8:56 AM, Michael Armbrust 
wrote:

> Have you looked at aggregators?
>
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>

Thanks for the pointer to aggregators.  I wasn't yet aware of them.
However, I still get similar errors when attempting to go the Dataset
route.  Here is my attempt at an aggregator class for the example above.


import org.apache.spark.sql.expressions.Aggregator

case class KeyVal(k: Int, v: Int)

val keyValsDs = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield
KeyVal(i,j)).toDS

class AggKV extends Aggregator[KeyVal, List[KeyVal], List[KeyVal]]
with Serializable {
  override def zero: List[KeyVal] = List()

  override def reduce(b: List[KeyVal], a: KeyVal): List[KeyVal] = b :+ a

  override def finish(reduction: List[KeyVal]): List[KeyVal] = reduction

  override def merge(b1: List[KeyVal], b2: List[KeyVal]): List[KeyVal]
= b1 ++ b2
}

The following shows production of the correct schema

keyValsDs.groupBy($"k").agg(new AggKV().toColumn)

 org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, List[KeyVal])] =
[_1: struct, _2: struct>>]

Actual execution fails with Task not serializable.  Am I missing something
or is this just not possible without dropping into RDDs?

scala> keyValsDs.groupBy($"k").agg(new AggKV().toColumn).show
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
tree:
SortBasedAggregate(key=[k#684],
functions=[(AggKV(k#684,v#685),mode=Final,isDistinct=false)],
output=[key#739,AggKV(k,v)#738])
+- ConvertToSafe
   +- Sort [k#684 ASC], false, 0
  +- TungstenExchange hashpartitioning(k#684,200), None
 +- ConvertToUnsafe
+- SortBasedAggregate(key=[k#684],
functions=[(AggKV(k#684,v#685),mode=Partial,isDistinct=false)],
output=[k#684,value#731])
   +- ConvertToSafe
  +- Sort [k#684 ASC], false, 0
 +- Project [k#684,v#685,k#684]
+- Scan ExistingRDD[k#684,v#685]

at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at
org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:69)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
at
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at
org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at
org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at
org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
at org.apache.spark.sql.DataFrame.org
$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
at org.apache.spark.sql.DataFrame.org
$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
at
org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
at
org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394)
at org.apache.spark.sql.Dataset.show(Dataset.scala:228)
at org.apache.spark.sql.Dataset.show(Dataset.scala:192)
at org.apache.spark.sql.Dataset.show(Dataset.scala:200)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:59)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:63)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:65)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:67)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:69)
at $iwC$$iwC$$iwC$$iwC.(:71)
at $iwC$$iwC$$iwC.(:73)
at $iwC$$iwC.(:75)
at $iwC.(:77)
at (:79)
at .(:83)
at .()
 

Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-25 Thread Michael Armbrust
Spark SQL's query planner has always delayed building the RDD, so has never
needed to eagerly calculate the range boundaries (since Spark 1.0).

On Mon, Apr 25, 2016 at 2:04 AM, Praveen Devarao 
wrote:

> Thanks Reynold for the reason as to why sortBykey invokes a Job
>
> When you say "DataFrame/Dataset does not have this issue", is it right to
> assume you are referring to Spark 2.0, or does the Spark 1.6 DataFrame already
> have this built in?
>
> Thanking You
>
> -
> Praveen Devarao
> Spark Technology Centre
> IBM India Software Labs
>
> -
> "Courage doesn't always roar. Sometimes courage is the quiet voice at the
> end of the day saying I will try again"
>
>
>
> From:Reynold Xin 
> To:Praveen Devarao/India/IBM@IBMIN
> Cc:"d...@spark.apache.org" , user <
> user@spark.apache.org>
> Date:25/04/2016 11:26 am
> Subject:Re: Do transformation functions on RDD invoke a Job
> [sc.runJob]?
> --
>
>
>
> Usually no - but sortByKey does because it needs the range boundary to be
> built in order to have the RDD. It is a long standing problem that's
> unfortunately very difficult to solve without breaking the RDD API.
>
> In DataFrame/Dataset we don't have this issue though.
>
>
> On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao  wrote:
> Hi,
>
> I have a streaming program with the block as below [ref:
> https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
> ]
>
> 1  val lines = messages.map(_._2)
> 2  val hashTags = lines.flatMap(status => status.split(" ")
>        .filter(_.startsWith("#")))
>
> 3  val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
> 3a     .map { case (topic, count) => (count, topic) }
> 3b     .transform(_.sortByKey(false))
>
> 4a topCounts60.foreachRDD( rdd => {
> 4b     val topList = rdd.take( 10 )
>    })
>
> This batch is triggering 2 jobs: one at line 3b (sortByKey) and
> the other at line 4b (rdd.take). I agree that there is a Job triggered on
> line 4b, as take() is an action on an RDD, while on line 3b sortByKey is just
> a transformation function which as per the docs is lazily evaluated... but I
> see that this line uses a RangePartitioner, and RangePartitioner on
> initialization invokes a method called sketch() that invokes collect(),
> triggering a Job.
>
> My question: Is it expected that sortByKey will invoke a Job...if
> yes, why is sortByKey listed as a transformation and not action. Are there
> any other functions like this that invoke a Job, though they are
> transformations and not actions?
>
> I am on Spark 1.6
>
> Thanking You
>
> -
> Praveen Devarao
> Spark Technology Centre
> IBM India Software Labs
>
> -
> "Courage doesn't always roar. Sometimes courage is the quiet voice at the
> end of the day saying I will try again"
>
>
>
>


Call Spark package API from R

2016-04-25 Thread ankur.jain
Hello Team,

Is there any way to call spark code (scala/python) from R?
I want to use Cloudera spark-ts api with SparkR, if anyone had used that
please let me know.

Thank
Ankur



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Call-Spark-package-API-from-R-tp26831.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



issues with spark metrics

2016-04-25 Thread Gábor Fehér
Hi All,

I am trying to set up monitoring to better understand the performance
bottlenecks of my Spark application. I have some questions:

1. BlockManager.disk.diskSpaceUsed_MB is always zero when I go to
http://localhost:4040/metrics/json/
Even though I know that blockmanager is using a lot of disk space, e.g.
because the Linux command
du -msc /tmp/blockmgr-bee83574-d958-4ef0-aaa7-a45f5012bdff  # my current
blockmanager directory
returns a large non-zero number.

2. I am really curious how much memory space are my cached RDDs taking. Are
there any metrics related to that? I can't see any in
http://localhost:4040/metrics/json/, even though the web UI does show some
related numbers, e.g. under the storage tab.

I am running Spark in local mode, maybe that is a problem? Or am I looking
at a wrong metrics endpoint?

Thanks,
Gabor

p.s.: Some more details:

This is my metrics.properties file:

```
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=localhost
*.sink.graphite.port=9109
*.sink.graphite.period=1
*.sink.graphite.unit=seconds

master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
```
I want to use Graphite+Prometheus+Grafana to view my metrics in the end,
but that's not important for now. If I could get what I want in the JSON
dumps, I will already be happy!

And this is how I am starting my app:
```
~/spark-1.6.0/bin/spark-submit \
  --conf spark.metrics.conf=metrics.properties \
  --class "SimpleApp" \
  --master "local[4]" \
  --driver-memory 16g \
  target/scala-2.10/simple-project_2.10-1.0.jar
```


Spark 1.6.1 throws error: Did not find registered driver with class oracle.jdbc.OracleDriver

2016-04-25 Thread Mich Talebzadeh
Hi,

This JDBC connection was working fine in Spark 1.5.2

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val sqlContext = new HiveContext(sc)
println ("\nStarted at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
//
var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb"
var _username : String = "scratchpad"
var _password : String = "xxx"
//
val s = HiveContext.load("jdbc",
Map("url" -> _ORACLEserver,
"dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS CLUSTERED,
to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
"user" -> _username,
"password" -> _password))

s.toDF.registerTempTable("tmp")


// Need to create and populate target ORC table sales in database test in
Hive
//
HiveContext.sql("use test")
//
// Drop and create table
//
HiveContext.sql("DROP TABLE IF EXISTS test.dummy2")
var sqltext : String = ""
sqltext = """
CREATE TABLE test.dummy2
 (
 ID INT
   , CLUSTERED INT
   , SCATTERED INT
   , RANDOMISED INT
   , RANDOM_STRING VARCHAR(50)
   , SMALL_VC VARCHAR(10)
   , PADDING  VARCHAR(10)
)
CLUSTERED BY (ID) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="SNAPPY",
"orc.create.index"="true",
"orc.bloom.filter.columns"="ID",
"orc.bloom.filter.fpp"="0.05",
"orc.stripe.size"="268435456",
"orc.row.index.stride"="1" )
"""
HiveContext.sql(sqltext)
//
sqltext = """
INSERT INTO TABLE test.dummy2
SELECT
*
FROM tmp
"""
HiveContext.sql(sqltext)

In Spark 1.6.1, it is throwing error as below


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
1.0 (TID 4, rhes564): java.lang.IllegalStateException: Did not find
registered driver with class oracle.jdbc.OracleDriver

Is this a new bug introduced in Spark 1.6.1?


Thanks
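
For anyone hitting the same error, a commonly suggested fix on 1.6.x is to name the driver class explicitly in the JDBC options and make sure the Oracle jar is shipped to the executors. A sketch reusing the variables from the code above (the jar path is only an example):

// ship the Oracle driver jar to the executors, e.g.
//   spark-submit --jars /path/to/ojdbc6.jar ...   (path is an example)

val s = HiveContext.read.format("jdbc").options(
  Map("url"      -> _ORACLEserver,
      "dbtable"  -> "(SELECT ... FROM scratchpad.dummy)",   // the query from above
      "driver"   -> "oracle.jdbc.OracleDriver",             // register the driver explicitly
      "user"     -> _username,
      "password" -> _password)).load()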


Re: reduceByKey as Action or Transformation

2016-04-25 Thread Ted Yu
Can you show snippet of your code which demonstrates what you observed ?

Thanks

On Mon, Apr 25, 2016 at 8:38 AM, Weiping Qu  wrote:

> Thanks.
> I read that from the specification.
> I thought the way people distinguish actions and transformations depends
> on whether they are lazily executed or not.
> As far as I saw from my codes, the reduceByKey will be executed without
> any operations in the Action category.
> Please correct me if I am wrong.
>
> Thanks,
> Regards,
> Weiping
>
> On 25.04.2016 17:20, Chadha Pooja wrote:
>
>> Reduce By Key is a Transformation
>>
>> http://spark.apache.org/docs/latest/programming-guide.html#transformations
>>
>> Thanks
>>
>> _
>>
>> Pooja Chadha
>> Senior Architect
>> THE BOSTON CONSULTING GROUP
>> Mobile +1 617 794 3862
>>
>>
>> _
>>
>>
>>
>> -Original Message-
>> From: Weiping Qu [mailto:q...@informatik.uni-kl.de]
>> Sent: Monday, April 25, 2016 11:05 AM
>> To: u...@spark.incubator.apache.org
>> Subject: reduceByKey as Action or Transformation
>>
>> Hi,
>>
>> I'd just like to verify whether reduceByKey is a transformation or an
>> action.
>> As written in the RDD paper, a Spark flow should not be triggered until
>> an action is reached.
>> I tried it and saw that my flow is executed as soon as there is a
>> reduceByKey, even though it is categorized as a transformation in the
>> Spark 1.6.1 documentation.
>>
>> Thanks and Regards,
>> Weiping
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>> __
>> The Boston Consulting Group, Inc.
>>   This e-mail message may contain confidential and/or privileged
>> information.
>> If you are not an addressee or otherwise authorized to receive this
>> message,
>> you should not use, copy, disclose or take any action based on this
>> e-mail or
>> any information contained in the message. If you have received this
>> material
>> in error, please advise the sender immediately by reply e-mail and delete
>> this
>> message. Thank you.
>>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
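For what it is worth, reduceByKey on its own schedules nothing; a job only appears once an action runs on the resulting RDD. A minimal sketch that can be pasted into spark-shell (the file path is only an example):

```scala
val counts = sc.textFile("/tmp/some_small_file.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// nothing has run yet at this point: the Jobs tab in the UI stays empty
counts.count()   // the action is what actually triggers the job
```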


unsubscribe

2016-04-25 Thread Kartik Veerepalli
Unsubscribe

Kartik Veerepalli
Software Developer
560 Herndon Parkway, Suite 240
Herndon, VA 20170
(w) 703-437-0100
(f) 703-940-6001
www.syntasa.com




Re: reduceByKey as Action or Transformation

2016-04-25 Thread Weiping Qu

Thanks.
I read that from the specification.
I thought the way people distinguish actions and transformations depends 
on whether they are lazily executed or not.
As far as I saw from my code, reduceByKey was executed without
any operation from the Action category.

Please correct me if I am wrong.

Thanks,
Regards,
Weiping

On 25.04.2016 17:20, Chadha Pooja wrote:

Reduce By Key is a Transformation

http://spark.apache.org/docs/latest/programming-guide.html#transformations

Thanks
_

Pooja Chadha
Senior Architect
THE BOSTON CONSULTING GROUP
Mobile +1 617 794 3862

_


-Original Message-
From: Weiping Qu [mailto:q...@informatik.uni-kl.de]
Sent: Monday, April 25, 2016 11:05 AM
To: u...@spark.incubator.apache.org
Subject: reduceByKey as Action or Transformation

Hi,

I'd just like to verify whether reduceByKey is a transformation or an
action.
As written in the RDD paper, a Spark flow should not be triggered until
an action is reached.
I tried it and saw that my flow is executed as soon as there is a
reduceByKey, even though it is categorized as a transformation in the
Spark 1.6.1 documentation.

Thanks and Regards,
Weiping

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

__
The Boston Consulting Group, Inc.
  
This e-mail message may contain confidential and/or privileged information.

If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



GroupByKey Iterable[T] - Memory usage

2016-04-25 Thread Nirav Patel
Hi,

Is the Iterable coming out of groupByKey loaded fully into the memory of the
reducer task, or can it also be on disk?

Also, is there a way to evict it from memory once the reducer is done iterating
over it, so the memory can be used for something else?

Thanks
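As far as I know the values of a single key do have to fit in the reducer task's memory once the Iterable is consumed, even though the shuffled data itself can spill to disk, which is why the usual advice is to fold values as they arrive instead of materialising them. A minimal sketch with aggregateByKey on a made-up (key, value) RDD:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AggregateInsteadOfGroup {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("AggregateInsteadOfGroup").setMaster("local[2]"))
    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 10, i.toLong))

    // groupByKey would buffer every value of a key in the task's memory;
    // aggregateByKey keeps only the running (count, sum) per key instead
    val stats = pairs.aggregateByKey((0L, 0L))(
      (acc, v) => (acc._1 + 1, acc._2 + v),
      (a, b)   => (a._1 + b._1, a._2 + b._2))

    stats.collect().foreach(println)
    sc.stop()
  }
}
```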




Re: Spark 2.0 forthcoming features

2016-04-25 Thread Sourav Mazumder
Thanks a lot Michael and Jules.

Regards,
Sourav

On Thu, Apr 21, 2016 at 3:08 PM, Jules Damji  wrote:

> Thanks Michael, we're doing a Spark 2.0 webinar. Register, and if you can't
> make it, you can always watch the recording.
>
> Cheers
> Jules
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Apr 20, 2016, at 10:15 AM, Michael Malak <
> michaelma...@yahoo.com.INVALID > wrote:
>
>
> http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin
>
>
>
>
> --
> *From:* Sourav Mazumder 
> *To:* user 
> *Sent:* Wednesday, April 20, 2016 11:07 AM
> *Subject:* Spark 2.0 forthcoming features
>
> Hi All,
>
> Is there somewhere we can get an idea of the upcoming features in Spark 2.0?
>
> I got a list for Spark ML from here
> https://issues.apache.org/jira/browse/SPARK-12626.
>
> Are there other links where I can see similar enhancements planned for Spark
> SQL, Spark Core, Spark Streaming, GraphX, etc.?
>
> Thanks in advance.
>
> Regards,
> Sourav
>
>
>


Re: [Spark 1.5.2]All data being written to only one part file rest part files are empty

2016-04-25 Thread nguyen duc tuan
Maybe the problem is the data itself. For example, the first dataframe
might have common keys in only one part of the second dataframe. I think you
can verify whether you are in this situation by repartitioning one dataframe and
joining again. If this is the real reason, you should see the result distributed
more evenly.

2016-04-25 9:34 GMT+07:00 Divya Gehlot :

> Hi,
>
> After joining two dataframes, I save the result using Spark CSV.
> But all the result data is written to only one part file, even though
> 200 part files are created; the other 199 part files are empty.
>
> What is the cause of uneven partitioning ? How can I evenly distribute the
> data ?
> Would really appreciate the help.
>
>
> Thanks,
> Divya
>
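If the skew really comes from the join, repartitioning the joined result just before writing also spreads the rows over the output files. A minimal sketch, assuming two hypothetical dataframes df1 and df2 joined on a column named "id" and the spark-csv package already on the classpath (all names and the path are examples):

```scala
val joined = df1.join(df2, df1("id") === df2("id"))

// shuffle the result into 50 roughly even partitions before saving,
// so the output is not concentrated in a single part file
joined.repartition(50)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("hdfs:///tmp/joined_output")
```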


reduceByKey as Action or Transformation

2016-04-25 Thread Weiping Qu

Hi,

I'd just like to verify whether reduceByKey is a transformation or an
action.
As written in the RDD paper, a Spark flow should not be triggered until
an action is reached.
I tried it and saw that my flow is executed as soon as there is a
reduceByKey, even though it is categorized as a transformation in the
Spark 1.6.1 documentation.


Thanks and Regards,
Weiping

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-25 Thread Cody Koeninger
Show the full relevant code including imports.
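For the compile error itself ("not found: value OffsetRange"), the class just needs to be imported; with the Kafka 0.8 connector it sits next to KafkaUtils:

```scala
import org.apache.spark.streaming.kafka.OffsetRange

// with the import in place the companion object's apply method resolves
val offsetRanges = Array(OffsetRange("newtopic", 0, 110, 220))
```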

On Fri, Apr 22, 2016 at 4:46 PM, Mich Talebzadeh
 wrote:
> Hi Cody,
>
> This is my first attempt on using offset ranges (this may not mean much in
> my context at the moment)
>
> val ssc = new StreamingContext(conf, Seconds(10))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
> "schema.registry.url" -> "http://rhes564:8081;, "zookeeper.connect" ->
> "rhes564:2181", "group.id" -> "StreamTest" )
> val topics = Set("newtopic", "newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> val lines = dstream.map(_._2)
> val showResults = lines.filter(_.contains("statement cache")).flatMap(line
> => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> // Define the offset ranges to read in the batch job. Just one offset range
> val offsetRanges = Array(
>   OffsetRange("newtopic", 0, 110, 220)
> )
> // Create the RDD based on the offset ranges
> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> StringDecoder](sc, kafkaParams, offsetRanges)
>
>
> This comes back with error
>
> [info] Compiling 1 Scala source to
> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> [error]
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
> not found: value OffsetRange
> [error]   OffsetRange("newtopic", 0, 110, 220),
> [error]   ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> Any ideas will be appreciated
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 22 April 2016 at 22:04, Cody Koeninger  wrote:
>>
>> Spark streaming as it exists today is always microbatch.
>>
>> You can certainly filter messages using spark streaming.
>>
>>
>> On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
>>  wrote:
>> > yep actually using createDirectStream sounds a better way of doing it.
>> > Am I
>> > correct that createDirectStream was introduced to overcome
>> > micro-batching
>> > limitations?
>> >
>> > In a nutshell I want to pickup all the messages and keep signal
>> > according to
>> > pre-built criteria (say indicating a buy signal) and ignore the
>> > pedestals
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> >
>> >
>> > On 22 April 2016 at 21:56, Cody Koeninger  wrote:
>> >>
>> >> You can still do sliding windows with createDirectStream, just do your
>> >> map / extraction of fields before the window.
>> >>
>> >> On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
>> >>  wrote:
>> >> > Hi Cody,
>> >> >
>> >> > I want to use sliding windows for Complex Event Processing
>> >> > micro-batching
>> >> >
>> >> > Dr Mich Talebzadeh
>> >> >
>> >> >
>> >> >
>> >> > LinkedIn
>> >> >
>> >> >
>> >> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >> >
>> >> >
>> >> >
>> >> > http://talebzadehmich.wordpress.com
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On 22 April 2016 at 21:51, Cody Koeninger  wrote:
>> >> >>
>> >> >> Why are you wanting to convert?
>> >> >>
>> >> >> As far as doing the conversion, createStream doesn't take the same
>> >> >> arguments, look at the docs.
>> >> >>
>> >> >> On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
>> >> >>  wrote:
>> >> >> > Hi,
>> >> >> >
>> >> >> > What is the best way of converting this program of that uses
>> >> >> > KafkaUtils.createDirectStream to Sliding window using
>> >> >> >
>> >> >> > val dstream = KafkaUtils.createDirectStream[String, String,
>> >> >> > StringDecoder,
>> >> >> > StringDecoder](ssc, kafkaParams, topic)
>> >> >> >
>> >> >> > to
>> >> >> >
>> >> >> > val dstream = KafkaUtils.createStream[String, String,
>> >> >> > StringDecoder,
>> >> >> > StringDecoder](ssc, kafkaParams, topic)
>> >> >> >
>> >> >> >
>> >> >> > The program below works
>> >> >> >
>> >> >> >
>> >> >> > import org.apache.spark.SparkContext
>> >> >> > import org.apache.spark.SparkConf
>> >> >> > import org.apache.spark.sql.Row
>> >> >> > import org.apache.spark.sql.hive.HiveContext
>> >> >> > import org.apache.spark.sql.types._
>> >> >> > import org.apache.spark.sql.SQLContext
>> >> >> > import org.apache.spark.sql.functions._
>> >> >> > import _root_.kafka.serializer.StringDecoder
>> >> >> > import org.apache.spark.streaming._
>> >> >> > import org.apache.spark.streaming.kafka.KafkaUtils
>> >> >> > //
>> >> >> > object CEP_assembly {
>> >> >> >   def main(args: Array[String]) {
>> >> >> >   val conf = new 

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-25 Thread Cody Koeninger
I would suggest reading the documentation first.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.OffsetRange$

The OffsetRange class is not private.  The instance constructor is
private.  You obtain instances by using the apply method on the
companion object, ie do

OffsetRange(...)
not
new OffsetRange(...)
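Put together, a minimal sketch of building the batch RDD this way, reusing the sc, kafkaParams, topic name and offsets from the quoted code:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// OffsetRange is built through the companion object's apply method:
// (topic, partition, fromOffset, untilOffset)
val offsetRanges = Array(OffsetRange("newtopic", 0, 110L, 220L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
```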



On Fri, Apr 22, 2016 at 6:27 PM, Mich Talebzadeh
 wrote:
> So is there any way of creating an RDD without using offsetRanges? Sorry for
> lack of clarity here
>
>
> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> StringDecoder](sc, kafkaParams, offsetRanges)
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 23 April 2016 at 00:13, Mich Talebzadeh 
> wrote:
>>
>> So there is really no point in using it :(
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>> On 23 April 2016 at 00:11, Ted Yu  wrote:
>>>
>>> The class is private :
>>>
>>> final class OffsetRange private(
>>>
>>> On Fri, Apr 22, 2016 at 4:08 PM, Mich Talebzadeh
>>>  wrote:

 Ok I decided to forgo that approach and use an existing program of mine
 with slight modification. The code is this

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
 import _root_.kafka.serializer.StringDecoder
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka.KafkaUtils
 import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
 //
 object CEP_assembly {
   def main(args: Array[String]) {
   val conf = new SparkConf().
setAppName("CEP_assembly").
setMaster("local[2]").
set("spark.driver.allowMultipleContexts", "true").
set("spark.hadoop.validateOutputSpecs", "false")
   val sc = new SparkContext(conf)
   // Create sqlContext based on HiveContext
   val sqlContext = new HiveContext(sc)
   import sqlContext.implicits._
   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
   println ("\nStarted at"); sqlContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
 ").collect.foreach(println)
 val ssc = new StreamingContext(conf, Seconds(1))
 ssc.checkpoint("checkpoint")
 val kafkaParams = Map[String, String]("bootstrap.servers" ->
 "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081;,
 "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
 val topics = Set("newtopic", "newtopic")
 val dstream = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, topics)
 dstream.cache()
 val lines = dstream.map(_._2)
 val showResults = lines.filter(_.contains("statement
 cache")).flatMap(line => line.split("\n,")).map(word => (word,
 1)).reduceByKey(_ + _)
 // Define the offset ranges to read in the batch job
 val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
 // Create the RDD based on the offset ranges
 val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
 StringDecoder](sc, kafkaParams, offsetRanges)
 ssc.start()
 ssc.awaitTermination()
 //ssc.stop()
   println ("\nFinished at"); sqlContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
 ").collect.foreach(println)
   }
 }


 With sbt

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
 "provided"
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
 "provided"
 libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
 "provided"
 libraryDependencies += "junit" % "junit" % "4.12"
 libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
 % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
 "1.6.1"
 libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
 libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
 % "test"
 libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" %
 "1.6.1"


 

Re: next on empty iterator though i used hasNext

2016-04-25 Thread Ted Yu
Can you show more of your code inside the while loop ?

Which version of Spark / Kinesis do you use ?

Thanks

On Mon, Apr 25, 2016 at 4:04 AM, Selvam Raman  wrote:

> I am reading data from a Kinesis stream (merging shard values with a union
> stream) into Spark streaming, then using the following code to push the data
> to a DB.
> ​
>
> splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>,Time>()
> {
>
> private static final long serialVersionUID = 1L;
>
> public void call(JavaRDD<String[]> rdd, Time time) throws Exception
> {
> JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[], SFieldBean>()
> {
> private static final long serialVersionUID = 1L;
>
> public SFieldBean call(String[] values) throws Exception
> {
> .
> );
>
> varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>()
> {
> private static final long serialVersionUID = 1L;
> MySQLConnectionHelper.getConnection("urlinfo");
> @Override
> public void call(Iterator<SFieldBean> iterValues) throws Exception
> {
> 
> while(iterValues.hasNext())
> {
>
> }
> }
>
> Though I am using hasNext, it throws the following error
> ​
> Caused by: java.util.NoSuchElementException: next on empty iterator
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> at
> scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
> at
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
> at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319)
> at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288)
> at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
> at
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> ... 3 more
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


RE: Java exception when showing join

2016-04-25 Thread Yong Zhang
Sorry, I didn't notice that you are using pyspark, so ignore my reply, as I
only use the Scala version.
Yong

From: java8...@hotmail.com
To: webe...@aim.com; user@spark.apache.org
Subject: RE: Java exception when showing join
Date: Mon, 25 Apr 2016 09:41:18 -0400




dispute_df.join(comments_df, $"dispute_df.COMMENTID" === 
$"comments_df.COMMENTID").first()
If you are using the DataFrame API, some of it is tricky for a first-time user;
my suggestion is to always refer to the unit tests. That is in fact the way I
found out how to do it in lots of cases.
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Yong

> Subject: Re: Java exception when showing join
> From: webe...@aim.com
> To: java8...@hotmail.com; user@spark.apache.org
> Date: Mon, 25 Apr 2016 07:45:12 -0500
> 
> I get an invalid syntax error when I do that.
> 
> On Fri, 2016-04-22 at 20:06 -0400, Yong Zhang wrote:
> > use "dispute_df.join(comments_df, dispute_df.COMMENTID ===
> > comments_df.COMMENTID).first()" instead.
> > 
> > Yong
> > 
> > Date: Fri, 22 Apr 2016 17:42:26 -0400
> > From: webe...@aim.com
> > To: user@spark.apache.org
> > Subject: Java exception when showing join
> > 
> > I am using pyspark with netezza.  I am getting a java exception when
> > trying to show the first row of a join.  I can show the first row of
> > each of the two dataframes separately but not the result of a join.  I get
> > the same error for any action I take(first, collect, show).  Am I
> > doing something wrong?
> > 
> > from pyspark.sql import SQLContext
> > sqlContext = SQLContext(sc)
> > dispute_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table1',
> > driver='com.ibm.spark.netezza').load()
> > dispute_df.printSchema()
> > comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table2',
> > driver='com.ibm.spark.netezza').load()
> > comments_df.printSchema()
> > dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > 
> > root
> >  |-- COMMENTID: string (nullable = true)
> >  |-- EXPORTDATETIME: timestamp (nullable = true)
> >  |-- ARTAGS: string (nullable = true)
> >  |-- POTAGS: string (nullable = true)
> >  |-- INVTAG: string (nullable = true)
> >  |-- ACTIONTAG: string (nullable = true)
> >  |-- DISPUTEFLAG: string (nullable = true)
> >  |-- ACTIONFLAG: string (nullable = true)
> >  |-- CUSTOMFLAG1: string (nullable = true)
> >  |-- CUSTOMFLAG2: string (nullable = true)
> > 
> > root
> >  |-- COUNTRY: string (nullable = true)
> >  |-- CUSTOMER: string (nullable = true)
> >  |-- INVNUMBER: string (nullable = true)
> >  |-- INVSEQNUMBER: string (nullable = true)
> >  |-- LEDGERCODE: string (nullable = true)
> >  |-- COMMENTTEXT: string (nullable = true)
> >  |-- COMMENTTIMESTAMP: timestamp (nullable = true)
> >  |-- COMMENTLENGTH: long (nullable = true)
> >  |-- FREEINDEX: long (nullable = true)
> >  |-- COMPLETEDFLAG: long (nullable = true)
> >  |-- ACTIONFLAG: long (nullable = true)
> >  |-- FREETEXT: string (nullable = true)
> >  |-- USERNAME: string (nullable = true)
> >  |-- ACTION: string (nullable = true)
> >  |-- COMMENTID: string (nullable = true)
> > 
> > ---
> > 
> > Py4JJavaError Traceback (most recent call
> > last)
> >  in ()
> >   5 comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://dstbld-pda02.bld.dst.ibm.com:5480/BACC_DEV_CSP_NBAAR',
> > user='rnahar', password='Sfeb2016',
> > dbtable='UK_METRICS.EU_COMMENTS2',
> > driver='com.ibm.spark.netezza').load()
> >   6 comments_df.printSchema()
> > > 7 dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in first(self)
> > 802 Row(age=2, name=u'Alice')
> > 803 """
> > --> 804 return self.head()
> > 805 
> > 806 @ignore_unicode_prefix
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
> > 790 """
> > 791 if n is None:
> > --> 792 rs = self.head(1)
> > 793 return rs[0] if rs else None
> > 794 return self.take(n)
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
> > 792 rs = self.head(1)
> > 793 return rs[0] if rs else None
> > --> 794 return self.take(n)
> > 795 
> > 796 @ignore_unicode_prefix
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in take(self, num)
> > 304 with 

RE: Java exception when showing join

2016-04-25 Thread Yong Zhang
dispute_df.join(comments_df, $"dispute_df.COMMENTID" === 
$"comments_df.COMMENTID").first()
If you are using the DataFrame API, some of it is tricky for a first-time user;
my suggestion is to always refer to the unit tests. That is in fact the way I
found out how to do it in lots of cases.
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Yong

> Subject: Re: Java exception when showing join
> From: webe...@aim.com
> To: java8...@hotmail.com; user@spark.apache.org
> Date: Mon, 25 Apr 2016 07:45:12 -0500
> 
> I get an invalid syntax error when I do that.
> 
> On Fri, 2016-04-22 at 20:06 -0400, Yong Zhang wrote:
> > use "dispute_df.join(comments_df, dispute_df.COMMENTID ===
> > comments_df.COMMENTID).first()" instead.
> > 
> > Yong
> > 
> > Date: Fri, 22 Apr 2016 17:42:26 -0400
> > From: webe...@aim.com
> > To: user@spark.apache.org
> > Subject: Java exception when showing join
> > 
> > I am using pyspark with netezza.  I am getting a java exception when
> > trying to show the first row of a join.  I can show the first row of
> > each of the two dataframes separately but not the result of a join.  I get
> > the same error for any action I take(first, collect, show).  Am I
> > doing something wrong?
> > 
> > from pyspark.sql import SQLContext
> > sqlContext = SQLContext(sc)
> > dispute_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table1',
> > driver='com.ibm.spark.netezza').load()
> > dispute_df.printSchema()
> > comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://***:5480/db', user='***', password='***', dbtable='table2',
> > driver='com.ibm.spark.netezza').load()
> > comments_df.printSchema()
> > dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > 
> > root
> >  |-- COMMENTID: string (nullable = true)
> >  |-- EXPORTDATETIME: timestamp (nullable = true)
> >  |-- ARTAGS: string (nullable = true)
> >  |-- POTAGS: string (nullable = true)
> >  |-- INVTAG: string (nullable = true)
> >  |-- ACTIONTAG: string (nullable = true)
> >  |-- DISPUTEFLAG: string (nullable = true)
> >  |-- ACTIONFLAG: string (nullable = true)
> >  |-- CUSTOMFLAG1: string (nullable = true)
> >  |-- CUSTOMFLAG2: string (nullable = true)
> > 
> > root
> >  |-- COUNTRY: string (nullable = true)
> >  |-- CUSTOMER: string (nullable = true)
> >  |-- INVNUMBER: string (nullable = true)
> >  |-- INVSEQNUMBER: string (nullable = true)
> >  |-- LEDGERCODE: string (nullable = true)
> >  |-- COMMENTTEXT: string (nullable = true)
> >  |-- COMMENTTIMESTAMP: timestamp (nullable = true)
> >  |-- COMMENTLENGTH: long (nullable = true)
> >  |-- FREEINDEX: long (nullable = true)
> >  |-- COMPLETEDFLAG: long (nullable = true)
> >  |-- ACTIONFLAG: long (nullable = true)
> >  |-- FREETEXT: string (nullable = true)
> >  |-- USERNAME: string (nullable = true)
> >  |-- ACTION: string (nullable = true)
> >  |-- COMMENTID: string (nullable = true)
> > 
> > ---
> > 
> > Py4JJavaError Traceback (most recent call
> > last)
> >  in ()
> >   5 comments_df =
> > sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> > ezza://dstbld-pda02.bld.dst.ibm.com:5480/BACC_DEV_CSP_NBAAR',
> > user='rnahar', password='Sfeb2016',
> > dbtable='UK_METRICS.EU_COMMENTS2',
> > driver='com.ibm.spark.netezza').load()
> >   6 comments_df.printSchema()
> > > 7 dispute_df.join(comments_df, dispute_df.COMMENTID ==
> > comments_df.COMMENTID).first()
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in first(self)
> > 802 Row(age=2, name=u'Alice')
> > 803 """
> > --> 804 return self.head()
> > 805 
> > 806 @ignore_unicode_prefix
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
> > 790 """
> > 791 if n is None:
> > --> 792 rs = self.head(1)
> > 793 return rs[0] if rs else None
> > 794 return self.take(n)
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
> > 792 rs = self.head(1)
> > 793 return rs[0] if rs else None
> > --> 794 return self.take(n)
> > 795 
> > 796 @ignore_unicode_prefix
> > 
> > /usr/local/src/spark/spark-1.6.1-bin-
> > hadoop2.6/python/pyspark/sql/dataframe.pyc in take(self, num)
> > 304 with SCCallSiteSync(self._sc) as css:
> > 305 port =
> > self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndSe
> > rve(
> > --> 306 self._jdf, num)
> > 307 return list(_load_from_socket(port,
> > 

Defining case class within main method throws "No TypeTag available for Accounts"

2016-04-25 Thread Mich Talebzadeh
Hi,

I notice that, building with sbt, if I define my case class *outside of the
main method* like below, it works:


case class Accounts( TransactionDate: String, TransactionType: String,
Description: String, Value: Double, Balance: Double, AccountName: String,
AccountNumber : String)

object Import_nw_10124772 {
  def main(args: Array[String]) {
  val conf = new SparkConf().
   setAppName("Import_nw_10124772").
   setMaster("local[12]").
   set("spark.driver.allowMultipleContexts", "true").
   set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  println ("\nStarted at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
  //
  // Get a DF first based on Databricks CSV libraries ignore column heading
because of column called "Type"
  //
  val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header",
"true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
  //df.printSchema
  //
   val a = df.filter(col("Date") > "").map(p =>
Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))


However, if I put that case class inside the main method, it throws a "No
TypeTag available for Accounts" error.

Apparently, when the case class is defined inside the method in which it is
used, it is not fully defined at that point.

Is this a bug within Spark?

Thanks




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com
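This looks like a Scala reflection limitation rather than a Spark bug: toDF/createDataFrame need an implicit TypeTag, and the compiler cannot materialise one for a class that is local to a method, so the case class has to live at the top level (or inside an object). A stripped-down sketch of the working shape (class and field names are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// defined at top level, so an implicit TypeTag is available for it
case class Account(name: String, balance: Double)

object TypeTagSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("TypeTagSketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // moving the case class definition into main would make it a local class,
    // and toDF below would then fail with "No TypeTag available for Account"
    val df = sc.parallelize(Seq(Account("a", 1.0), Account("b", 2.0))).toDF()
    df.show()
    sc.stop()
  }
}
```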


New executor pops up on every node when any executor dies

2016-04-25 Thread Vaclav Vymazal
Hi,
 I have a Spark streaming application that is pumping data from Kafka into
HDFS and Elasticsearch. The application is running on a Spark Standalone
cluster (in client mode).

Whenever one of the executors fails (or is killed), a new executor for the
application is spawned on every node in the cluster.
Since the executors have a pretty high memory allocation, this typically
leads to a cascading failure.

The application is using updateStateByKey and checkpointing (on HDFS). I
was able to reproduce the issue with the application on Spark 1.6.0 and
1.6.1.

I tried to reproduce the behavior with the streaming.HdfsWordCount example,
but the executor restart worked fine there.

Thanks,
V.


Re: Java exception when showing join

2016-04-25 Thread Brent S. Elmer Ph.D.
I get an invalid syntax error when I do that.

On Fri, 2016-04-22 at 20:06 -0400, Yong Zhang wrote:
> use "dispute_df.join(comments_df, dispute_df.COMMENTID ===
> comments_df.COMMENTID).first()" instead.
> 
> Yong
> 
> Date: Fri, 22 Apr 2016 17:42:26 -0400
> From: webe...@aim.com
> To: user@spark.apache.org
> Subject: Java exception when showing join
> 
> I am using pyspark with netezza.  I am getting a java exception when
> trying to show the first row of a join.  I can show the first row of
> each of the two dataframes separately but not the result of a join.  I get
> the same error for any action I take(first, collect, show).  Am I
> doing something wrong?
> 
> from pyspark.sql import SQLContext
> sqlContext = SQLContext(sc)
> dispute_df =
> sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> ezza://***:5480/db', user='***', password='***', dbtable='table1',
> driver='com.ibm.spark.netezza').load()
> dispute_df.printSchema()
> comments_df =
> sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> ezza://***:5480/db', user='***', password='***', dbtable='table2',
> driver='com.ibm.spark.netezza').load()
> comments_df.printSchema()
> dispute_df.join(comments_df, dispute_df.COMMENTID ==
> comments_df.COMMENTID).first()
> 
> 
> root
>  |-- COMMENTID: string (nullable = true)
>  |-- EXPORTDATETIME: timestamp (nullable = true)
>  |-- ARTAGS: string (nullable = true)
>  |-- POTAGS: string (nullable = true)
>  |-- INVTAG: string (nullable = true)
>  |-- ACTIONTAG: string (nullable = true)
>  |-- DISPUTEFLAG: string (nullable = true)
>  |-- ACTIONFLAG: string (nullable = true)
>  |-- CUSTOMFLAG1: string (nullable = true)
>  |-- CUSTOMFLAG2: string (nullable = true)
> 
> root
>  |-- COUNTRY: string (nullable = true)
>  |-- CUSTOMER: string (nullable = true)
>  |-- INVNUMBER: string (nullable = true)
>  |-- INVSEQNUMBER: string (nullable = true)
>  |-- LEDGERCODE: string (nullable = true)
>  |-- COMMENTTEXT: string (nullable = true)
>  |-- COMMENTTIMESTAMP: timestamp (nullable = true)
>  |-- COMMENTLENGTH: long (nullable = true)
>  |-- FREEINDEX: long (nullable = true)
>  |-- COMPLETEDFLAG: long (nullable = true)
>  |-- ACTIONFLAG: long (nullable = true)
>  |-- FREETEXT: string (nullable = true)
>  |-- USERNAME: string (nullable = true)
>  |-- ACTION: string (nullable = true)
>  |-- COMMENTID: string (nullable = true)
> 
> ---
> 
> Py4JJavaError Traceback (most recent call
> last)
>  in ()
>   5 comments_df =
> sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:net
> ezza://dstbld-pda02.bld.dst.ibm.com:5480/BACC_DEV_CSP_NBAAR',
> user='rnahar', password='Sfeb2016',
> dbtable='UK_METRICS.EU_COMMENTS2',
> driver='com.ibm.spark.netezza').load()
>   6 comments_df.printSchema()
> > 7 dispute_df.join(comments_df, dispute_df.COMMENTID ==
> comments_df.COMMENTID).first()
> 
> /usr/local/src/spark/spark-1.6.1-bin-
> hadoop2.6/python/pyspark/sql/dataframe.pyc in first(self)
>     802 Row(age=2, name=u'Alice')
>     803 """
> --> 804 return self.head()
>     805 
>     806 @ignore_unicode_prefix
> 
> /usr/local/src/spark/spark-1.6.1-bin-
> hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
>     790 """
>     791 if n is None:
> --> 792 rs = self.head(1)
>     793 return rs[0] if rs else None
>     794 return self.take(n)
> 
> /usr/local/src/spark/spark-1.6.1-bin-
> hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
>     792 rs = self.head(1)
>     793 return rs[0] if rs else None
> --> 794 return self.take(n)
>     795 
>     796 @ignore_unicode_prefix
> 
> /usr/local/src/spark/spark-1.6.1-bin-
> hadoop2.6/python/pyspark/sql/dataframe.pyc in take(self, num)
>     304 with SCCallSiteSync(self._sc) as css:
>     305 port =
> self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndSe
> rve(
> --> 306 self._jdf, num)
>     307 return list(_load_from_socket(port,
> BatchedSerializer(PickleSerializer(
>     308 
> 
> /usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-
> src.zip/py4j/java_gateway.py in __call__(self, *args)
>     811 answer = self.gateway_client.send_command(command)
>     812 return_value = get_return_value(
> --> 813 answer, self.gateway_client, self.target_id,
> self.name)
>     814 
>     815 for temp_arg in temp_args:
> 
> /usr/local/src/spark/spark-1.6.1-bin-
> hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
>  43 def deco(*a, **kw):
>  44 try:
> ---> 45 return f(*a, **kw)
>  46 except py4j.protocol.Py4JJavaError as e:
>  47 s = e.java_exception.toString()
> 
> /usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-
> 

next on empty iterator though i used hasNext

2016-04-25 Thread Selvam Raman
I am reading data from a Kinesis stream (merging shard values with a union
stream) into Spark streaming, then using the following code to push the data
to a DB.
​

splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>,Time>()
{

private static final long serialVersionUID = 1L;

public void call(JavaRDD<String[]> rdd, Time time) throws Exception
{
JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[], SFieldBean>()
{
private static final long serialVersionUID = 1L;

public SFieldBean call(String[] values) throws Exception
{
.
);

varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>()
{
private static final long serialVersionUID = 1L;
MySQLConnectionHelper.getConnection("urlinfo");
@Override
public void call(Iterator<SFieldBean> iterValues) throws Exception
{

while(iterValues.hasNext())
{

}
}

Though I am using hasNext, it throws the following error
​
Caused by: java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at
scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319)
at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
... 3 more


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


GraphFrames and IPython notebook issue - No module named graphframes

2016-04-25 Thread Camelia Elena Ciolac
Hello,


I work locally on my laptop, not using DataBricks Community edition.


I downloaded  graphframes-0.1.0-spark1.6.jar from 
http://spark-packages.org/package/graphframes/graphframes

and placed it in a folder  named spark_extra_jars where I have other jars too.


After executing in a terminal:


ipython notebook --profile = nbserver



I open http://127.0.0.1:/ in the browser, and in my IPython notebook I have,
among others:


jar_path = 
'/home/camelia/spark_extra_jars/spark-csv_2.11-1.2.0.jar,/home/camelia/spark_extra_jars/commons-csv-1.2.jar,/home/camelia/spark_extra_jars/graphframes-0.1.0-spark1.6.jar,/home/camelia/spark_extra_jars/spark-mongodb_2.10-0.11.0.jar'


config = 
SparkConf().setAppName("graph_analytics").setMaster("local[4]").set("spark.jars",
 jar_path)

I can successfully import the other modules, but when I do

import graphframes

It gives the error:


ImportError   Traceback (most recent call last)
 in ()
> 1 import graphframes

ImportError: No module named graphframes



Thank you in advance for any hint on how to import graphframes successfully.

Best regards,
Camelia


Re: Spark Streaming, Broadcast variables, java.lang.ClassCastException

2016-04-25 Thread mwol
I forgot the 

streamingContext.start()
streamingContext.awaitTermination()

in my example code, but the error stays the same...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Broadcast-variables-java-lang-ClassCastException-tp26828p26829.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming, Broadcast variables, java.lang.ClassCastException

2016-04-25 Thread mwol
Hi,
I try to read data from a static textfile stored in HDFS, store its content
into an ArrayBuffer which in turn should be broadcasted via
sparkContext.broadcast as a BroadcastVariable. I am using cloudera's spark,
version 1.6.0-cdh5.7.0 and spark-streaming_2.10.

I start the application on yarn using spark-submit:
*spark-submit --class my.package.BroadcastStreamTest1 --master yarn
--deploy-mode client --conf spark.executor.userClassPathFirst=true
current.jar*

When I do this, I get an 
*java.lang.ClassCastException: cannot assign instance of scala.Some to field
org.apache.spark.Accumulable.name of type scala.Option in instance of
org.apache.spark.Accumulator*
The same code used with a hard-coded ArrayBuffer works perfectly, so I assume
it has something to do with the static file resource...
Does anyone have an idea what I am possibly doing wrong? Any help
appreciated.

Here's my code:

*This does not work:*
object BroadcastStreamTest1 {

def main(args: Array[String]) {
val sparkConf = new SparkConf()
val streamingContext = new StreamingContext(sparkConf, batchDuration
= Seconds(10))

val content = streamingContext.sparkContext
.textFile("hdfs:///data/someTextFile.txt")
.collect()
.toBuffer[String]

val broadCastVar = streamingContext.sparkContext.broadcast(content)
broadCastVar.value.foreach(line => println(line))
}
}

*This works:*
object BroadcastStreamTest2 {

def main(args: Array[String]) {
val sparkConf = new SparkConf()
val streamingContext = new StreamingContext(sparkConf, batchDuration
= Seconds(10))

val content = new mutable.ArrayBuffer[String]
(1 to 50).foreach(i => content += "line" + i)

val broadCastVar = streamingContext.sparkContext.broadcast(content)
broadCastVar.value.foreach(line => println(line))
}
}

*StackTrace:*
16/04/25 10:09:59 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed
4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 0.0 (TID 6, n525.hadoop.mxint.net):
java.io.IOException: java.lang.ClassCastException: cannot assign instance of
scala.Some to field org.apache.spark.Accumulable.name of type scala.Option
in instance of org.apache.spark.Accumulator
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of

Re: Spark Streaming Job get killed after running for about 1 hour

2016-04-25 Thread أنس الليثي
I am using the latest Spark version 1.6

I have increased the maximum number of open files using this command *sysctl
-w fs.file-max=3275782*

I also increased the limit for the user who runs the Spark job by updating
the /etc/security/limits.conf file. The soft limit is 1024 and the hard limit
is 65536.

The operating system is Red Hat Enterprise Linux Server release 6.6
(Santiago)


@Rodrick : I will try to increase the assigned memory and see

Best regards


On 24 April 2016 at 16:42, Ted Yu  wrote:

> Which version of Spark are you using ?
>
> How did you increase the open file limit ?
>
> Which operating system do you use ?
>
> Please see Example 6. ulimit Settings on Ubuntu under:
> http://hbase.apache.org/book.html#basic.prerequisites
>
> On Sun, Apr 24, 2016 at 2:34 AM, fanooos  wrote:
>
>> I have a Spark streaming job that reads a tweet stream from Gnip and writes
>> it
>> to Kafka.
>>
>> Spark and kafka are running on the same cluster.
>>
>> My cluster consists of 5 nodes. Kafka-b01 ... Kafka-b05
>>
>> Spark master is running on Kafak-b05.
>>
>> Here is how we submit the spark job
>>
>> *nohup sh $SPARK_HOME/bin/spark-submit --total-executor-cores 5 --class
>> org.css.java.gnipStreaming.GnipSparkStreamer --master
>> spark://kafka-b05:7077
>> GnipStreamContainer.jar powertrack
>> kafka-b01.css.org,kafka-b02.css.org,kafka-b03.css.org,kafka-b04.css.org,
>> kafka-b05.css.org
>> gnip_live_stream 2 &*
>>
>> After about 1 hour the Spark job gets killed.
>>
>> The logs in the nohup file show the following exception:
>>
>> /org.apache.spark.storage.BlockFetchException: Failed to fetch block from
>> 2
>> locations. Most recent failure cause:
>> at
>>
>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>> at
>>
>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
>> at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
>> at
>> org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570)
>> at
>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:630)
>> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: io.netty.channel.ChannelException: Unable to create Channel
>> from
>> class class io.netty.channel.socket.nio.NioSocketChannel
>> at
>>
>> io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455)
>> at
>>
>> io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
>> at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134)
>> at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116)
>> at
>>
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211)
>> at
>>
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>> at
>>
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
>> at
>>
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> at
>>
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>> at
>>
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99)
>> at
>>
>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>> at
>>
>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
>> ... 15 more
>> Caused by: io.netty.channel.ChannelException: Failed to open a socket.
>> at
>>
>> io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62)
>> at
>>
>> io.netty.channel.socket.nio.NioSocketChannel.(NioSocketChannel.java:72)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> at
>>
>> 

Re: Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-25 Thread Praveen Devarao
Thanks Reynold for the reason as to why sortBykey invokes a Job

When you say "DataFrame/Dataset does not have this issue" is it right to 
assume you are referring to Spark 2.0 or Spark 1.6 DF already has built-in 
it?

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:   Reynold Xin 
To: Praveen Devarao/India/IBM@IBMIN
Cc: "d...@spark.apache.org" , user 

Date:   25/04/2016 11:26 am
Subject:Re: Do transformation functions on RDD invoke a Job 
[sc.runJob]?



Usually no - but sortByKey does because it needs the range boundary to be 
built in order to have the RDD. It is a long standing problem that's 
unfortunately very difficult to solve without breaking the RDD API.

In DataFrame/Dataset we don't have this issue though.


On Sun, Apr 24, 2016 at 10:54 PM, Praveen Devarao  
wrote:
Hi,

I have a streaming program with the block as below [ref: 
https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
]

1 val lines = messages.map(_._2)
2 val hashTags = lines.flatMap(status => status.split(" "
).filter(_.startsWith("#")))

3 val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
3a .map { case (topic, count) => (count, topic) }
3b .transform(_.sortByKey(false))

4atopCounts60.foreachRDD( rdd => {
4b val topList = rdd.take( 10 )
})

This batch is triggering 2 jobs: one at line 3b (sortByKey) and 
the other at 4b (rdd.take). I agree that there is a job triggered on line 
4b, as take() is an action on an RDD, whereas on line 3b sortByKey is just a 
transformation function which, as per the docs, is lazily evaluated. But I see 
that this line uses a RangePartitioner, and RangePartitioner on 
initialization invokes a method called sketch() that invokes collect(), 
triggering a job.

My question: is it expected that sortByKey will invoke a job? If 
yes, why is sortByKey listed as a transformation and not an action? Are there 
any other functions like this that invoke a job, even though they are 
transformations and not actions?

I am on Spark 1.6

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"
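A tiny spark-shell sketch of the effect being discussed: the extra job for line 3b comes from RangePartitioner sampling the data, not from the sort itself running eagerly (the key and value contents here are made up):

```scala
val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))

// reduceByKey alone schedules nothing ...
val reduced = pairs.reduceByKey(_ + _)

// ... but sortByKey builds a RangePartitioner, which runs a sampling job
// (visible in the UI) even though no action has been called yet
val sorted = reduced.sortByKey(false)

// the "real" job then runs on the action
sorted.take(10)
```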






Do transformation functions on RDD invoke a Job [sc.runJob]?

2016-04-25 Thread Praveen Devarao
Hi,

I have a streaming program with the block as below [ref: 
https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala
]

1 val lines = messages.map(_._2)
2 val hashTags = lines.flatMap(status => status.split(" "
).filter(_.startsWith("#")))

3 val topCounts60 = hashTags.map((_, 1)).reduceByKey( _ + _ )
3a  .map { case (topic, count) => (count, topic) }
3b  .transform(_.sortByKey(false))

4a topCounts60.foreachRDD( rdd => {
4b  val topList = rdd.take( 10 )
})

This batch is triggering 2 jobs: one at line 3b (sortByKey) and 
the other at 4b (rdd.take). I agree that there is a job triggered on line 
4b, as take() is an action on an RDD, whereas on line 3b sortByKey is just a 
transformation function which, as per the docs, is lazily evaluated. But I see 
that this line uses a RangePartitioner, and RangePartitioner on 
initialization invokes a method called sketch() that invokes collect(), 
triggering a job.

My question: is it expected that sortByKey will invoke a job? If 
yes, why is sortByKey listed as a transformation and not an action? Are there 
any other functions like this that invoke a job, even though they are 
transformations and not actions?

I am on Spark 1.6

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"