Re: Maelstrom: Kafka integration with Spark

2016-08-23 Thread Jeoffrey Lim
Apologies, I was not aware that Spark 2.0 now has Kafka Consumer
caching/pooling.
What I have checked is the latest Kafka Consumer, and I believe it is still
of beta quality.

https://kafka.apache.org/documentation.html#newconsumerconfigs

> Since 0.9.0.0 we have been working on a replacement for our existing
simple and high-level consumers.
> The code is considered beta quality.

I'm not sure about this: does the Spark 2.0 Kafka 0.10 integration already use
this one? Is it now stable?
With this caching feature in Spark 2.0, could it achieve sub-millisecond
stream processing now?


Maelstrom still uses the old Kafka SimpleConsumer. This library was made
open source so that I can continue working on it, with future updates and
improvements planned for when the latest Kafka Consumer gets a stable release.

We have been using the Maelstrom "caching concept" for a long time now, as
the receiver-based Spark Kafka integration does not work for us. There were
thoughts about using the direct Kafka APIs; however, Maelstrom has very
simple APIs and just "simply works" even under unstable scenarios
(e.g. advertised-hostname failures on EMR).

I believe Maelstrom will work even with Spark 1.3 and Kafka 0.8.2.1 (and of
course with the latest Kafka 0.10 as well).


On Wed, Aug 24, 2016 at 9:49 AM, Cody Koeninger  wrote:

> Were you aware that the spark 2.0 / kafka 0.10 integration also reuses
> kafka consumer instances on the executors?
>
> On Tue, Aug 23, 2016 at 3:19 PM, Jeoffrey Lim  wrote:
> > Hi,
> >
> > I have released the first version of a new Kafka integration with Spark
> > that we use in the company I work for: open sourced and named Maelstrom.
> >
> > It is unique compared to other solutions out there as it reuses the
> > Kafka Consumer connection to achieve sub-milliseconds latency.
> >
> > This library has been running stable in production environment and has
> > been proven to be resilient to numerous production issues.
> >
> >
> > Please check out the project's page in github:
> >
> > https://github.com/jeoffreylim/maelstrom
> >
> >
> > Contributors welcome!
> >
> >
> > Cheers!
> >
> > Jeoffrey Lim
> >
> >
> > P.S. I am also looking for a job opportunity, please look me up at
> Linked In
>


Re: Apply ML to grouped dataframe

2016-08-23 Thread Wen Pei Yu

Thank you Ayan.

For example, I have the dataframe below. Consider column "group" as the key
to split this dataframe into three parts; I then want to run k-means on each
split to get each group's k-means result.

+-----------+-----+------------+
|     userID|group|    features|
+-----------+-----+------------+
|12462563356|    1|  [5.0,43.0]|
|12462563701|    2|   [1.0,8.0]|
|12462563701|    1|  [2.0,12.0]|
|12462564356|    1|   [1.0,1.0]|
|12462565487|    3|   [2.0,3.0]|
|12462565698|    2|   [1.0,1.0]|
|12462565698|    1|   [1.0,1.0]|
|12462566081|    2|   [1.0,2.0]|
|12462566081|    1|  [1.0,15.0]|
|12462566225|    2|   [1.0,1.0]|
|12462566225|    1|  [9.0,85.0]|
|12462566526|    2|   [1.0,1.0]|
|12462566526|    1|  [3.0,79.0]|
|12462567006|    2| [11.0,15.0]|
|12462567006|    1| [10.0,15.0]|
|12462567006|    3| [10.0,15.0]|
|12462586595|    2|  [2.0,42.0]|
|12462586595|    3|  [2.0,16.0]|
|12462589343|    3|   [1.0,1.0]|
+-----------+-----+------------+



From:   ayan guha 
To: Wen Pei Yu/China/IBM@IBMCN
Cc: user , Nirmal Fernando 
Date:   08/23/2016 05:13 PM
Subject:Re: Apply ML to grouped dataframe



I would suggest you to construct a toy problem and post for solution. At
this moment it's a little unclear what your intentions are.


Generally speaking, group by on a data frame created another data frame,
not multiple ones.


On 23 Aug 2016 16:35, "Wen Pei Yu"  wrote:
  Hi Mirmal

  Filter works fine if I want to handle one of the grouped dataframes. But I
  have multiple grouped dataframes, and I wish I could apply the ML algorithm
  to all of them in one job, not in a for loop.

  Wenpei.


  From: Nirmal Fernando 
  To: Wen Pei Yu/China/IBM@IBMCN
  Cc: User 
  Date: 08/23/2016 01:55 PM
  Subject: Re: Apply ML to grouped dataframe





  On Tue, Aug 23, 2016 at 10:56 AM, Wen Pei Yu  wrote:
We can group a dataframe by one column like

df.groupBy(df.col("gender"))


  On top of this DF, use a filter that would enable you to extract the
  grouped DF as separated DFs. Then you can apply ML on top of each DF.

  eg: xyzDF.filter(col("x").equalTo(x))
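
  A minimal sketch of that filter-per-group approach (Spark 2.0 ml API), assuming
  the DataFrame df from the example above with an integer "group" column and a
  Vector-typed "features" column; note that it still loops over the groups on the
  driver:

  import org.apache.spark.ml.clustering.KMeans
  import org.apache.spark.sql.functions.col

  // collect the distinct group keys, then fit one KMeans model per group
  val groups = df.select("group").distinct().collect().map(_.getInt(0))

  val modelsByGroup = groups.map { g =>
    val groupDF = df.filter(col("group") === g)           // rows of this group only
    val kmeans  = new KMeans().setK(2).setFeaturesCol("features")
    (g, kmeans.fit(groupDF))                               // per-group KMeansModel
  }.toMap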

It is like splitting a dataframe into multiple dataframes. Currently, we can
only apply simple SQL functions such as agg, max, etc. to this GroupedData.

What we want is to apply one ML algorithm to each group.

Regards.


From: Nirmal Fernando 
To: Wen Pei Yu/China/IBM@IBMCN
Cc: User 
Date: 08/23/2016 01:14 PM



Subject: Re: Apply ML to grouped dataframe



Hi Wen,

AFAIK Spark MLlib implements its machine learning algorithms on top
of Spark dataframe API. What did you mean by a grouped dataframe?

On Tue, Aug 23, 2016 at 10:42 AM, Wen Pei Yu 
wrote:
Hi Nirmal

I didn't get your point.
Can you tell me more about how to use MLlib to grouped
dataframe?

Regards.
Wenpei.



From: Nirmal Fernando 
To: Wen Pei Yu/China/IBM@IBMCN
Cc: User 
Date: 08/23/2016 10:26 AM
Subject: Re: Apply ML to grouped dataframe




You can use Spark MLlib

http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api


On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu <
yuw...@cn.ibm.com> wrote:
Hi

We have a dataframe, then want
group it and apply a ML
algorithm or statistics(say t
  

Re: Spark 2.0 with Kafka 0.10 exception

2016-08-23 Thread Cody Koeninger
You can set that poll timeout higher with

spark.streaming.kafka.consumer.poll.ms

but half a second is fairly generous.  I'd try to take a look at
what's going on with your network or kafka broker during that time.
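
For reference, a minimal sketch of raising that timeout (the value is in
milliseconds; the 512 in the error below is the current effective setting, and
10000 here is only illustrative):

import org.apache.spark.SparkConf

// give the cached Kafka consumer more time per poll before the assertion fires
val conf = new SparkConf()
  .setAppName("kafka-010-stream")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")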

On Tue, Aug 23, 2016 at 4:44 PM, Srikanth  wrote:
> Hello,
>
> I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
>
>> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
>> spark-executor-example mt_event 0 15782114
>> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
>> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
>> spark-executor-example.
>> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
>> 6)
>> java.lang.AssertionError: assertion failed: Failed to get records for
>> spark-executor-example mt_event 0 15782114 after polling for 512
>> at scala.Predef$.assert(Predef.scala:170)
>> at
>> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> at
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> at
>> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>
>
> I get this error intermittently. Sometimes a few batches are scheduled and
> run fine. Then I get this error.
> kafkacat is able to fetch from this topic continuously.
>
> Full exception is here --
> https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>
> Srikanth

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Maelstrom: Kafka integration with Spark

2016-08-23 Thread Cody Koeninger
Were you aware that the spark 2.0 / kafka 0.10 integration also reuses
kafka consumer instances on the executors?
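
For reference, a rough sketch of that integration's direct stream setup (broker,
topic and group names are placeholders, and ssc is assumed to be an existing
StreamingContext); the 0-10 integration caches one consumer per topic-partition
on each executor, and PreferConsistent keeps partitions on the same executors so
those cached consumers can be reused across batches:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// one direct stream over the chosen topic, reading on the executors
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("mytopic"), kafkaParams))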

On Tue, Aug 23, 2016 at 3:19 PM, Jeoffrey Lim  wrote:
> Hi,
>
> I have released the first version of a new Kafka integration with Spark
> that we use in the company I work for: open sourced and named Maelstrom.
>
> It is unique compared to other solutions out there as it reuses the
> Kafka Consumer connection to achieve sub-milliseconds latency.
>
> This library has been running stable in production environment and has
> been proven to be resilient to numerous production issues.
>
>
> Please check out the project's page in github:
>
> https://github.com/jeoffreylim/maelstrom
>
>
> Contributors welcome!
>
>
> Cheers!
>
> Jeoffrey Lim
>
>
> P.S. I am also looking for a job opportunity, please look me up at Linked In

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How Spark HA works

2016-08-23 Thread Mohit Jaggi
what did you mean by “link” ? an HTTP URL to the spark monitoring UI? AFAIK, it 
is not directly supported. i typically go to both masters and check which one 
is active :-)

did you check if the failover actually happened in other ways (i don’t know 
what the znode should say)? you can try sending a spark job and if you used the 
right master config in your code, it should go to the new master. that will 
confirm that failover worked.
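
One way to script that check, as a sketch only: the standalone master web UI
exposes a /json endpoint whose output includes a status field ("ALIVE" on the
active master). The hostnames and the default 8080 UI port below are
assumptions; adjust them to your deployment.

import scala.io.Source
import scala.util.Try

val masters = Seq("http://master1:8080/json", "http://master2:8080/json")

masters.foreach { url =>
  // fetch the master's JSON status page; a standby (or dead) master won't report ALIVE
  val alive = Try(Source.fromURL(url).mkString).toOption.exists { body =>
    body.contains("\"status\" : \"ALIVE\"") || body.contains("\"status\":\"ALIVE\"")
  }
  println(s"$url -> ${if (alive) "ACTIVE" else "standby or unreachable"}")
}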



Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 19, 2016, at 8:56 PM, Charles Nnamdi Akalugwu  
> wrote:
> 
> I am experiencing this exact issue. Does anyone know what's going on with the 
> zookeeper setup?
> 
> On Jul 5, 2016 10:34 AM, "Akmal Abbasov"  > wrote:
> >
> > Hi, 
> > I'm trying to understand how Spark HA works. I'm using Spark 1.6.1 and 
> > Zookeeper 3.4.6.
> > I've add the following line to $SPARK_HOME/conf/spark-env.sh
> > export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
> > -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 
> > -Dspark.deploy.zookeeper.dir=/spark"
> > It's working so far.
> > I'd like to setup a link which will always go to active master UI(I'm using 
> > Spark in Standalone).  
> > I've checked the znode /spark, and it contains 
> > [leader_election, master_status]
> > I'm assuming that master_status znode will contain ip address of the 
> > current active master, is it true? Because in my case this znode isn't 
> > updated after failover.
> > And how /spark/leader_election works, because it doesn't contain any data.
> > Thank you.
> >
> > Regards,
> > Akmal
> >
> >
> 



Re: Using spark to distribute jobs to standalone servers

2016-08-23 Thread Mohit Jaggi
It is a bit hacky but possible. A lot depends on what kind of queries etc you 
want to run. You could write a data source that reads your data and keeps it 
partitioned the way you want, then use mapPartitions() to execute your code…
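
A minimal sketch of that idea, assuming the custom data source already lays out
the data so that each partition corresponds to one organization's server;
aggregateLocally below is a hypothetical stand-in for the per-site SQL or model
code, and only its small, aggregate result ever reaches the controller/driver:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("per-site-jobs").getOrCreate()

// placeholder per-partition logic: compute an aggregate without shipping raw rows
def aggregateLocally(records: Iterator[String]): Iterator[(Long, Double)] =
  Iterator((records.size.toLong, 0.0))

val rdd = spark.sparkContext.textFile("hdfs:///path/to/site/data") // hypothetical path
val perSiteAggregates = rdd.mapPartitions(aggregateLocally).collect() // aggregates only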


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Aug 22, 2016, at 7:59 AM, Larry White  wrote:
> 
> Hi,
> 
> I have a bit of an unusual use-case and would greatly appreciate some 
> feedback as to whether it is a good fit for spark.
> 
> I have a network of compute/data servers configured as a tree as shown below
> controller
> server 1
> server 2
> server 3
> etc.
> There are ~20 servers, but the number is increasing to ~100. 
> 
> Each server contains a different dataset, all in the same format. Each is 
> hosted by a different organization, and the data on every individual server 
> is unique to that organization.
> 
> Data cannot be replicated across servers using RDDs or any other means, for 
> privacy/ownership reasons.
> 
> Data cannot be retrieved to the controller, except in aggregate form, as the 
> result of a query, for example. 
> 
> Because of this, there are currently no operations that treats the data as if 
> it were a single data set: We could run a classifier on each site 
> individually, but cannot for legal reasons, pull all the data into a single 
> physical dataframe to run the classifier on all of it together. 
> 
> The servers are located across a wide geographic region (1,000s of miles)
> 
> We would like to send jobs from the controller to be executed in parallel on 
> all the servers, and retrieve the results to the controller. The jobs would 
> consist of SQL-Heavy Java code for 'production' queries, and python or R code 
> for ad-hoc queries and predictive modeling. 
> 
> Spark seems to have the capability to meet many of the individual 
> requirements, but is it a reasonable platform overall for building this 
> application?
> 
> Thank you very much for your assistance. 
> 
> Larry 
>  



Re: Spark with Parquet

2016-08-23 Thread Mohit Jaggi
something like this should work…

val df = sparkSession.read.csv("myfile.csv") // you may have to provide a schema if the guessed schema is not accurate
df.write.parquet("myfile.parquet")
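
If the guessed schema is not accurate, a small sketch of the explicit-schema
variant (the column names and types below are placeholders):

import org.apache.spark.sql.types._

// define the CSV columns up front instead of relying on inference
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("amount", DoubleType, nullable = true)
))

val df2 = sparkSession.read.schema(schema).csv("myfile.csv")
df2.write.parquet("myfile.parquet")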


Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




> On Apr 27, 2014, at 11:41 PM, Sai Prasanna  wrote:
> 
> Hi All,
> 
> I want to store a csv-text file in Parquet format in HDFS and then do some 
> processing in Spark.
> 
> Somehow my search to find the way to do was futile. More help was available 
> for parquet with impala. 
> 
> Any guidance here? Thanks !!
> 



Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
@RK yeah, I am thinking perhaps this is a better question for the @dev group.
But from the code and the comments in the files that I pointed out, I would be
more inclined to think that it is actually storing byte code.




On Tue, Aug 23, 2016 4:37 PM, RK Aduri rkad...@collectivei.com wrote:
Can you come up with your complete analysis? A snapshot of what you think the
code is doing. May be that would help us understand what exactly you were trying
to convey.

On Aug 23, 2016, at 4:21 PM, kant kodali < kanth...@gmail.com > wrote:

apache/spark spark - Mirror of Apache Spark GITHUB.COM






On Tue, Aug 23, 2016 4:17 PM, kant kodali kanth...@gmail.com wrote:
@RK you may want to look more deeply if you are curious. the code starts from
here

apache/spark spark - Mirror of Apache Spark GITHUB.COM

and it goes here where it is trying to save the python code object(which is a
byte code)

apache/spark spark - Mirror of Apache Spark GITHUB.COM






On Tue, Aug 23, 2016 2:39 PM, RK Aduri rkad...@collectivei.com wrote:
I just had a glance. AFAIK, that is nothing do with RDDs. It’s a pickler used to
serialize and deserialize the python code.
On Aug 23, 2016, at 2:23 PM, kant kodali < kanth...@gmail.com > wrote:
@Sean
well this makes sense but I wonder what the following source code is doing?

apache/spark spark - Mirror of Apache Spark GITHUB.COM

This code looks like it is trying to store some byte code some where (whether
its memory or disk) but why even go this path like creating a code objects so it
can be executed later and so on after all we are trying to persist the result of
computing the RDD" ?





On Tue, Aug 23, 2016 1:42 PM, Sean Owen so...@cloudera.com wrote:
We're probably mixing up some semantics here. An RDD is indeed,

really, just some bookkeeping that records how a certain result is

computed. It is not the data itself.




However we often talk about "persisting an RDD" which means

"persisting the result of computing the RDD" in which case that

persisted representation can be used instead of recomputing it.




The result of computing an RDD is really some objects in memory. It's

possible to persist the RDD in memory by just storing these objects in

memory as cached partitions. This involves no serialization.




Data can be persisted to disk but this involves serializing objects to

bytes (not byte code). It's also possible to store a serialized

representation in memory because it may be more compact.




This is not the same as saving/writing an RDD to persistent storage as

text or JSON or whatever.




On Tue, Aug 23, 2016 at 9:28 PM, kant kodali < kanth...@gmail.com > wrote:

> @srkanth are you sure? the whole point of RDD's is to store transformations

> but not the data as the spark paper points out but I do lack the practical

> experience for me to confirm. when I looked at the spark source

> code(specifically the checkpoint code) a while ago it was clearly storing

> some JVM byte code to disk which I thought were the transformations.

>

>

>

> On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:

>>

>> RDD contains data but not JVM byte code i.e. data which is read from

>> source and transformations have been applied. This is ideal case to persist

>> RDDs.. As Nirav mentioned this data will be serialized before persisting to

>> disk..

>>

>>

>>

>> Thanks,

>> Sreekanth Jella

>>

>>

>>

>> From: kant kodali

>> Sent: Tuesday, August 23, 2016 3:59 PM

>> To: Nirav

> srikanth.je...@gmail.com ; user@spark.apache.org

>> Subject: Re: Are RDD's ever persisted to disk?

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:

>>

>> You can store either in serialized form(butter array) or just save it in a

>> string format like tsv or csv. There are different RDD save apis for that.

>>

>> Sent from my iPhone

>>

>>

>> On Aug 23, 2016, at 12:26 PM, kant kodali < kanth...@gmail.com > wrote:

>>

>> ok now that I understand RDD can be stored to the disk. My last question

>> on this topic would be this.

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:

>>

>> On an other note, if you have a streaming app, you checkpoint the RDDs so

>> that they can be accessed in case of a failure. And yes, RDDs are persisted

>> to DISK. You can access spark’s UI and see it listed under Storage tab.

>>

>>

>>

>> If RDDs are persisted in memory, you avoid any disk I/Os so that any

>> lookups will be cheap. RDDs are reconstructed based on a graph (DAG -

>> available in Spark UI )

>>

>>

>>

>> On Aug 23, 2016, at 12:10 PM, < srikanth.je...@gmail.com >

>> < srikanth.je...@gmail.com > wrote:

>>

>>

>>

>> RAM or Virtual memory is 

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread RK Aduri
Can you come up with your complete analysis? A snapshot of what you think the 
code is doing. May be that would help us understand what exactly you were 
trying to convey.


> On Aug 23, 2016, at 4:21 PM, kant kodali  wrote:
> 
> 
>   
> [link: github.com/apache/spark]
>  
> 
> 
> 
> 
> On Tue, Aug 23, 2016 4:17 PM, kant kodali kanth...@gmail.com 
>  wrote:
> @RK you may want to look more deeply if you are curious. the code starts from 
> here 
> 
> 
>   
> [link: github.com/apache/spark]
>  
> 
> 
> and it goes here where it is trying to save the python code object(which is a 
> byte code)
> 
> 
>  
> 
>  
> [link: github.com/apache/spark]
> 
> 
> 
> 
> On Tue, Aug 23, 2016 2:39 PM, RK Aduri rkad...@collectivei.com 
>  wrote:
> I just had a glance. AFAIK, that is nothing do with RDDs. It’s a pickler used 
> to serialize and deserialize the python code.
> 
>> On Aug 23, 2016, at 2:23 PM, kant kodali > > wrote:
>> 
>> @Sean 
>> 
>> well this makes sense but I wonder what the following source code is doing?
>> 
>> 
>>  
>> 
>> 
>> [link: github.com/apache/spark]
>> 
>>  
>> 
>> 
>> This code looks like it is trying to store some byte code some where 
>> (whether its memory or disk) but why even go this path like creating a code 
>> objects so it can be executed later and so on after all we are trying to 
>> persist the result of computing the RDD" ?
>> 
>> 
>> 
>> 
>> 
>> On Tue, Aug 23, 2016 1:42 PM, Sean Owen so...@cloudera.com 
>>  wrote:
>> We're probably mixing up some semantics here. An RDD is indeed,
>> 
>> really, just some bookkeeping that records how a certain result is
>> 
>> computed. It is not the data itself.
>> 
>> 
>> 
>> However we often talk about "persisting an RDD" which means
>> 
>> "persisting the result of computing the RDD" in which case that
>> 
>> persisted representation can be used instead of recomputing it.
>> 
>> 
>> 
>> The result of computing an RDD is really some objects in memory. It's
>> 
>> possible to persist the RDD in memory by just storing these objects in
>> 
>> memory as cached partitions. This involves no serialization.
>> 
>> 
>> 
>> Data can be persisted to disk but this involves serializing objects to
>> 
>> bytes (not byte code). It's also possible to store a serialized
>> 
>> representation in memory because it may be more compact.
>> 
>> 
>> 
>> This is not the same as saving/writing an RDD to persistent storage as
>> 
>> text or JSON or whatever.
>> 
>> 
>> 
>> On Tue, Aug 23, 2016 at 9:28 PM, kant kodali > > wrote:
>> 
>> > @srkanth are you sure? the whole point of RDD's is to store transformations
>> 
>> > but not the data as the spark paper points out but I do lack the practical
>> 
>> > experience for me to confirm. when I looked at the spark source
>> 
>> > code(specifically the checkpoint code) a while ago it was clearly storing
>> 
>> > some JVM byte code to disk which I thought were the transformations.
>> 
>> >
>> 
>> >
>> 
>> >
>> 
>> > On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com 
>> >  wrote:
>> 
>> >>
>> 
>> >> RDD contains data but not JVM byte code i.e. data which is read from
>> 
>> >> source and transformations have been applied. This is 

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
apache/spark spark - Mirror of Apache Spark github.com






On Tue, Aug 23, 2016 4:17 PM, kant kodali kanth...@gmail.com wrote:
@RK you may want to look more deeply if you are curious. the code starts from
here

apache/spark spark - Mirror of Apache Spark github.com

and it goes here where it is trying to save the python code object(which is a
byte code)

apache/spark spark - Mirror of Apache Spark github.com






On Tue, Aug 23, 2016 2:39 PM, RK Aduri rkad...@collectivei.com wrote:
I just had a glance. AFAIK, that is nothing do with RDDs. It’s a pickler used to
serialize and deserialize the python code.
On Aug 23, 2016, at 2:23 PM, kant kodali < kanth...@gmail.com > wrote:
@Sean
well this makes sense but I wonder what the following source code is doing?

apache/spark spark - Mirror of Apache Spark GITHUB.COM

This code looks like it is trying to store some byte code some where (whether
its memory or disk) but why even go this path like creating a code objects so it
can be executed later and so on after all we are trying to persist the result of
computing the RDD" ?





On Tue, Aug 23, 2016 1:42 PM, Sean Owen so...@cloudera.com wrote:
We're probably mixing up some semantics here. An RDD is indeed,

really, just some bookkeeping that records how a certain result is

computed. It is not the data itself.




However we often talk about "persisting an RDD" which means

"persisting the result of computing the RDD" in which case that

persisted representation can be used instead of recomputing it.




The result of computing an RDD is really some objects in memory. It's

possible to persist the RDD in memory by just storing these objects in

memory as cached partitions. This involves no serialization.




Data can be persisted to disk but this involves serializing objects to

bytes (not byte code). It's also possible to store a serialized

representation in memory because it may be more compact.




This is not the same as saving/writing an RDD to persistent storage as

text or JSON or whatever.




On Tue, Aug 23, 2016 at 9:28 PM, kant kodali < kanth...@gmail.com > wrote:

> @srkanth are you sure? the whole point of RDD's is to store transformations

> but not the data as the spark paper points out but I do lack the practical

> experience for me to confirm. when I looked at the spark source

> code(specifically the checkpoint code) a while ago it was clearly storing

> some JVM byte code to disk which I thought were the transformations.

>

>

>

> On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:

>>

>> RDD contains data but not JVM byte code i.e. data which is read from

>> source and transformations have been applied. This is ideal case to persist

>> RDDs.. As Nirav mentioned this data will be serialized before persisting to

>> disk..

>>

>>

>>

>> Thanks,

>> Sreekanth Jella

>>

>>

>>

>> From: kant kodali

>> Sent: Tuesday, August 23, 2016 3:59 PM

>> To: Nirav

> srikanth.je...@gmail.com ; user@spark.apache.org

>> Subject: Re: Are RDD's ever persisted to disk?

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:

>>

>> You can store either in serialized form(butter array) or just save it in a

>> string format like tsv or csv. There are different RDD save apis for that.

>>

>> Sent from my iPhone

>>

>>

>> On Aug 23, 2016, at 12:26 PM, kant kodali < kanth...@gmail.com > wrote:

>>

>> ok now that I understand RDD can be stored to the disk. My last question

>> on this topic would be this.

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:

>>

>> On an other note, if you have a streaming app, you checkpoint the RDDs so

>> that they can be accessed in case of a failure. And yes, RDDs are persisted

>> to DISK. You can access spark’s UI and see it listed under Storage tab.

>>

>>

>>

>> If RDDs are persisted in memory, you avoid any disk I/Os so that any

>> lookups will be cheap. RDDs are reconstructed based on a graph (DAG -

>> available in Spark UI )

>>

>>

>>

>> On Aug 23, 2016, at 12:10 PM, < srikanth.je...@gmail.com >

>> < srikanth.je...@gmail.com > wrote:

>>

>>

>>

>> RAM or Virtual memory is finite, so data size needs to be considered

>> before persist. Please see below documentation when to choose the

>> persistency level.

>>

>>

>>

>>

>> 
http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

>>

>>

>>

>> Thanks,

>> Sreekanth Jella

>>

>>

>>

>> From: kant kodali

>> Sent: Tuesday, August 23, 2016 2:42 PM

>> To: srikanth.je...@gmail.com

>> Cc: user@spark.apache.org

>> Subject: Re: Are RDD's ever persisted to disk?

>>

>>

>>

>> so when do we ever need to persist RDD on disk? given 

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
@RK you may want to look more deeply if you are curious. the code starts from
here

apache/spark spark - Mirror of Apache Spark github.com

and it goes here where it is trying to save the python code object(which is a
byte code)

apache/spark spark - Mirror of Apache Spark github.com






On Tue, Aug 23, 2016 2:39 PM, RK Aduri rkad...@collectivei.com wrote:
I just had a glance. AFAIK, that is nothing do with RDDs. It’s a pickler used to
serialize and deserialize the python code.
On Aug 23, 2016, at 2:23 PM, kant kodali < kanth...@gmail.com > wrote:
@Sean
well this makes sense but I wonder what the following source code is doing?

apache/spark spark - Mirror of Apache Spark GITHUB.COM

This code looks like it is trying to store some byte code some where (whether
its memory or disk) but why even go this path like creating a code objects so it
can be executed later and so on after all we are trying to persist the result of
computing the RDD" ?





On Tue, Aug 23, 2016 1:42 PM, Sean Owen so...@cloudera.com wrote:
We're probably mixing up some semantics here. An RDD is indeed,

really, just some bookkeeping that records how a certain result is

computed. It is not the data itself.




However we often talk about "persisting an RDD" which means

"persisting the result of computing the RDD" in which case that

persisted representation can be used instead of recomputing it.




The result of computing an RDD is really some objects in memory. It's

possible to persist the RDD in memory by just storing these objects in

memory as cached partitions. This involves no serialization.




Data can be persisted to disk but this involves serializing objects to

bytes (not byte code). It's also possible to store a serialized

representation in memory because it may be more compact.




This is not the same as saving/writing an RDD to persistent storage as

text or JSON or whatever.




On Tue, Aug 23, 2016 at 9:28 PM, kant kodali < kanth...@gmail.com > wrote:

> @srkanth are you sure? the whole point of RDD's is to store transformations

> but not the data as the spark paper points out but I do lack the practical

> experience for me to confirm. when I looked at the spark source

> code(specifically the checkpoint code) a while ago it was clearly storing

> some JVM byte code to disk which I thought were the transformations.

>

>

>

> On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:

>>

>> RDD contains data but not JVM byte code i.e. data which is read from

>> source and transformations have been applied. This is ideal case to persist

>> RDDs.. As Nirav mentioned this data will be serialized before persisting to

>> disk..

>>

>>

>>

>> Thanks,

>> Sreekanth Jella

>>

>>

>>

>> From: kant kodali

>> Sent: Tuesday, August 23, 2016 3:59 PM

>> To: Nirav

> srikanth.je...@gmail.com ; user@spark.apache.org

>> Subject: Re: Are RDD's ever persisted to disk?

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:

>>

>> You can store either in serialized form(butter array) or just save it in a

>> string format like tsv or csv. There are different RDD save apis for that.

>>

>> Sent from my iPhone

>>

>>

>> On Aug 23, 2016, at 12:26 PM, kant kodali < kanth...@gmail.com > wrote:

>>

>> ok now that I understand RDD can be stored to the disk. My last question

>> on this topic would be this.

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:

>>

>> On an other note, if you have a streaming app, you checkpoint the RDDs so

>> that they can be accessed in case of a failure. And yes, RDDs are persisted

>> to DISK. You can access spark’s UI and see it listed under Storage tab.

>>

>>

>>

>> If RDDs are persisted in memory, you avoid any disk I/Os so that any

>> lookups will be cheap. RDDs are reconstructed based on a graph (DAG -

>> available in Spark UI )

>>

>>

>>

>> On Aug 23, 2016, at 12:10 PM, < srikanth.je...@gmail.com >

>> < srikanth.je...@gmail.com > wrote:

>>

>>

>>

>> RAM or Virtual memory is finite, so data size needs to be considered

>> before persist. Please see below documentation when to choose the

>> persistency level.

>>

>>

>>

>>

>> 
http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

>>

>>

>>

>> Thanks,

>> Sreekanth Jella

>>

>>

>>

>> From: kant kodali

>> Sent: Tuesday, August 23, 2016 2:42 PM

>> To: srikanth.je...@gmail.com

>> Cc: user@spark.apache.org

>> Subject: Re: Are RDD's ever persisted to disk?

>>

>>

>>

>> so when do we ever need to persist RDD on disk? given that we don't need

>> to worry about RAM(memory) as virtual memory will just push pages to the

>> disk when memory becomes 

Re: Is "spark streaming" streaming or mini-batch?

2016-08-23 Thread Matei Zaharia
I think people explained this pretty well, but in practice, this distinction is 
also somewhat of a marketing term, because every system will perform some kind 
of batching. For example, every time you use TCP, the OS and network stack may 
buffer multiple messages together and send them at once; and likewise, 
virtually all streaming engines can batch data internally to achieve higher 
throughput. Furthermore, in all APIs, you can see individual records and 
respond to them one by one. The main question is just what overall performance 
you get (throughput and latency).

Matei

> On Aug 23, 2016, at 4:08 PM, Aseem Bansal  wrote:
> 
> Thanks everyone for clarifying.
> 
> On Tue, Aug 23, 2016 at 9:11 PM, Aseem Bansal  > wrote:
> I was reading this article https://www.inovex.de/blog/storm-in-a-teacup/ 
>  and it mentioned that spark 
> streaming actually mini-batch not actual streaming. 
> 
> I have not used streaming and I am not sure what is the difference in the 2 
> terms. Hence could not make a judgement myself.
> 



Re: Is "spark streaming" streaming or mini-batch?

2016-08-23 Thread Aseem Bansal
Thanks everyone for clarifying.

On Tue, Aug 23, 2016 at 9:11 PM, Aseem Bansal  wrote:

> I was reading this article https://www.inovex.de/blog/storm-in-a-teacup/
> and it mentioned that spark streaming actually mini-batch not actual
> streaming.
>
> I have not used streaming and I am not sure what is the difference in the
> 2 terms. Hence could not make a judgement myself.
>


DataFrame Data Manipulation - Based on a timestamp column Not Working

2016-08-23 Thread Subhajit Purkayastha
Using spark 2.0  & scala 2.11.8, I have a DataFrame with a timestamp column

 

root
 |-- ORG_ID: integer (nullable = true)
 |-- HEADER_ID: integer (nullable = true)
 |-- ORDER_NUMBER: integer (nullable = true)
 |-- LINE_ID: integer (nullable = true)
 |-- LINE_NUMBER: integer (nullable = true)
 |-- ITEM_TYPE_CODE: string (nullable = true)
 |-- ORGANIZATION_ID: integer (nullable = true)
 |-- INVENTORY_ITEM_ID: integer (nullable = true)
 |-- SCHEDULE_SHIP_DATE: timestamp (nullable = true)
 |-- ORDER_QUANTITY_UOM: string (nullable = true)
 |-- UNIT_SELLING_PRICE: double (nullable = true)
 |-- OPEN_QUANTITY: double (nullable = true)

 

[204,94468,56721,197328,1,STANDARD,207,149,2004-01-08 23:59:59.0,Ea,1599.0,28.0]
[204,94468,56721,197331,2,STANDARD,207,151,2004-01-08 23:59:59.0,Ea,1899.05,40.0]
[204,94468,56721,197332,3,STANDARD,207,436,2004-01-08 23:59:59.0,Ea,300.0,24.0]
[204,94468,56721,197335,4,STANDARD,207,3751,2004-01-08 23:59:59.0,Ea,380.0,24.0]

 

I want to manipulate the dataframe data based on a parameter =
demand_time_fence_date

 

var demand_timefence_end_date_instance = new MutableDateTime(planning_start_date)

var demand_timefence_days = demand_timefence_end_date_instance.addDays(demand_time_fence)

val demand_timefence_end_date = ISODateTimeFormat.yearMonthDay().print(demand_timefence_end_date_instance)

var filter_stmt = "from_unixtime(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >= "+ demand_timefence_end_date

val sales_order_dataFrame = sales_order_base_dataFrame.filter(filter_stmt).limit(10)

 

What is the correct syntax to pass the parameter value? 

 

The above filter statement is not working to restrict the dataset

 

Thanks,

 

Subhajit

 

 



How do we process/scale variable size batches in Apache Spark Streaming

2016-08-23 Thread Rachana Srivastava
I am running a Spark Streaming process where I am getting a batch of data every n
seconds. I am using repartition to scale the application. Since the repartition
size is fixed, we are getting lots of small files when the batch size is very small.
Is there any way I can change the partitioning logic based on the input batch
size in order to avoid lots of small files?


Re: Are RDD's ever persisted to disk?

2016-08-23 Thread RK Aduri
I just had a glance. AFAIK, that is nothing do with RDDs. It’s a pickler used 
to serialize and deserialize the python code.

> On Aug 23, 2016, at 2:23 PM, kant kodali  wrote:
> 
> @Sean 
> 
> well this makes sense but I wonder what the following source code is doing?
> 
> 
>  
> 
>  
> apache/spark
>  
> 
> spark - Mirror of Apache Spark
>  
> 
> GITHUB.COM
>  
> 
>   
> 
> 
> This code looks like it is trying to store some byte code some where (whether 
> its memory or disk) but why even go this path like creating a code objects so 
> it can be executed later and so on after all we are trying to persist the 
> result of computing the RDD" ?
> 
> 
> 
> 
> 
> On Tue, Aug 23, 2016 1:42 PM, Sean Owen so...@cloudera.com 
>  wrote:
> We're probably mixing up some semantics here. An RDD is indeed,
> 
> really, just some bookkeeping that records how a certain result is
> 
> computed. It is not the data itself.
> 
> 
> 
> However we often talk about "persisting an RDD" which means
> 
> "persisting the result of computing the RDD" in which case that
> 
> persisted representation can be used instead of recomputing it.
> 
> 
> 
> The result of computing an RDD is really some objects in memory. It's
> 
> possible to persist the RDD in memory by just storing these objects in
> 
> memory as cached partitions. This involves no serialization.
> 
> 
> 
> Data can be persisted to disk but this involves serializing objects to
> 
> bytes (not byte code). It's also possible to store a serialized
> 
> representation in memory because it may be more compact.
> 
> 
> 
> This is not the same as saving/writing an RDD to persistent storage as
> 
> text or JSON or whatever.
> 
> 
> 
> On Tue, Aug 23, 2016 at 9:28 PM, kant kodali  wrote:
> 
> > @srkanth are you sure? the whole point of RDD's is to store transformations
> 
> > but not the data as the spark paper points out but I do lack the practical
> 
> > experience for me to confirm. when I looked at the spark source
> 
> > code(specifically the checkpoint code) a while ago it was clearly storing
> 
> > some JVM byte code to disk which I thought were the transformations.
> 
> >
> 
> >
> 
> >
> 
> > On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:
> 
> >>
> 
> >> RDD contains data but not JVM byte code i.e. data which is read from
> 
> >> source and transformations have been applied. This is ideal case to persist
> 
> >> RDDs.. As Nirav mentioned this data will be serialized before persisting to
> 
> >> disk..
> 
> >>
> 
> >>
> 
> >>
> 
> >> Thanks,
> 
> >> Sreekanth Jella
> 
> >>
> 
> >>
> 
> >>
> 
> >> From: kant kodali
> 
> >> Sent: Tuesday, August 23, 2016 3:59 PM
> 
> >> To: Nirav
> 
> >> Cc: RK Aduri; srikanth.je...@gmail.com; user@spark.apache.org
> 
> >> Subject: Re: Are RDD's ever persisted to disk?
> 
> >>
> 
> >>
> 
> >>
> 
> >> Storing RDD to disk is nothing but storing JVM byte code to disk (in case
> 
> >> of Java or Scala). am I correct?
> 
> >>
> 
> >>
> 
> >>
> 
> >>
> 
> >>
> 
> >> On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:
> 
> >>
> 
> >> You can store either in serialized form(butter array) or just save it in a
> 
> >> string format like tsv or csv. There are different RDD save apis for that.
> 
> >>
> 
> >> Sent from my iPhone
> 
> >>
> 
> >>
> 
> >> On Aug 23, 2016, at 12:26 PM, kant kodali  wrote:
> 
> >>
> 
> >> ok now that I understand RDD can be stored to the disk. My last question
> 
> >> on this topic would be this.
> 
> >>
> 
> >>
> 
> >>
> 
> >> Storing RDD to disk is nothing but storing JVM byte code to disk (in case
> 
> >> of Java or Scala). am I correct?
> 
> >>
> 
> >>
> 
> >>
> 
> >>
> 
> >>
> 
> >> On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:
> 
> >>
> 
> >> On an other note, if you have a streaming app, you checkpoint the RDDs so
> 
> >> that they can be accessed in case of a failure. And yes, RDDs are persisted
> 
> >> to DISK. You can access spark’s UI and see it listed under Storage tab.
> 
> >>
> 
> >>
> 
> >>
> 
> >> If RDDs are persisted in memory, you avoid any disk I/Os so that any
> 
> >> lookups will be cheap. RDDs are reconstructed based on a graph (DAG -
> 
> >> available in Spark UI )
> 
> >>
> 
> >>
> 
> >>
> 
> >> On Aug 23, 2016, at 12:10 PM, 
> 
> >>  wrote:
> 
> >>
> 
> >>
> 
> >>
> 
> >> RAM or Virtual memory is finite, so data size needs to be considered
> 
> >> before persist. Please see below documentation when to choose the
> 
> >> persistency level.
> 
> >>
> 
> >>

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
@Sean
well this makes sense but I wonder what the following source code is doing?

apache/spark spark - Mirror of Apache Spark github.com

This code looks like it is trying to store some byte code some where (whether
its memory or disk) but why even go this path like creating a code objects so it
can be executed later and so on after all we are trying to persist the result of
computing the RDD" ?





On Tue, Aug 23, 2016 1:42 PM, Sean Owen so...@cloudera.com wrote:
We're probably mixing up some semantics here. An RDD is indeed,

really, just some bookkeeping that records how a certain result is

computed. It is not the data itself.




However we often talk about "persisting an RDD" which means

"persisting the result of computing the RDD" in which case that

persisted representation can be used instead of recomputing it.




The result of computing an RDD is really some objects in memory. It's

possible to persist the RDD in memory by just storing these objects in

memory as cached partitions. This involves no serialization.




Data can be persisted to disk but this involves serializing objects to

bytes (not byte code). It's also possible to store a serialized

representation in memory because it may be more compact.




This is not the same as saving/writing an RDD to persistent storage as

text or JSON or whatever.




On Tue, Aug 23, 2016 at 9:28 PM, kant kodali  wrote:

> @srkanth are you sure? the whole point of RDD's is to store transformations

> but not the data as the spark paper points out but I do lack the practical

> experience for me to confirm. when I looked at the spark source

> code(specifically the checkpoint code) a while ago it was clearly storing

> some JVM byte code to disk which I thought were the transformations.

>

>

>

> On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:

>>

>> RDD contains data but not JVM byte code i.e. data which is read from

>> source and transformations have been applied. This is ideal case to persist

>> RDDs.. As Nirav mentioned this data will be serialized before persisting to

>> disk..

>>

>>

>>

>> Thanks,

>> Sreekanth Jella

>>

>>

>>

>> From: kant kodali

>> Sent: Tuesday, August 23, 2016 3:59 PM

>> To: Nirav

> user@spark.apache.org

>> Subject: Re: Are RDD's ever persisted to disk?

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:

>>

>> You can store either in serialized form(butter array) or just save it in a

>> string format like tsv or csv. There are different RDD save apis for that.

>>

>> Sent from my iPhone

>>

>>

>> On Aug 23, 2016, at 12:26 PM, kant kodali  wrote:

>>

>> ok now that I understand RDD can be stored to the disk. My last question

>> on this topic would be this.

>>

>>

>>

>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case

>> of Java or Scala). am I correct?

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:

>>

>> On an other note, if you have a streaming app, you checkpoint the RDDs so

>> that they can be accessed in case of a failure. And yes, RDDs are persisted

>> to DISK. You can access spark’s UI and see it listed under Storage tab.

>>

>>

>>

>> If RDDs are persisted in memory, you avoid any disk I/Os so that any

>> lookups will be cheap. RDDs are reconstructed based on a graph (DAG -

>> available in Spark UI )

>>

>>

>>

>> On Aug 23, 2016, at 12:10 PM, 

>>  wrote:

>>

>>

>>

>> RAM or Virtual memory is finite, so data size needs to be considered

>> before persist. Please see below documentation when to choose the

>> persistency level.

>>

>>

>>

>>

>>
http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

>>

>>

>>

>> Thanks,

>> Sreekanth Jella

>>

>>

>>

>> From: kant kodali

>> Sent: Tuesday, August 23, 2016 2:42 PM

>> To: srikanth.je...@gmail.com

>> Cc: user@spark.apache.org

>> Subject: Re: Are RDD's ever persisted to disk?

>>

>>

>>

>> so when do we ever need to persist RDD on disk? given that we don't need

>> to worry about RAM(memory) as virtual memory will just push pages to the

>> disk when memory becomes scarce.

>>

>>

>>

>>

>>

>> On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:

>>

>> Hi Kant Kodali,

>>

>>

>>

>> Based on the input parameter to persist() method either it will be cached

>> on memory or persisted to disk. In case of failures Spark will reconstruct

>> the RDD on a different executor based on the DAG. That is how failures are

>> handled. Spark Core does not replicate the RDDs as they can be reconstructed

>> from the source (let’s say HDFS, Hive or S3 etc.) but not from memory (which

>> is lost already).

>>

>>

>>

>> Thanks,

>> Sreekanth Jella


Re: Breaking down text String into Array elements

2016-08-23 Thread Mich Talebzadeh
Thanks Nick, Sean and everyone. That did it

BTW I registered UDF for later use in a program

Anyway this is the much simplified code

import scala.util.Random
//
// UDF to create a random string of length characters
//
def randomString(chars: String, length: Int): String =
   (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString
spark.udf.register("randomString", randomString(_:String, _:Int))
case class columns (col1: Int, col2: String)
//val chars = ('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9') ++ ("-!£$")
val chars = ('a' to 'z') ++ ('A' to 'Z')
val text = (1 to 10).map(i => (i.toString, randomString(chars.mkString(""),
10))).toArray
val df = sc.parallelize(text).map(p => columns(p._1.toString.toInt,
p._2.toString)).toDF()
df.show
sys.exit


And this is the result

Loading dynamic_ARRAY_generator.scala...
import scala.util.Random
randomString: (chars: String, length: Int)String
res0: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(<function2>,StringType,Some(List(StringType,
IntegerType)))
defined class columns
chars: scala.collection.immutable.IndexedSeq[Char] = Vector(a, b, c, d, e,
f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z, A, B, C, D,
E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z)
text: Array[(String, String)] = Array((1,KUQyYfnnlu), (2,uYRHdRvSOc),
(3,BmrUBiMOgY), (4,LbvcqCUcQt), (5,GJlJmWFHwc), (6,zLuhPtoHJH),
(7,oCQaoCkFHG), (8,wUghlvXvQF), (9,zCHhwMsvaw), (10,pQCYUJuFyt))
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
+----+----------+
|col1|      col2|
+----+----------+
|   1|KUQyYfnnlu|
|   2|uYRHdRvSOc|
|   3|BmrUBiMOgY|
|   4|LbvcqCUcQt|
|   5|GJlJmWFHwc|
|   6|zLuhPtoHJH|
|   7|oCQaoCkFHG|
|   8|wUghlvXvQF|
|   9|zCHhwMsvaw|
|  10|pQCYUJuFyt|
+----+----------+


Cheers





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 23 August 2016 at 21:20, RK Aduri  wrote:

> That’s because of this:
>
> scala> val text = Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"))
> text: Array[(Int, String)] = Array((1,hNjLJEgjxn), (2,lgryHkVlCN), (3,ukswqcanVC), (4,ZFULVxzAsv), (5,LNzOozHZPF), (6,KZPYXTqMkY), (7,DVjpOvVJTw), (8,LKRYrrLrLh), (9,acheneIPDM), (10,iGZTrKfXNr))
>
> scala> Array(text).getClass()
> res1: Class[_ <: Array[Array[(Int, String)]]] = class [[Lscala.Tuple2;
>
> scala> Array(text).length
> res2: Int = 1
>
> You see that Array(text) is basically a single element.
>
>
> On Aug 23, 2016, at 12:26 PM, Nick Pentreath 
> wrote:
>
>
> How about something like
>>
>> scala> val text = (1 to 10).map(i => (i.toString,
>> random_string(chars.mkString(""), 10))).toArray
>>
>> text: Array[(String, String)] = Array((1,FBECDoOoAC), (2,wvAyZsMZnt),
>> (3,KgnwObOFEG), (4,tAZPRodrgP), (5,uSgrqyZGuc), (6,ztrTmbkOhO),
>> (7,qUbQsKtZWq), (8,JDokbiFzWy), (9,vNHgiHSuUM), (10,CmnFjlHnHx))
>>
>> scala> sc.parallelize(text).count
>> res0: Long = 10
>>
>> By the way not sure exactly why you need the udf registration here?
>>
>>
>> On Tue, 23 Aug 2016 at 20:12 Mich Talebzadeh 
>> wrote:
>>
>>> Hi gents,
>>>
>>> Well I was trying to see whether I can create an array of elements. From
>>> RDD to DF, register as TempTable and store it  as a Hive table
>>>
>>> import scala.util.Random
>>> //
>>> // UDF to create a random string of charlength characters
>>> //
>>> def random_string(chars: String, charlength: Int) : String = {
>>>   val newKey = (1 to charlength).map(
>>> x =>
>>> {
>>>   val index = Random.nextInt(chars.length)
>>>   chars(index)
>>> }
>>>).mkString("")
>>>return newKey
>>> }
>>> spark.udf.register("random_string", random_string(_:String, _:Int))
>>> case class columns (col1: Int, col2: String)
>>> val chars = ('a' to 'z') ++ ('A' to 'Z')
>>> var text = ""
>>> val comma = ","
>>> val terminator = "))"
>>> var random_char = ""
>>> for (i  <- 1 to 10) {
>>> random_char = random_string(chars.mkString(""), 10)
>>> if (i < 10) {text = text + """(""" + i.toString +
>>> """,+random_char+)"""+comma}
>>>else {text = text + """(""" + i.toString +
>>> """,+random_char+)"""}
>>> }
>>> println(text)
>>> val df = sc.parallelize((Array(text)))
>>>
>>>
>>> Unfortunately that only sees it as the text and interprets it as text.
>>>
>>> I can write is 

spark-jdbc impala with kerberos using yarn-client

2016-08-23 Thread twisterius
I am trying to use the spark-jdbc package to access an Impala table via a Spark
data frame. From my understanding
(https://issues.apache.org/jira/browse/SPARK-12312), when loading DataFrames
from a JDBC datasource with Kerberos authentication, remote executors
(yarn-client/cluster etc. modes) fail to establish a connection due to lack
of a Kerberos ticket or the ability to generate it. I found a solution to this
issue by creating a JDBC driver which properly handles Kerberos
authentication
(https://datamountaineer.com/2016/01/15/spark-jdbc-sql-server-kerberos/).
However, I cannot find the source of the Impala JDBC driver online. Should I
just use a Hive driver to enable Kerberos authentication for Impala, or is
there a location where I can find the Impala JDBC driver source? Also, is the
ticket listed above (SPARK-12312) accurate, or is there an out-of-the-box way
for me to connect to a kerberized Impala using
sqlContext.load("jdbc", options) without having to rewrite the Impala driver?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-jdbc-impala-with-kerberos-using-yarn-client-tp27589.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 2.0 with Kafka 0.10 exception

2016-08-23 Thread Srikanth
Hello,

I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.

16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-example mt_event 0 15782114
> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
> spark-executor-example.
> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
> 6)
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-example mt_event 0 15782114 after polling for 512
> at scala.Predef$.assert(Predef.scala:170)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>

I get this error intermittently. Sometimes a few batches are scheduled and
run fine. Then I get this error.
kafkacat is able to fetch from this topic continuously.

Full exception is here --
https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767

Srikanth


Re: Are RDD's ever persisted to disk?

2016-08-23 Thread Sean Owen
We're probably mixing up some semantics here. An RDD is indeed,
really, just some bookkeeping that records how a certain result is
computed. It is not the data itself.

However we often talk about "persisting an RDD" which means
"persisting the result of computing the RDD" in which case that
persisted representation can be used instead of recomputing it.

The result of computing an RDD is really some objects in memory. It's
possible to persist the RDD in memory by just storing these objects in
memory as cached partitions. This involves no serialization.

Data can be persisted to disk but this involves serializing objects to
bytes (not byte code). It's also possible to store a serialized
representation in memory because it may be more compact.

This is not the same as saving/writing an RDD to persistent storage as
text or JSON or whatever.
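
To make the distinction concrete, a minimal sketch (assuming a spark-shell sc) of
persisting the result of computing an RDD at different storage levels:

import org.apache.spark.storage.StorageLevel

val inMemory   = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY)     // cached objects, no serialization
val serialized = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY_SER) // serialized bytes kept in memory
val spillable  = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_AND_DISK) // partitions that don't fit spill to disk

inMemory.count()  // first action computes the partitions and caches the result
inMemory.count()  // second action reuses the cached objects instead of recomputing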

On Tue, Aug 23, 2016 at 9:28 PM, kant kodali  wrote:
> @srkanth are you sure? the whole point of RDD's is to store transformations
> but not the data as the spark paper points out but I do lack the practical
> experience for me to confirm. when I looked at the spark source
> code(specifically the checkpoint code) a while ago it was clearly storing
> some JVM byte code to disk which I thought were the transformations.
>
>
>
> On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:
>>
>> RDD contains data but not JVM byte code i.e. data which is read from
>> source and transformations have been applied. This is ideal case to persist
>> RDDs.. As Nirav mentioned this data will be serialized before persisting to
>> disk..
>>
>>
>>
>> Thanks,
>> Sreekanth Jella
>>
>>
>>
>> From: kant kodali
>> Sent: Tuesday, August 23, 2016 3:59 PM
>> To: Nirav
>> Cc: RK Aduri; srikanth.je...@gmail.com; user@spark.apache.org
>> Subject: Re: Are RDD's ever persisted to disk?
>>
>>
>>
>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case
>> of Java or Scala). am I correct?
>>
>>
>>
>>
>>
>> On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:
>>
>> You can store either in serialized form(butter array) or just save it in a
>> string format like tsv or csv. There are different RDD save apis for that.
>>
>> Sent from my iPhone
>>
>>
>> On Aug 23, 2016, at 12:26 PM, kant kodali  wrote:
>>
>> ok now that I understand RDD can be stored to the disk. My last question
>> on this topic would be this.
>>
>>
>>
>> Storing RDD to disk is nothing but storing JVM byte code to disk (in case
>> of Java or Scala). am I correct?
>>
>>
>>
>>
>>
>> On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:
>>
>> On an other note, if you have a streaming app, you checkpoint the RDDs so
>> that they can be accessed in case of a failure. And yes, RDDs are persisted
>> to DISK. You can access spark’s UI and see it listed under Storage tab.
>>
>>
>>
>> If RDDs are persisted in memory, you avoid any disk I/Os so that any
>> lookups will be cheap. RDDs are reconstructed based on a graph (DAG -
>> available in Spark UI )
>>
>>
>>
>> On Aug 23, 2016, at 12:10 PM, 
>>  wrote:
>>
>>
>>
>> RAM or Virtual memory is finite, so data size needs to be considered
>> before persist. Please see below documentation when to choose the
>> persistency level.
>>
>>
>>
>>
>> http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
>>
>>
>>
>> Thanks,
>> Sreekanth Jella
>>
>>
>>
>> From: kant kodali
>> Sent: Tuesday, August 23, 2016 2:42 PM
>> To: srikanth.je...@gmail.com
>> Cc: user@spark.apache.org
>> Subject: Re: Are RDD's ever persisted to disk?
>>
>>
>>
>> so when do we ever need to persist RDD on disk? given that we don't need
>> to worry about RAM(memory) as virtual memory will just push pages to the
>> disk when memory becomes scarce.
>>
>>
>>
>>
>>
>> On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:
>>
>> Hi Kant Kodali,
>>
>>
>>
>> Based on the input parameter to persist() method either it will be cached
>> on memory or persisted to disk. In case of failures Spark will reconstruct
>> the RDD on a different executor based on the DAG. That is how failures are
>> handled. Spark Core does not replicate the RDDs as they can be reconstructed
>> from the source (let’s say HDFS, Hive or S3 etc.) but not from memory (which
>> is lost already).
>>
>>
>>
>> Thanks,
>> Sreekanth Jella
>>
>>
>>
>> From: kant kodali
>> Sent: Tuesday, August 23, 2016 2:12 PM
>> To: user@spark.apache.org
>> Subject: Are RDD's ever persisted to disk?
>>
>>
>>
>> I am new to spark and I keep hearing that RDD's can be persisted to memory
>> or disk after each checkpoint. I wonder why RDD's are persisted in memory?
>> In case of node failure how would you access memory to reconstruct the RDD?
>> persisting to disk make sense because its like persisting to a Network file
>> system (in case of HDFS) where a each block will have multiple copies across
>> nodes so if a node goes down 

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
Also, if I were to believe that it stores data, then why does an RDD need to be
recomputed in the case of node failure? Since the data has already been saved to
disk (according to you) after applying the transformation, Spark could simply
bring those data blocks back; there would be no need to reconstruct the RDD from
its lineage in that case. So this sounds very contradictory to me after reading
the Spark paper.





On Tue, Aug 23, 2016 1:28 PM, kant kodali kanth...@gmail.com wrote:
@srkanth are you sure? the whole point of RDD's is to store transformations but
not the data as the spark paper points out but I do lack the practical
experience for me to confirm. when I looked at the spark source code 
(specifically the checkpoint code) a while ago it was clearly storing some JVM
byte code to disk which I thought were the transformations.





On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:
RDD contains data but not JVM byte code i.e. data which is read from source and
transformations have been applied. This is ideal case to persist RDDs.. As Nirav
mentioned this data will be serialized before persisting to disk..



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 3:59 PM
To: Nirav
Cc: RK Aduri ; srikanth.je...@gmail.com ; user@spark.apache.org
Subject: Re: Are RDD's ever persisted to disk?



Storing RDD to disk is nothing but storing JVM byte code to disk (in case of
Java or Scala). am I correct?







On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:

You can store either in serialized form(butter array) or just save it in a
string format like tsv or csv. There are different RDD save apis for that.

Sent from my iPhone


On Aug 23, 2016, at 12:26 PM, kant kodali < kanth...@gmail.com > wrote:

ok now that I understand RDD can be stored to the disk. My last question on this
topic would be this.



Storing RDD to disk is nothing but storing JVM byte code to disk (in case of
Java or Scala). am I correct?







On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:

On an other note, if you have a streaming app, you checkpoint the RDDs so that
they can be accessed in case of a failure. And yes, RDDs are persisted to DISK.
You can access spark’s UI and see it listed under Storage tab.



If RDDs are persisted in memory, you avoid any disk I/Os so that any lookups
will be cheap. RDDs are reconstructed based on a graph (DAG - available in Spark
UI )



On Aug 23, 2016, at 12:10 PM, < srikanth.je...@gmail.com > < 
srikanth.je...@gmail.com > wrote:



RAM or Virtual memory is finite, so data size needs to be considered before
persist. Please see below documentation when to choose the persistency level.




http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 2:42 PM
To: srikanth.je...@gmail.com
Cc: user@spark.apache.org
Subject: Re: Are RDD's ever persisted to disk?



so when do we ever need to persist RDD on disk? given that we don't need to
worry about RAM(memory) as virtual memory will just push pages to the disk when
memory becomes scarce.







On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:

Hi Kant Kodali,



Based on the input parameter to persist() method either it will be cached on
memory or persisted to disk. In case of failures Spark will reconstruct the RDD
on a different executor based on the DAG. That is how failures are handled.
Spark Core does not replicate the RDDs as they can be reconstructed from the
source (let’s say HDFS, Hive or S3 etc.) but not from memory (which is lost
already).



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 2:12 PM
To: user@spark.apache.org
Subject: Are RDD's ever persisted to disk?



I am new to spark and I keep hearing that RDD's can be persisted to memory or
disk after each checkpoint. I wonder why RDD's are persisted in memory? In case
of node failure how would you access memory to reconstruct the RDD? persisting
to disk make sense because its like persisting to a Network file system (in case
of HDFS) where a each block will have multiple copies across nodes so if a node
goes down RDD's can still be reconstructed by the reading the required block
from other nodes and recomputing it but my biggest question is Are RDD's ever 
persisted to disk?






Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
@srikanth are you sure? The whole point of RDDs is to store transformations,
not the data, as the Spark paper points out, though I do lack the practical
experience to confirm this. When I looked at the Spark source code
(specifically the checkpoint code) a while ago, it was clearly storing some JVM
byte code to disk, which I thought were the transformations.





On Tue, Aug 23, 2016 1:11 PM, srikanth.je...@gmail.com wrote:
RDD contains data but not JVM byte code i.e. data which is read from source and
transformations have been applied. This is ideal case to persist RDDs.. As Nirav
mentioned this data will be serialized before persisting to disk..



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 3:59 PM
To: Nirav
Cc: RK Aduri ; srikanth.je...@gmail.com ; user@spark.apache.org
Subject: Re: Are RDD's ever persisted to disk?



Storing RDD to disk is nothing but storing JVM byte code to disk (in case of
Java or Scala). am I correct?







On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:

You can store either in serialized form(butter array) or just save it in a
string format like tsv or csv. There are different RDD save apis for that.

Sent from my iPhone


On Aug 23, 2016, at 12:26 PM, kant kodali < kanth...@gmail.com > wrote:

ok now that I understand RDD can be stored to the disk. My last question on this
topic would be this.



Storing RDD to disk is nothing but storing JVM byte code to disk (in case of
Java or Scala). am I correct?







On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:

On an other note, if you have a streaming app, you checkpoint the RDDs so that
they can be accessed in case of a failure. And yes, RDDs are persisted to DISK.
You can access spark’s UI and see it listed under Storage tab.



If RDDs are persisted in memory, you avoid any disk I/Os so that any lookups
will be cheap. RDDs are reconstructed based on a graph (DAG - available in Spark
UI )



On Aug 23, 2016, at 12:10 PM, < srikanth.je...@gmail.com > < 
srikanth.je...@gmail.com > wrote:



RAM or Virtual memory is finite, so data size needs to be considered before
persist. Please see below documentation when to choose the persistency level.




http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 2:42 PM
To: srikanth.je...@gmail.com
Cc: user@spark.apache.org
Subject: Re: Are RDD's ever persisted to disk?



so when do we ever need to persist RDD on disk? given that we don't need to
worry about RAM(memory) as virtual memory will just push pages to the disk when
memory becomes scarce.







On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:

Hi Kant Kodali,



Based on the input parameter to persist() method either it will be cached on
memory or persisted to disk. In case of failures Spark will reconstruct the RDD
on a different executor based on the DAG. That is how failures are handled.
Spark Core does not replicate the RDDs as they can be reconstructed from the
source (let’s say HDFS, Hive or S3 etc.) but not from memory (which is lost
already).



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 2:12 PM
To: user@spark.apache.org
Subject: Are RDD's ever persisted to disk?



I am new to spark and I keep hearing that RDD's can be persisted to memory or
disk after each checkpoint. I wonder why RDD's are persisted in memory? In case
of node failure how would you access memory to reconstruct the RDD? persisting
to disk make sense because its like persisting to a Network file system (in case
of HDFS) where a each block will have multiple copies across nodes so if a node
goes down RDD's can still be reconstructed by the reading the required block
from other nodes and recomputing it but my biggest question is Are RDD's ever 
persisted to disk?






Re: Breaking down text String into Array elements

2016-08-23 Thread RK Aduri
That’s because of this:

scala> val text = 
Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"))
text: Array[(Int, String)] = Array((1,hNjLJEgjxn), (2,lgryHkVlCN), 
(3,ukswqcanVC), (4,ZFULVxzAsv), (5,LNzOozHZPF), (6,KZPYXTqMkY), (7,DVjpOvVJTw), 
(8,LKRYrrLrLh), (9,acheneIPDM), (10,iGZTrKfXNr))

scala> Array(text).getClass()
res1: Class[_ <: Array[Array[(Int, String)]]] = class [[Lscala.Tuple2;

scala> Array(text).length
res2: Int = 1

You see that Array(text) is basically a single element.
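In Mich's original case, where text is a single String rather than an Array,
one way to recover the pairs is to parse the string; a rough sketch, assuming
the exact (n,"...") format shown in the thread:

// Matches each (number,"value") pair in the generated string.
val pairPattern = """\((\d+),"([^"]*)"\)""".r

val parsed: Array[(Int, String)] =
  pairPattern.findAllMatchIn(text)
    .map(m => (m.group(1).toInt, m.group(2)))
    .toArray

val rdd = sc.parallelize(parsed)   // rdd.count == 10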


> On Aug 23, 2016, at 12:26 PM, Nick Pentreath  wrote:
> 
> 
> How about something like 
> 
> scala> val text = (1 to 10).map(i => (i.toString, 
> random_string(chars.mkString(""), 10))).toArray
> 
> text: Array[(String, String)] = Array((1,FBECDoOoAC), (2,wvAyZsMZnt), 
> (3,KgnwObOFEG), (4,tAZPRodrgP), (5,uSgrqyZGuc), (6,ztrTmbkOhO), 
> (7,qUbQsKtZWq), (8,JDokbiFzWy), (9,vNHgiHSuUM), (10,CmnFjlHnHx))
> 
> scala> sc.parallelize(text).count
> res0: Long = 10
> 
> By the way not sure exactly why you need the udf registration here?
> 
> 
> On Tue, 23 Aug 2016 at 20:12 Mich Talebzadeh  > wrote:
> Hi gents,
> 
> Well I was trying to see whether I can create an array of elements. From RDD 
> to DF, register as TempTable and store it  as a Hive table
> 
> import scala.util.Random
> //
> // UDF to create a random string of charlength characters
> //
> def random_string(chars: String, charlength: Int) : String = {
>   val newKey = (1 to charlength).map(
> x =>
> {
>   val index = Random.nextInt(chars.length)
>   chars(index)
> }
>).mkString("")
>return newKey
> }
> spark.udf.register("random_string", random_string(_:String, _:Int))
> case class columns (col1: Int, col2: String)
> val chars = ('a' to 'z') ++ ('A' to 'Z')
> var text = ""
> val comma = ","
> val terminator = "))"
> var random_char = ""
> for (i  <- 1 to 10) {
> random_char = random_string(chars.mkString(""), 10)
> if (i < 10) {text = text + """(""" + i.toString + 
> """,+random_char+)"""+comma}
>else {text = text + """(""" + i.toString + """,+random_char+)"""}
> }
> println(text)
> val df = sc.parallelize((Array(text)))
> 
> 
> Unfortunately that only sees it as the text and interprets it as text.
> 
> I can write is easily as a shell script with ${text} passed to Array and it 
> will work. I was wondering if I could do this in Spark/Scala with my limited 
> knowledge
> 
> Cheers
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 23 August 2016 at 19:00, Nick Pentreath  > wrote:
> what is "text"? i.e. what is the "val text = ..." definition?
> 
> If text is a String itself then indeed sc.parallelize(Array(text)) is doing 
> the correct thing in this case.
> 
> 
> On Tue, 23 Aug 2016 at 19:42 Mich Talebzadeh  > wrote:
> I am sure someone know this :)
> 
> Created a dynamic text string which has format
> 
> scala> println(text)
> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
> 
> now if I do
> 
> scala> val df = 
> sc.parallelize((Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"
> df: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[230] at 
> parallelize at :39
> scala> df.count
> res157: Long = 10
> It shows ten Array elements, which is correct.
> 
> Now if I pass that text into Array it only sees one row
> 
> scala> val df = sc.parallelize((Array(text)))
> df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[228] at 
> parallelize at :41
> scala> df.count
> res158: Long = 1
> 
> Basically it sees it as one element of array
> 
> scala> df.first
> res165: String = 
> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
> Which is not what I want.
> 
> Any ideas?
> 
> Thanks
> 
> 
> 
> 
> 
> 
> 
> 

RE: Are RDD's ever persisted to disk?

2016-08-23 Thread srikanth.jella
An RDD contains data, not JVM byte code, i.e. data which has been read from the
source and had transformations applied to it. This is the ideal case to persist
RDDs. As Nirav mentioned, this data will be serialized before persisting to disk.

Thanks,
Sreekanth Jella

From: kant kodali

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
Storing an RDD to disk is nothing but storing JVM byte code to disk (in the
case of Java or Scala). Am I correct?





On Tue, Aug 23, 2016 12:55 PM, Nirav nira...@gmail.com wrote:
You can store either in serialized form(butter array) or just save it in a
string format like tsv or csv. There are different RDD save apis for that.

Sent from my iPhone
On Aug 23, 2016, at 12:26 PM, kant kodali < kanth...@gmail.com > wrote:

ok now that I understand RDD can be stored to the disk. My last question on this
topic would be this.
Storing RDD to disk is nothing but storing JVM byte code to disk (in case of
Java or Scala). am I correct?





On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:
On an other note, if you have a streaming app, you checkpoint the RDDs so that
they can be accessed in case of a failure. And yes, RDDs are persisted to DISK.
You can access spark’s UI and see it listed under Storage tab.
If RDDs are persisted in memory, you avoid any disk I/Os so that any lookups
will be cheap. RDDs are reconstructed based on a graph (DAG - available in Spark
UI )
On Aug 23, 2016, at 12:10 PM, < srikanth.je...@gmail.com > < 
srikanth.je...@gmail.com > wrote:
RAM or Virtual memory is finite, so data size needs to be considered before
persist. Please see below documentation when to choose the persistency level. 
http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
 Thanks,
Sreekanth Jella From: kant kodali
Sent: Tuesday, August 23, 2016 2:42 PM
To: srikanth.je...@gmail.com
Cc: user@spark.apache.org
Subject: Re: Are RDD's ever persisted to disk? so when do we ever need to 
persist RDD on disk? given that we don't need to
worry about RAM(memory) as virtual memory will just push pages to the disk when
memory becomes scarce.



On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:Hi Kant Kodali,



Based on the input parameter to persist() method either it will be cached on
memory or persisted to disk. In case of failures Spark will reconstruct the RDD
on a different executor based on the DAG. That is how failures are handled.
Spark Core does not replicate the RDDs as they can be reconstructed from the
source (let’s say HDFS, Hive or S3 etc.) but not from memory (which is lost
already).



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 2:12 PM
To: user@spark.apache.org
Subject: Are RDD's ever persisted to disk?



I am new to spark and I keep hearing that RDD's can be persisted to memory or
disk after each checkpoint. I wonder why RDD's are persisted in memory? In case
of node failure how would you access memory to reconstruct the RDD? persisting
to disk make sense because its like persisting to a Network file system (in case
of HDFS) where a each block will have multiple copies across nodes so if a node
goes down RDD's can still be reconstructed by the reading the required block
from other nodes and recomputing it but my biggest question is Are RDD's ever 
persisted to disk?






Re: Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-23 Thread Arun Luthra
Splitting up the Maps into separate objects did not help.

However, I was able to work around the problem by reimplementing it with
RDD joins.
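In case it helps anyone else, a rough sketch of that kind of rewrite (the names
are made up; the idea is to turn the big local Map into an RDD and join on the
key instead of capturing the Map in the task closure):

// Before (sketch): a large local Map captured by the closure. It has to be
// serialized with the task, which is what was blowing up in ClosureCleaner.
//   val lookup: Map[String, Int] = ...
//   records.map(r => (r, lookup.getOrElse(r.key, 0)))

// After (sketch): represent the lookup table as an RDD and join on the key.
case class Record(key: String, payload: String)

val records = sc.parallelize(Seq(Record("a", "x"), Record("b", "y")))
val lookupRdd = sc.parallelize(Seq(("a", 1), ("c", 3)))   // the former Map as pairs

val joined = records
  .map(r => (r.key, r))
  .leftOuterJoin(lookupRdd)                     // (key, (Record, Option[Int]))
  .map { case (_, (rec, score)) => (rec, score.getOrElse(0)) }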

On Aug 18, 2016 5:16 PM, "Arun Luthra"  wrote:

> This might be caused by a few large Map objects that Spark is trying to
> serialize. These are not broadcast variables or anything, they're just
> regular objects.
>
> Would it help if I further indexed these maps into a two-level Map i.e.
> Map[String, Map[String, Int]] ? Or would this still count against me?
>
> What if I manually split them up into numerous Map variables?
>
> On Mon, Aug 15, 2016 at 2:12 PM, Arun Luthra 
> wrote:
>
>> I got this OOM error in Spark local mode. The error seems to have been at
>> the start of a stage (all of the stages on the UI showed as complete, there
>> were more stages to do but had not showed up on the UI yet).
>>
>> There appears to be ~100G of free memory at the time of the error.
>>
>> Spark 2.0.0
>> 200G driver memory
>> local[30]
>> 8 /mntX/tmp directories for spark.local.dir
>> "spark.sql.shuffle.partitions", "500"
>> "spark.driver.maxResultSize","500"
>> "spark.default.parallelism", "1000"
>>
>> The line number for the error is at an RDD map operation where there are
>> some potentially large Map objects that are going to be accessed by each
>> record. Does it matter if they are broadcast variables or not? I imagine
>> not because its in local mode they should be available in memory to every
>> executor/core.
>>
>> Possibly related:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cl
>> osureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html
>>
>> Exception in thread "main" java.lang.OutOfMemoryError
>> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputSt
>> ream.java:123)
>> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutput
>> Stream.java:93)
>> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Objec
>> tOutputStream.java:1877)
>> at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDat
>> aMode(ObjectOutputStream.java:1786)
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at org.apache.spark.serializer.JavaSerializationStream.writeObj
>> ect(JavaSerializer.scala:43)
>> at org.apache.spark.serializer.JavaSerializerInstance.serialize
>> (JavaSerializer.scala:100)
>> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(Clo
>> sureCleaner.scala:295)
>> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$
>> ClosureCleaner$$clean(ClosureCleaner.scala:288)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
>> at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati
>> onScope.scala:151)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati
>> onScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>> at org.apache.spark.rdd.RDD.map(RDD.scala:365)
>> at abc.Abc$.main(abc.scala:395)
>> at abc.Abc.main(abc.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy
>> $SparkSubmit$$runMain(SparkSubmit.scala:729)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit
>> .scala:185)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>


Re: Are RDD's ever persisted to disk?

2016-08-23 Thread Nirav
You can store it either in serialized form (byte array) or just save it in a
string format like TSV or CSV. There are different RDD save APIs for that.
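For example (the paths are placeholders), something along these lines:

val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))

// Serialized form: Java-serialized objects written out as a SequenceFile.
pairs.saveAsObjectFile("hdfs:///tmp/objects")

// String form: render each record yourself, e.g. tab-separated, and save as text.
pairs.map { case (id, s) => s"$id\t$s" }.saveAsTextFile("hdfs:///tmp/tsv")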

Sent from my iPhone

> On Aug 23, 2016, at 12:26 PM, kant kodali  wrote:
> 
> 
> ok now that I understand RDD can be stored to the disk. My last question on 
> this topic would be this.
> 
> Storing RDD to disk is nothing but storing JVM byte code to disk (in case of 
> Java or Scala). am I correct?
> 
> 
> 
> 
> 
>> On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:
>> On an other note, if you have a streaming app, you checkpoint the RDDs so 
>> that they can be accessed in case of a failure. And yes, RDDs are persisted 
>> to DISK. You can access spark’s UI and see it listed under Storage tab. 
>> 
>> If RDDs are persisted in memory, you avoid any disk I/Os so that any lookups 
>> will be cheap. RDDs are reconstructed based on a graph (DAG - available in 
>> Spark UI )
>> 
>>> On Aug 23, 2016, at 12:10 PM,  
>>>  wrote:
>>> 
>>> RAM or Virtual memory is finite, so data size needs to be considered before 
>>> persist. Please see below documentation when to choose the persistency 
>>> level.
>>>  
>>> http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
>>>  
>>> Thanks,
>>> Sreekanth Jella
>>>  
>>> From: kant kodali
>>> Sent: Tuesday, August 23, 2016 2:42 PM
>>> To: srikanth.je...@gmail.com
>>> Cc: user@spark.apache.org
>>> Subject: Re: Are RDD's ever persisted to disk?
>>>  
>>> so when do we ever need to persist RDD on disk? given that we don't need to 
>>> worry about RAM(memory) as virtual memory will just push pages to the disk 
>>> when memory becomes scarce.
>>> 
>>>  
>>> 
>>>  
>>> 
>>> On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:
>>> Hi Kant Kodali,
>>> 
>>>  
>>> 
>>> Based on the input parameter to persist() method either it will be cached 
>>> on memory or persisted to disk. In case of failures Spark will reconstruct 
>>> the RDD on a different executor based on the DAG. That is how failures are 
>>> handled. Spark Core does not replicate the RDDs as they can be 
>>> reconstructed from the source (let’s say HDFS, Hive or S3 etc.) but not 
>>> from memory (which is lost already).
>>> 
>>>  
>>> 
>>> Thanks,
>>> Sreekanth Jella
>>> 
>>>  
>>> 
>>> From: kant kodali
>>> Sent: Tuesday, August 23, 2016 2:12 PM
>>> To: user@spark.apache.org
>>> Subject: Are RDD's ever persisted to disk?
>>> 
>>>  
>>> 
>>> I am new to spark and I keep hearing that RDD's can be persisted to memory 
>>> or disk after each checkpoint. I wonder why RDD's are persisted in memory? 
>>> In case of node failure how would you access memory to reconstruct the RDD? 
>>> persisting to disk make sense because its like persisting to a Network file 
>>> system (in case of HDFS) where a each block will have multiple copies 
>>> across nodes so if a node goes down RDD's can still be reconstructed by the 
>>> reading the required block from other nodes and recomputing it but my 
>>> biggest question is Are RDD's ever persisted to disk? 
>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> 
> 
> 


Re: Are RDD's ever persisted to disk?

2016-08-23 Thread RK Aduri
On another note, if you have a streaming app, you checkpoint the RDDs so that
they can be recovered in case of a failure. And yes, RDDs can be persisted to
DISK. You can open Spark's UI and see them listed under the Storage tab.

If RDDs are persisted in memory, you avoid disk I/O, so any lookups will be
cheap. RDDs are reconstructed based on the lineage graph (the DAG, which is
also available in the Spark UI).
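A minimal sketch of RDD checkpointing, with an example checkpoint directory:

sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")   // example location

val rdd = sc.parallelize(1 to 1000000).map(_ * 2).persist()
rdd.checkpoint()   // marks the RDD to be written to the checkpoint directory
rdd.count()        // the first action materializes both the cache and the checkpoint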

> On Aug 23, 2016, at 12:10 PM,  
>  wrote:
> 
> RAM or Virtual memory is finite, so data size needs to be considered before 
> persist. Please see below documentation when to choose the persistency level.
>  
> http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
>  
> 
>  
> Thanks,
> Sreekanth Jella
>  
> From: kant kodali 
> Sent: Tuesday, August 23, 2016 2:42 PM
> To: srikanth.je...@gmail.com 
> Cc: user@spark.apache.org 
> Subject: Re: Are RDD's ever persisted to disk?
>  
> so when do we ever need to persist RDD on disk? given that we don't need to 
> worry about RAM(memory) as virtual memory will just push pages to the disk 
> when memory becomes scarce.
> 
>  
> 
>  
> 
> On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com 
>  wrote:
> Hi Kant Kodali,
> 
>  
> 
> Based on the input parameter to persist() method either it will be cached on 
> memory or persisted to disk. In case of failures Spark will reconstruct the 
> RDD on a different executor based on the DAG. That is how failures are 
> handled. Spark Core does not replicate the RDDs as they can be reconstructed 
> from the source (let’s say HDFS, Hive or S3 etc.) but not from memory (which 
> is lost already).
> 
>  
> 
> Thanks,
> Sreekanth Jella
> 
>  
> 
> From: kant kodali 
> Sent: Tuesday, August 23, 2016 2:12 PM
> To: user@spark.apache.org 
> Subject: Are RDD's ever persisted to disk?
> 
>  
> 
> I am new to spark and I keep hearing that RDD's can be persisted to memory or 
> disk after each checkpoint. I wonder why RDD's are persisted in memory? In 
> case of node failure how would you access memory to reconstruct the RDD? 
> persisting to disk make sense because its like persisting to a Network file 
> system (in case of HDFS) where a each block will have multiple copies across 
> nodes so if a node goes down RDD's can still be reconstructed by the reading 
> the required block from other nodes and recomputing it but my biggest 
> question is Are RDD's ever persisted to disk? 
> 
> 
> 




Re: Breaking down text String into Array elements

2016-08-23 Thread Nick Pentreath
> How about something like
>
> scala> val text = (1 to 10).map(i => (i.toString,
> random_string(chars.mkString(""), 10))).toArray
>
> text: Array[(String, String)] = Array((1,FBECDoOoAC), (2,wvAyZsMZnt),
> (3,KgnwObOFEG), (4,tAZPRodrgP), (5,uSgrqyZGuc), (6,ztrTmbkOhO),
> (7,qUbQsKtZWq), (8,JDokbiFzWy), (9,vNHgiHSuUM), (10,CmnFjlHnHx))
>
> scala> sc.parallelize(text).count
> res0: Long = 10
>
> By the way not sure exactly why you need the udf registration here?
>
>
> On Tue, 23 Aug 2016 at 20:12 Mich Talebzadeh 
> wrote:
>
>> Hi gents,
>>
>> Well I was trying to see whether I can create an array of elements. From
>> RDD to DF, register as TempTable and store it  as a Hive table
>>
>> import scala.util.Random
>> //
>> // UDF to create a random string of charlength characters
>> //
>> def random_string(chars: String, charlength: Int) : String = {
>>   val newKey = (1 to charlength).map(
>> x =>
>> {
>>   val index = Random.nextInt(chars.length)
>>   chars(index)
>> }
>>).mkString("")
>>return newKey
>> }
>> spark.udf.register("random_string", random_string(_:String, _:Int))
>> case class columns (col1: Int, col2: String)
>> val chars = ('a' to 'z') ++ ('A' to 'Z')
>> var text = ""
>> val comma = ","
>> val terminator = "))"
>> var random_char = ""
>> for (i  <- 1 to 10) {
>> random_char = random_string(chars.mkString(""), 10)
>> if (i < 10) {text = text + """(""" + i.toString +
>> """,+random_char+)"""+comma}
>>else {text = text + """(""" + i.toString +
>> """,+random_char+)"""}
>> }
>> println(text)
>> val df = sc.parallelize((Array(text)))
>>
>>
>> Unfortunately that only sees it as the text and interprets it as text.
>>
>> I can write is easily as a shell script with ${text} passed to Array and
>> it will work. I was wondering if I could do this in Spark/Scala with my
>> limited knowledge
>>
>> Cheers
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 23 August 2016 at 19:00, Nick Pentreath 
>> wrote:
>>
>>> what is "text"? i.e. what is the "val text = ..." definition?
>>>
>>> If text is a String itself then indeed sc.parallelize(Array(text)) is
>>> doing the correct thing in this case.
>>>
>>>
>>> On Tue, 23 Aug 2016 at 19:42 Mich Talebzadeh 
>>> wrote:
>>>
 I am sure someone know this :)

 Created a dynamic text string which has format

 scala> println(text)

 (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")

 now if I do

 scala> val df =
 sc.parallelize((Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"
 df: org.apache.spark.rdd.RDD[(Int, String)] =
 ParallelCollectionRDD[230] at parallelize at :39
 scala> df.count
 res157: Long = 10
 It shows ten Array elements, which is correct.

 Now if I pass that text into Array it only sees one row

 scala> val df = sc.parallelize((Array(text)))
 df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[228] at
 parallelize at :41
 scala> df.count
 res158: Long = 1

 Basically it sees it as one element of array

 scala> df.first
 res165: String =
 (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
 Which is not what I want.

 Any ideas?

 Thanks






 This works fine

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.


Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
OK, now that I understand an RDD can be stored to disk, my last question on
this topic would be this.
Storing an RDD to disk is nothing but storing JVM byte code to disk (in the
case of Java or Scala). Am I correct?





On Tue, Aug 23, 2016 12:19 PM, RK Aduri rkad...@collectivei.com wrote:
On an other note, if you have a streaming app, you checkpoint the RDDs so that
they can be accessed in case of a failure. And yes, RDDs are persisted to DISK.
You can access spark’s UI and see it listed under Storage tab.
If RDDs are persisted in memory, you avoid any disk I/Os so that any lookups
will be cheap. RDDs are reconstructed based on a graph (DAG - available in Spark
UI )
On Aug 23, 2016, at 12:10 PM, < srikanth.je...@gmail.com > < 
srikanth.je...@gmail.com > wrote:
RAM or Virtual memory is finite, so data size needs to be considered before
persist. Please see below documentation when to choose the persistency level. 
http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
 Thanks,
Sreekanth Jella From: kant kodali
Sent: Tuesday, August 23, 2016 2:42 PM
To: srikanth.je...@gmail.com
Cc: user@spark.apache.org
Subject: Re: Are RDD's ever persisted to disk? so when do we ever need to 
persist RDD on disk? given that we don't need to
worry about RAM(memory) as virtual memory will just push pages to the disk when
memory becomes scarce.



On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:Hi Kant Kodali,



Based on the input parameter to persist() method either it will be cached on
memory or persisted to disk. In case of failures Spark will reconstruct the RDD
on a different executor based on the DAG. That is how failures are handled.
Spark Core does not replicate the RDDs as they can be reconstructed from the
source (let’s say HDFS, Hive or S3 etc.) but not from memory (which is lost
already).



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 2:12 PM
To: user@spark.apache.org
Subject: Are RDD's ever persisted to disk?



I am new to spark and I keep hearing that RDD's can be persisted to memory or
disk after each checkpoint. I wonder why RDD's are persisted in memory? In case
of node failure how would you access memory to reconstruct the RDD? persisting
to disk make sense because its like persisting to a Network file system (in case
of HDFS) where a each block will have multiple copies across nodes so if a node
goes down RDD's can still be reconstructed by the reading the required block
from other nodes and recomputing it but my biggest question is Are RDD's ever 
persisted to disk?






Maelstrom: Kafka integration with Spark

2016-08-23 Thread Jeoffrey Lim
Hi,

I have released the first version of a new Kafka integration with Spark
that we use in the company I work for: open sourced and named Maelstrom.

It is unique compared to other solutions out there as it reuses the
Kafka Consumer connection to achieve sub-milliseconds latency.

This library has been running stable in production environment and has
been proven to be resilient to numerous production issues.


Please check out the project's page in github:

https://github.com/jeoffreylim/maelstrom


Contributors welcome!


Cheers!

Jeoffrey Lim


P.S. I am also looking for a job opportunity, please look me up at Linked In


RE: Are RDD's ever persisted to disk?

2016-08-23 Thread srikanth.jella
RAM or virtual memory is finite, so data size needs to be considered before
persisting. Please see the documentation below on when to choose the
persistence level.

http://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

Thanks,
Sreekanth Jella

From: kant kodali

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali
So when do we ever need to persist an RDD on disk, given that we don't need to
worry about RAM (memory), as virtual memory will just push pages to the disk
when memory becomes scarce?





On Tue, Aug 23, 2016 11:23 AM, srikanth.je...@gmail.com wrote:
Hi Kant Kodali,



Based on the input parameter to persist() method either it will be cached on
memory or persisted to disk. In case of failures Spark will reconstruct the RDD
on a different executor based on the DAG. That is how failures are handled.
Spark Core does not replicate the RDDs as they can be reconstructed from the
source (let’s say HDFS, Hive or S3 etc.) but not from memory (which is lost
already).



Thanks,
Sreekanth Jella



From: kant kodali
Sent: Tuesday, August 23, 2016 2:12 PM
To: user@spark.apache.org
Subject: Are RDD's ever persisted to disk?



I am new to spark and I keep hearing that RDD's can be persisted to memory or
disk after each checkpoint. I wonder why RDD's are persisted in memory? In case
of node failure how would you access memory to reconstruct the RDD? persisting
to disk make sense because its like persisting to a Network file system (in case
of HDFS) where a each block will have multiple copies across nodes so if a node
goes down RDD's can still be reconstructed by the reading the required block
from other nodes and recomputing it but my biggest question is Are RDD's ever 
persisted to disk?

RE: Are RDD's ever persisted to disk?

2016-08-23 Thread srikanth.jella
Hi Kant Kodali,

Based on the input parameter to the persist() method, an RDD will either be
cached in memory or persisted to disk. In case of failures, Spark will
reconstruct the RDD on a different executor based on the DAG; that is how
failures are handled. Spark Core does not replicate RDDs, as they can be
reconstructed from the source (let's say HDFS, Hive or S3 etc.) but not from
memory (which is lost already).
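One way to see the lineage Spark would replay for that reconstruction is
toDebugString; a quick illustrative snippet (the input path is an example):

import org.apache.spark.storage.StorageLevel

val counts = sc.textFile("hdfs:///tmp/input")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .persist(StorageLevel.MEMORY_AND_DISK)

// Prints the chain of parent RDDs that would be recomputed if a partition is lost.
println(counts.toDebugString)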

Thanks,
Sreekanth Jella

From: kant kodali

Re: Zero Data Loss in Spark with Kafka

2016-08-23 Thread Cody Koeninger
See
https://github.com/koeninger/kafka-exactly-once
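If I recall correctly, the core idea there is to track the Kafka offset ranges
yourself and store them atomically together with your results; very roughly, a
sketch using the 0.8 direct stream API (ssc, the broker address, the topic and
the transactional store are all placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Direct stream: each generated RDD knows exactly which offsets it covers.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker:9092"), Set("mytopic"))

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.map { case (_, value) => value }.collect()

  // Sketch: commit results and the offsets they came from in one transaction, so
  // a restart can resume from the last committed offsets without loss or duplicates.
  // yourDb.inTransaction { tx => tx.save(results); tx.saveOffsets(offsetRanges) }
}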
On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" 
wrote:

> Hi Experts,
>
> I am looking for some information on how to acheive zero data loss while
> working with kafka and Spark. I have searched online and blogs have
> different answer. Please let me know if anyone has idea on this.
>
> Blog 1:
> https://databricks.com/blog/2015/01/15/improved-driver-
> fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>
>
> Blog2:
> http://aseigneurin.github.io/2016/05/07/spark-kafka-
> achieving-zero-data-loss.html
>
>
> Blog one simply says configuration change with checkpoint directory and
> blog 2 give details about on how to save offsets to zoo keeper. can you
> please help me out with right approach.
>
> Thanks,
> Asmath
>
>
>


Re: Breaking down text String into Array elements

2016-08-23 Thread Mich Talebzadeh
Hi gents,

Well, I was trying to see whether I can create an array of elements: from an
RDD to a DF, register it as a TempTable and store it as a Hive table.

import scala.util.Random
//
// UDF to create a random string of charlength characters
//
def random_string(chars: String, charlength: Int) : String = {
  val newKey = (1 to charlength).map(
x =>
{
  val index = Random.nextInt(chars.length)
  chars(index)
}
   ).mkString("")
   return newKey
}
spark.udf.register("random_string", random_string(_:String, _:Int))
case class columns (col1: Int, col2: String)
val chars = ('a' to 'z') ++ ('A' to 'Z')
var text = ""
val comma = ","
val terminator = "))"
var random_char = ""
for (i  <- 1 to 10) {
random_char = random_string(chars.mkString(""), 10)
if (i < 10) {text = text + """(""" + i.toString +
""",+random_char+)"""+comma}
   else {text = text + """(""" + i.toString + """,+random_char+)"""}
}
println(text)
val df = sc.parallelize((Array(text)))


Unfortunately that only sees it as text and interprets it as text.

I can write it easily as a shell script with ${text} passed to an Array and it
will work. I was wondering if I could do this in Spark/Scala with my limited
knowledge.

Cheers



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 23 August 2016 at 19:00, Nick Pentreath  wrote:

> what is "text"? i.e. what is the "val text = ..." definition?
>
> If text is a String itself then indeed sc.parallelize(Array(text)) is
> doing the correct thing in this case.
>
>
> On Tue, 23 Aug 2016 at 19:42 Mich Talebzadeh 
> wrote:
>
>> I am sure someone know this :)
>>
>> Created a dynamic text string which has format
>>
>> scala> println(text)
>> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"
>> LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"
>> LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
>>
>> now if I do
>>
>> scala> val df = sc.parallelize((Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),
>> (3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"
>> DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"
>> df: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[230]
>> at parallelize at :39
>> scala> df.count
>> res157: Long = 10
>> It shows ten Array elements, which is correct.
>>
>> Now if I pass that text into Array it only sees one row
>>
>> scala> val df = sc.parallelize((Array(text)))
>> df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[228] at
>> parallelize at :41
>> scala> df.count
>> res158: Long = 1
>>
>> Basically it sees it as one element of array
>>
>> scala> df.first
>> res165: String = (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),
>> (4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"
>> LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
>> Which is not what I want.
>>
>> Any ideas?
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>> This works fine
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Are RDD's ever persisted to disk?

2016-08-23 Thread kant kodali

I am new to Spark and I keep hearing that RDDs can be persisted to memory or
disk after each checkpoint. I wonder why RDDs are persisted in memory. In case
of node failure, how would you access memory to reconstruct the RDD? Persisting
to disk makes sense because it is like persisting to a network file system (in
the case of HDFS), where each block has multiple copies across nodes, so if a
node goes down the RDD can still be reconstructed by reading the required block
from other nodes and recomputing it. But my biggest question is: are RDDs ever
persisted to disk?

Re: Breaking down text String into Array elements

2016-08-23 Thread Nick Pentreath
what is "text"? i.e. what is the "val text = ..." definition?

If text is a String itself then indeed sc.parallelize(Array(text)) is doing
the correct thing in this case.

On Tue, 23 Aug 2016 at 19:42 Mich Talebzadeh 
wrote:

> I am sure someone know this :)
>
> Created a dynamic text string which has format
>
> scala> println(text)
>
> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
>
> now if I do
>
> scala> val df =
> sc.parallelize((Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"
> df: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[230]
> at parallelize at :39
> scala> df.count
> res157: Long = 10
> It shows ten Array elements, which is correct.
>
> Now if I pass that text into Array it only sees one row
>
> scala> val df = sc.parallelize((Array(text)))
> df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[228] at
> parallelize at :41
> scala> df.count
> res158: Long = 1
>
> Basically it sees it as one element of array
>
> scala> df.first
> res165: String =
> (1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
> Which is not what I want.
>
> Any ideas?
>
> Thanks
>
>
>
>
>
>
> This works fine
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Breaking down text String into Array elements

2016-08-23 Thread Mich Talebzadeh
I am sure someone knows this :)

Created a dynamic text string which has format

scala> println(text)
(1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")

now if I do

scala> val df =
sc.parallelize((Array((1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr"
df: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[230] at
parallelize at :39
scala> df.count
res157: Long = 10
It shows ten Array elements, which is correct.

Now if I pass that text into Array it only sees one row

scala> val df = sc.parallelize((Array(text)))
df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[228] at
parallelize at :41
scala> df.count
res158: Long = 1

Basically it sees it as one element of array

scala> df.first
res165: String =
(1,"hNjLJEgjxn"),(2,"lgryHkVlCN"),(3,"ukswqcanVC"),(4,"ZFULVxzAsv"),(5,"LNzOozHZPF"),(6,"KZPYXTqMkY"),(7,"DVjpOvVJTw"),(8,"LKRYrrLrLh"),(9,"acheneIPDM"),(10,"iGZTrKfXNr")
Which is not what I want.

Any ideas?

Thanks






This works fine

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Combining multiple models in Spark-ML 2.0

2016-08-23 Thread janardhan shetty
Any methods to achieve this?
On Aug 22, 2016 3:40 PM, "janardhan shetty"  wrote:

> Hi,
>
> Are there any pointers, links on stacking multiple models in spark
> dataframes ?. WHat strategies can be employed if we need to combine greater
> than 2 models  ?
>


Re: Is "spark streaming" streaming or mini-batch?

2016-08-23 Thread Mich Talebzadeh
Russell is correct here.

Micro-batching means the processing is done within a window. In general there
are three things here.

batch window

This is the basic interval at which the system will receive the data in
batches. This is the interval set when creating a StreamingContext. For
example, if you set the batch interval to 30 seconds, then any input
DStream will generate RDDs of received data at 30-second intervals.

Within streaming you have what is called "a window operator" which is
defined by two parameters -

- WindowDuration / WindowsLength - the length of the window
- SlideDuration / SlidingInterval - the interval at which the window will
slide or move forward

Example

batch window = 30 seconds
window length = 10 minutes
sliding interval = 5 minutes

In that case, you would be producing an output every 5 minutes, aggregating
the data that you collected every 30 seconds over the previous 10-minute
period.

In general, depending on what you are doing, you can tighten the above
parameters. For example, if you are using Spark Streaming for anti-fraud
detection, you may stream data in at a 2-second batch interval, keep your
window length at 4 seconds and your sliding interval at 2 seconds, which gives
you quite tight streaming. You are still aggregating the data that you collect
over the batch window.
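As a rough sketch of the 30-second / 10-minute / 5-minute example above (the
socket source is just a stand-in for whatever you actually read from):

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// Batch window: RDDs of received data are generated every 30 seconds.
val ssc = new StreamingContext(sc, Seconds(30))

val lines = ssc.socketTextStream("localhost", 9999)   // example source

// Window length 10 minutes, sliding interval 5 minutes: every 5 minutes we
// aggregate the 30-second batches received over the previous 10 minutes.
val windowedCounts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(10), Minutes(5))

windowedCounts.print()
ssc.start()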

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 23 August 2016 at 17:34, Russell Spitzer 
wrote:

> Spark streaming does not process 1 event at a time which is in general I
> think what people call "Streaming." It instead processes groups of events.
> Each group is a "MicroBatch" that gets processed at the same time.
>
> Streaming theoretically always has better latency because the event is
> processed as soon as it arrives. While in microbatching the latency of all
> the events in the batch can be no better than the last element to arrive.
>
> Streaming theoretically has worse performance because events cannot be
> processed in bulk.
>
> In practice throughput and latency are very implementation dependent
>
> On Tue, Aug 23, 2016 at 8:41 AM Aseem Bansal  wrote:
>
>> I was reading this article https://www.inovex.de/blog/storm-in-a-teacup/
>> and it mentioned that spark streaming actually mini-batch not actual
>> streaming.
>>
>> I have not used streaming and I am not sure what is the difference in the
>> 2 terms. Hence could not make a judgement myself.
>>
>


Re: Is "spark streaming" streaming or mini-batch?

2016-08-23 Thread Russell Spitzer
Spark Streaming does not process one event at a time, which is, I think, what
people generally mean by "streaming." It instead processes groups of events;
each group is a "micro-batch" that gets processed at the same time.

Streaming theoretically always has better latency because the event is
processed as soon as it arrives. While in microbatching the latency of all
the events in the batch can be no better than the last element to arrive.

Streaming theoretically has worse performance because events cannot be
processed in bulk.

In practice throughput and latency are very implementation dependent

On Tue, Aug 23, 2016 at 8:41 AM Aseem Bansal  wrote:

> I was reading this article https://www.inovex.de/blog/storm-in-a-teacup/
> and it mentioned that spark streaming actually mini-batch not actual
> streaming.
>
> I have not used streaming and I am not sure what is the difference in the
> 2 terms. Hence could not make a judgement myself.
>


Re: Is "spark streaming" streaming or mini-batch?

2016-08-23 Thread pandees waran
It's based on the "micro-batching" model.

Sent from my iPhone

> On Aug 23, 2016, at 8:41 AM, Aseem Bansal  wrote:
> 
> I was reading this article https://www.inovex.de/blog/storm-in-a-teacup/ and 
> it mentioned that spark streaming actually mini-batch not actual streaming. 
> 
> I have not used streaming and I am not sure what is the difference in the 2 
> terms. Hence could not make a judgement myself.


Re: UDTRegistration (in Java)

2016-08-23 Thread raghukiran
Well, it is perplexing, as I am able to simply call UDTRegistration from
Java. And maybe it is not working properly? I was able to pass a
class/String through the register function, and when I call exists(..) it
returns true. So it appears to work, but has issues :-)

Regards,
Raghu






Re: Zero Data Loss in Spark with Kafka

2016-08-23 Thread Sudhir Babu Pothineni
Saving offsets to ZooKeeper is the old approach; checkpointing internally
saves the offsets to HDFS (or wherever the checkpoint location is).

More details here:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
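A minimal sketch of that recovery setup, assuming an example checkpoint
directory (the create function would build your actual streams):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoints"   // example location

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("zero-data-loss-example")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)   // offsets and metadata get saved here
  // ... set up the Kafka stream and processing here ...
  ssc
}

// A clean start builds a new context; after a driver failure the context is
// rebuilt from the checkpoint and processing resumes from the saved offsets.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()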

On Tue, Aug 23, 2016 at 10:30 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi Experts,
>
> I am looking for some information on how to acheive zero data loss while
> working with kafka and Spark. I have searched online and blogs have
> different answer. Please let me know if anyone has idea on this.
>
> Blog 1:
> https://databricks.com/blog/2015/01/15/improved-driver-
> fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>
>
> Blog2:
> http://aseigneurin.github.io/2016/05/07/spark-kafka-
> achieving-zero-data-loss.html
>
>
> Blog one simply says configuration change with checkpoint directory and
> blog 2 give details about on how to save offsets to zoo keeper. can you
> please help me out with right approach.
>
> Thanks,
> Asmath
>
>
>


Is "spark streaming" streaming or mini-batch?

2016-08-23 Thread Aseem Bansal
I was reading this article https://www.inovex.de/blog/storm-in-a-teacup/
and it mentioned that Spark Streaming is actually mini-batch, not actual
streaming.

I have not used streaming and I am not sure what the difference between the
two terms is, hence I could not make a judgement myself.


Zero Data Loss in Spark with Kafka

2016-08-23 Thread KhajaAsmath Mohammed
Hi Experts,

I am looking for some information on how to achieve zero data loss while
working with Kafka and Spark. I have searched online and the blogs give
different answers. Please let me know if anyone has an idea on this.

Blog 1:
https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html


Blog 2:
http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html


Blog 1 simply suggests a configuration change with a checkpoint directory,
while Blog 2 gives details on how to save offsets to ZooKeeper. Can you
please help me out with the right approach?

Thanks,
Asmath


Things to do learn Cassandra in Apache Spark Environment

2016-08-23 Thread Gokula Krishnan D
Hello All -

Hope, you are doing good.

I have a general question. I am working on Hadoop using Apache Spark.

At this moment we are not using Cassandra, but I would like to know what the
scope is for learning and using it in a Hadoop environment.

It would be great if you could provide a use case to understand it better.

If so, what are the things to do in terms of learning?


Thanks & Regards,
Gokula Krishnan* (Gokul)*


Re: Can one create a dynamic Array and convert it to DF

2016-08-23 Thread Mich Talebzadeh
Ok I sorted out basic problem.

I can create a text string dynamically with 2 columns and enumerate rows

scala> println(text)
(1,"VDNiqDKChu"),(2,"LApMjYGYkC"),(3,"HuVCyfizzD"),(4,"kUSzHWquGA"),(5,"OlJGGQQlUh"),(6,"POljdWgAIN"),(7,"wsRqqGZaqy"),(8,"HOgdjAFUln"),(9,"jYwvafOjDo"),(10,"QlvZGMBimd")

If I cut and paste it on the terminal, this works OK

scala> val df = sc.parallelize(Array(
(1,"VDNiqDKChu"),(2,"LApMjYGYkC"),(3,"HuVCyfizzD"),(4,"kUSzHWquGA"),(5,"OlJGGQQlUh"),(6,"POljdWgAIN"),(7,"wsRqqGZaqy"),(8,"HOgdjAFUln"),(9,"jYwvafOjDo"),(10,"QlvZGMBimd
") ))

scala> df.count
res50: Long = 10

So I see 10 entries in the array

If I do the following passing text String to the Array

scala> val df = sc.parallelize(( Array(text)   ))
df: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[113] at
parallelize at :29

It is interpreted as just one String!

scala> df.count
res52: Long = 1

Is there any way I can force it NOT to be seen as a single String, so it is interpreted as rows?
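
One way around it, sketched here assuming this runs in spark-shell (so the
toDF implicits are in scope) and that the random_string function from the
original post is defined; the column names are illustrative. The idea is to
build the rows as a Scala collection rather than as a String, then
parallelize that:

val chars = ('a' to 'z') ++ ('A' to 'Z')
// Build a Seq of (Int, String) tuples instead of one big String.
val rows = (1 to 10).map(i => (i, random_string(chars.mkString(""), 10)))

val df = sc.parallelize(rows).toDF("col1", "col2")
df.count   // 10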

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 23 August 2016 at 12:51, Mich Talebzadeh 
wrote:

> Hi,
>
> I can easily do this in shell but wanted to see what I can do in Spark.
>
> I am trying to create a simple table (10 rows, 2 columns) for now and then
> register it as tempTable and store in Hive, if it is feasible.
>
> First column col1 is a monotonically incrementing integer and the second
> column a string of 10 random chars
>
> Use a UDF to create random char on length (charlength)
>
>
> import scala.util.Random
> def random_string(chars: String, charlength: Int) : String = {
>   val newKey = (1 to charlength).map(
> x =>
> {
>   val index = Random.nextInt(chars.length)
>   chars(index)
> }
>).mkString("")
>return newKey
> }
> spark.udf.register("random_string", random_string(_:String, _:Int))
> //create class
> case class columns (col1: Int, col2: String)
>
> val chars = ('a' to 'z') ++ ('A' to 'Z')
> var text = "Array("
> val comma = ","
> val terminator = "))"
> for (i  <- 1 to 10) {
> var random_char = random_string(chars.mkString(""), 10)
> if (i < 10) {text = text + """(""" + i.toString +
> """,+random_char+)"""+comma}
>else {text = text + """(""" + i.toString +
> """,+random_char+)))"""}
> }
> println(text)
> val df =sc.parallelize(text)
>
> val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt,
> p._2.toString)).toDF
>
> When I run it I get this
>
> Loading dynamic_ARRAY_generator.scala...
> import scala.util.Random
> random_string: (chars: String, charlength: Int)String
> res0: org.apache.spark.sql.expressions.UserDefinedFunction =
> UserDefinedFunction(,StringType,Some(List(StringType,
> IntegerType)))
> defined class columns
> chars: scala.collection.immutable.IndexedSeq[Char] = Vector(a, b, c, d,
> e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z, A, B, C,
> D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z)
> text: String = Array(
> comma: String = ,
> terminator: String = ))
> Array((1,"yyzbPpXEoX"),(2,"bEnzvFCdXm"),(3,"dKXZbgaGTr"),
> (4,"hIHGkiWjcy"),(5,"HBnJmYlefk"),(6,"MKqfwWCmah"),(7,"CrKYmsbXKI"),(8,"
> iySnzSKtuH"),(9,"BbCRKqtkml"),(10,"nYdxrDneUm")))
>
> *df: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[0] at
> parallelize at :27*:29: error: value _1 is not a member
> of Char
>val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt,
> p._2.toString)).toDF
>^
> :29: error: value _2 is not a member of Char
>val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt,
> p._2.toString)).toDF
>
> ^
> :29: error: value toDF is not a member of
> org.apache.spark.rdd.RDD[U]
>val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt,
> p._2.toString)).toDF
>
> ^
>
> This works
>
> val df =sc.parallelize(text)
>
> But this fails
>
> val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt,
> p._2.toString)).toDF
>
> I gather it sees it as an RDD[Char]!
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from 

Retrying: Using spark to distribute jobs to standalone servers

2016-08-23 Thread Larry White
(apologies if this appears twice. I sent it 24 hours ago and it hasn't hit
the list yet)

Hi,

I have a bit of an unusual use-case and would greatly appreciate some
feedback from experienced Sparklers as to whether it is a good fit for
spark.

I have a network of compute/data servers configured as a tree as shown below

   - controller
   - server 1
  - server 2
  - server 3
  - etc.

There are ~20 servers, with the number increasing to near 100.

Each server contains a different dataset, all in the same format. Each is
hosted by a different organization, and the data on every individual server
is unique to that organization. Essentially, each server hosts a single
partition.

Data *cannot* be replicated across servers using RDDs or any other means,
for privacy/ownership reasons.

Raw data *cannot* be retrieved to the controller, except in summary form.

We would like to send jobs from the controller to be executed in parallel
on all the servers, and retrieve the results to the controller. The jobs
would consist of SQL-Heavy Java code for 'production' queries, and python
code for ad-hoc queries and predictive modeling.

There are no operations that treat the data as if it were a single data
set: we could run a classifier on each site individually, but cannot, for
legal reasons, pull all the data into a single *physical* dataframe to run
the classifier on all of it together.

The servers are located across a wide geographic region (1,000s of miles)

Spark seems to have the capability to meet many of the individual
requirements, but is it a reasonable platform overall for building this
application?  In particular, I'm wondering about:

1. Possible issues distributing queries to a set of servers that don't
constitute a typical spark cluster.
2. Support for executing jobs written in Java on the remote servers.

Thank you very much for your assistance.

Larry


question about Broadcast value NullPointerException

2016-08-23 Thread Chong Zhang
Hello,

I'm using Spark Streaming to process Kafka messages, and want to use a
properties file as input and broadcast the properties:

val props = new Properties()
props.load(new FileInputStream(args(0)))
val sc = initSparkContext()
val propsBC = sc.broadcast(props)
println(s"propFileBC 1: " + propsBC.value)

val lines = createKStream(sc)
val parsedLines = lines.map (l => {
println(s"propFileBC 2: " + propsBC.value)
process(l, propsBC.value)
}).filter(...)

var goodLines = lines.window(2,2)
goodLines.print()


If I run it with spark-submit and master local[2], it works fine.
But if I used the --master spark://master:7077 (2 nodes), the 1st
propsBC.value is printed, but the 2nd print inside the map function causes
null pointer exception:

Caused by: java.lang.NullPointerException
at test.spark.Main$$anonfun$1.apply(Main.scala:79)
at test.spark.Main$$anonfun$1.apply(Main.scala:78)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284)
at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

Appreciate any help,  thanks!


RE: spark.lapply in SparkR: Error in writeBin(batch, con, endian = "big")

2016-08-23 Thread Cinquegrana, Piero
The output from score() is very small, just a float. The input, however, could 
be as big as several hundred MBs. I would like to broadcast the dataset to all 
executors.

Thanks,
Piero

From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Monday, August 22, 2016 10:48 PM
To: Cinquegrana, Piero ; user@spark.apache.org
Subject: Re: spark.lapply in SparkR: Error in writeBin(batch, con, endian = 
"big")

How big is the output from score()?

Also could you elaborate on what you want to broadcast?



On Mon, Aug 22, 2016 at 11:58 AM -0700, "Cinquegrana, Piero" 
> wrote:
Hello,

I am using the new R API in SparkR spark.lapply (spark 2.0). I am defining a 
complex function to be run across executors and I have to send the entire 
dataset, but there is not (that I could find) a way to broadcast the variable 
in SparkR. I am thus reading the dataset in each executor from disk, but I am
getting the following error:

Error in writeBin(batch, con, endian = "big") :
  attempting to add too many elements to raw vector

Any idea why this is happening?

Pseudo code:

scoreModel <- function(parameters){
   library(data.table)   # fread() comes from the data.table package
   dat <- data.frame(fread("file.csv"))
   score(dat, parameters)
}

parameterList <- lapply(1:numModels, function(i) getParameters(i))

modelScores <- spark.lapply(parameterList, scoreModel)


Piero Cinquegrana
MarketShare: A Neustar Solution / Data Science
Mobile: +39.329.17.62.539 / www.neustar.biz
Reduce your environmental footprint. Print only if necessary.
The information contained in this email message is intended only for the use of 
the recipient(s) named above and may contain confidential and/or privileged 
information. If you are not the intended recipient you have received this email 
message in error and any review, dissemination, distribution, or copying of 
this message is strictly prohibited. If you have received this communication in 
error, please notify us immediately and delete the original message.






Can one create a dynamic Array and convert it to DF

2016-08-23 Thread Mich Talebzadeh
Hi,

I can easily do this in shell but wanted to see what I can do in Spark.

I am trying to create a simple table (10 rows, 2 columns) for now and then
register it as tempTable and store in Hive, if it is feasible.

First column col1 is a monotonically incrementing integer and the second
column a string of 10 random chars

Use a UDF to create random char on length (charlength)


import scala.util.Random
def random_string(chars: String, charlength: Int) : String = {
  val newKey = (1 to charlength).map(
x =>
{
  val index = Random.nextInt(chars.length)
  chars(index)
}
   ).mkString("")
   return newKey
}
spark.udf.register("random_string", random_string(_:String, _:Int))
//create class
case class columns (col1: Int, col2: String)

val chars = ('a' to 'z') ++ ('A' to 'Z')
var text = "Array("
val comma = ","
val terminator = "))"
for (i  <- 1 to 10) {
var random_char = random_string(chars.mkString(""), 10)
if (i < 10) {text = text + """(""" + i.toString +
""",+random_char+)"""+comma}
   else {text = text + """(""" + i.toString +
""",+random_char+)))"""}
}
println(text)
val df =sc.parallelize(text)

val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt,
p._2.toString)).toDF

When I run it I get this

Loading dynamic_ARRAY_generator.scala...
import scala.util.Random
random_string: (chars: String, charlength: Int)String
res0: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(,StringType,Some(List(StringType,
IntegerType)))
defined class columns
chars: scala.collection.immutable.IndexedSeq[Char] = Vector(a, b, c, d, e,
f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z, A, B, C, D,
E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z)
text: String = Array(
comma: String = ,
terminator: String = ))
Array((1,"yyzbPpXEoX"),(2,"bEnzvFCdXm"),(3,"dKXZbgaGTr"),(4,"hIHGkiWjcy"),(5,"HBnJmYlefk"),(6,"MKqfwWCmah"),(7,"CrKYmsbXKI"),(8,"iySnzSKtuH"),(9,"BbCRKqtkml"),(10,"nYdxrDneUm")))

df: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[0] at parallelize at <console>:27
<console>:29: error: value _1 is not a member of Char
       val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt, p._2.toString)).toDF
                                                       ^
<console>:29: error: value _2 is not a member of Char
       val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt, p._2.toString)).toDF
                                                                           ^
<console>:29: error: value toDF is not a member of org.apache.spark.rdd.RDD[U]
       val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt, p._2.toString)).toDF
                                                                                           ^

This works

val df =sc.parallelize(text)

But this fails

val df =sc.parallelize(text).map(p => columns(p._1.toString.toInt,
p._2.toString)).toDF

I gather it sees it as an RDD[Char]!


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re:Do we still need to use Kryo serializer in Spark 1.6.2 ?

2016-08-23 Thread prosp4300
The way to use the Kryo serializer in PySpark is similar to Scala, like below;
the only difference is the lack of the convenient method
"conf.registerKryoClasses", but it should be easy to make one yourself.

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# comma-separated list of fully qualified class names to register with Kryo
conf.set("spark.kryo.classesToRegister", "com.example.YourClassA,com.example.YourClassB")
sc = SparkContext(conf=conf)










At 2016-08-23 02:00:41, "Eric Ho"  wrote:

I heard that Kryo will get phased out at some point but not sure which Spark 
release.
I'm using PySpark, does anyone has any docs on how to call / use Kryo 
Serializer in PySpark ?


Thanks.



--



-eric ho



Re:Log rollover in spark streaming jobs

2016-08-23 Thread prosp4300


Spark on YARN supports a customized log4j configuration by default;
RollingFileAppender can be used to avoid disk overflow, as documented below.

If you need a reference to the proper location to put log files in the YARN so 
that YARN can properly display and aggregate them, use 
spark.yarn.app.container.log.dir in your log4j.properties. For example, 
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log.
 For streaming applications, configuring RollingFileAppender and setting file 
location to YARN’s log directory will avoid disk overflow caused by large log 
files, and logs can be accessed using YARN’s log utility.

You can get more information here:
https://spark.apache.org/docs/latest/running-on-yarn.html#configuration
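
A minimal log4j.properties sketch along those lines (the file name, size and
backup count are illustrative):

log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.rolling.MaxFileSize=50MB
log4j.appender.rolling.MaxBackupIndex=5
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

The file can be shipped with --files log4j.properties on spark-submit and
picked up via -Dlog4j.configuration=log4j.properties in
spark.driver.extraJavaOptions and spark.executor.extraJavaOptions, which the
same YARN page also covers.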
 








At 2016-08-23 18:44:29, "Pradeep"  wrote:
>Hi All,
>
>I am running Java spark streaming jobs in yarn-client mode. Is there a way I 
>can manage logs rollover on edge node. I have a 10 second batch and log file 
>volume is huge. 
>
>Thanks,
>Pradeep
>
>-
>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Log rollover in spark streaming jobs

2016-08-23 Thread Pradeep
Hi All,

I am running Java spark streaming jobs in yarn-client mode. Is there a way I 
can manage logs rollover on edge node. I have a 10 second batch and log file 
volume is huge. 

Thanks,
Pradeep

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming application failing with Token issue

2016-08-23 Thread Jacek Laskowski
Hi Steve,

Could you share your opinion on whether the token gets renewed or not?
Is the token going to expire after 7 days anyway? Why is the change in
the recent version for token renewal? See
https://github.com/apache/spark/commit/ab648c0004cfb20d53554ab333dd2d198cb94ffa

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Aug 23, 2016 at 11:51 AM, Steve Loughran  wrote:
>
> On 21 Aug 2016, at 20:43, Mich Talebzadeh  wrote:
>
> Hi Kamesh,
>
> The message you are getting after 7 days:
>
> PriviledgedActionException as:sys_bio_replicator (auth:KERBEROS)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Token has expired
>
> Sounds like an IPC issue with Kerberos Authentication time out. Suggest you
> talk to your Linux admin and also check this link:
>
> https://support.f5.com/kb/en-us/products/big-ip_apm/manuals/product/apm-authentication-single-sign-on-11-5-0/9.print.html
>
>
>
>
> 7 days is one of the big kerberos timeout events; why its best to deploy a
> new system on a tuesday morning.
>
> You are going to need to configure your spark app with a keytab in
> spark.yarn.keytab; see
> http://spark.apache.org/docs/latest/running-on-yarn.html

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming application failing with Token issue

2016-08-23 Thread Steve Loughran

On 21 Aug 2016, at 20:43, Mich Talebzadeh 
> wrote:

Hi Kamesh,

The message you are getting after 7 days:

PriviledgedActionException as:sys_bio_replicator (auth:KERBEROS) 
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 Token has expired

Sounds like an IPC issue with Kerberos Authentication time out. Suggest you 
talk to your Linux admin and also check this link:

https://support.f5.com/kb/en-us/products/big-ip_apm/manuals/product/apm-authentication-single-sign-on-11-5-0/9.print.html




7 days is one of the big Kerberos timeout events; that's why it's best to deploy a new
system on a Tuesday morning.

You are going to need to configure your spark app with a keytab in 
spark.yarn.keytab; see http://spark.apache.org/docs/latest/running-on-yarn.html
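
A sketch of what that looks like on the command line (the principal, keytab
path, class and jar names are illustrative):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --principal myuser@EXAMPLE.COM \
  --keytab /etc/security/keytabs/myuser.keytab \
  --class com.example.MyStreamingApp \
  myapp.jar

With the keytab shipped this way, the application can keep obtaining fresh
delegation tokens instead of failing when the 7-day limit is hit.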


Re: Apply ML to grouped dataframe

2016-08-23 Thread ayan guha
I would suggest you to construct a toy problem and post for solution. At
this moment it's a little unclear what your intentions are.

Generally speaking, group by on a data frame creates another data frame,
not multiple ones.
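
For what it's worth, a minimal sketch of the filter-per-group idea with the
Spark 2.0 Scala API, assuming a DataFrame df with a "group" column and a
"features" vector column (names and k are illustrative). Note it still loops
over the groups on the driver; each fit is distributed, but the groups are
handled one after another.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.functions.col

val groups = df.select("group").distinct().collect().map(_.get(0))

val modelsPerGroup = groups.map { g =>
  val groupDF = df.filter(col("group") === g)                // one DataFrame per group
  val model = new KMeans().setK(2).setSeed(1L).fit(groupDF)  // fit KMeans on that group
  (g, model)
}.toMap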
On 23 Aug 2016 16:35, "Wen Pei Yu"  wrote:

> Hi Mirmal
>
> Filter works fine if I want handle one of grouped dataframe. But I has
> multiple grouped dataframe, I wish I can apply ML algorithm to all of them
> in one job, but not in for loops.
>
> Wenpei.
>
>
> From: Nirmal Fernando 
> To: Wen Pei Yu/China/IBM@IBMCN
> Cc: User 
> Date: 08/23/2016 01:55 PM
> Subject: Re: Apply ML to grouped dataframe
> --
>
>
>
>
>
> On Tue, Aug 23, 2016 at 10:56 AM, Wen Pei Yu <*yuw...@cn.ibm.com*
> > wrote:
>
>We can group a dataframe by one column like
>
>df.groupBy(df.col("gender"))
>
>
>
> On top of this DF, use a filter that would enable you to extract the
> grouped DF as separated DFs. Then you can apply ML on top of each DF.
>
> eg: xyzDF.filter(col("x").equalTo(x))
>
>
>It like split a dataframe to multiple dataframe. Currently, we can
>only apply simple sql function to this GroupedData like agg, max etc.
>
>What we want is apply one ML algorithm to each group.
>
>Regards.
>
>
>From: Nirmal Fernando <*nir...@wso2.com* >
>To: Wen Pei Yu/China/IBM@IBMCN
>Cc: User <*user@spark.apache.org* >
>Date: 08/23/2016 01:14 PM
>
>
>Subject: Re: Apply ML to grouped dataframe
>--
>
>
>
>Hi Wen,
>
>AFAIK Spark MLlib implements its machine learning algorithms on top of
>Spark dataframe API. What did you mean by a grouped dataframe?
>
>On Tue, Aug 23, 2016 at 10:42 AM, Wen Pei Yu <*yuw...@cn.ibm.com*
>> wrote:
>   Hi Nirmal
>
>  I didn't get your point.
>  Can you tell me more about how to use MLlib to grouped dataframe?
>
>  Regards.
>  Wenpei.
>
>  
> 
>
>  From: Nirmal Fernando <*nir...@wso2.com* >
>  To: Wen Pei Yu/China/IBM@IBMCN
>  Cc: User <*user@spark.apache.org* >
>  Date: 08/23/2016 10:26 AM
>  Subject: Re: Apply ML to grouped dataframe
>  --
>
>
>
>
>  You can use Spark MLlib
>  
> *http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api*
>  
> 
>
>  On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu <*yuw...@cn.ibm.com*
>  > wrote:
> Hi
>
>  We have a dataframe, then want group it and apply a
>  ML algorithm or statistics(say t test) to each one. Is 
> there any efficient
>  way for this situation?
>
>  Currently, we transfer to pyspark, use groupbykey
>  and apply numpy function to array. But this wasn't an 
> efficient way, right?
>
>  Regards.
>  Wenpei.
>
>
>
>
>  --
>
>  Thanks & regards,
>  Nirmal
>
>  Team Lead - WSO2 Machine Learner
>  Associate Technical Lead - Data Technologies Team, WSO2 Inc.
>  Mobile: *+94715779733* <%2B94715779733>
>  Blog: *http://nirmalfdo.blogspot.com/*
>  
>
>
>
>
>
>--
>
>Thanks & regards,
>Nirmal
>
>Team Lead - WSO2 Machine Learner
>Associate Technical Lead - Data Technologies Team, WSO2 Inc.
>Mobile: *+94715779733* <%2B94715779733>
>Blog: *http://nirmalfdo.blogspot.com/* 
>
>
>
>
>
>
>
> --
>
> Thanks & regards,
> Nirmal
>
> Team Lead - WSO2 Machine Learner
> Associate Technical Lead - Data 

Re: Spark 2.0 - Join statement compile error

2016-08-23 Thread Mich Talebzadeh
What is the s below, just before the text of the SQL?

var sales_order_sql_stmt = s"""SELECT ORDER_NUMBER, INVENTORY_ITEM_ID, ORGANIZATION_ID,

  from_unixtime(unix_timestamp(SCHEDULE_SHIP_DATE,'yyyy-MM-dd'), 'yyyy-MM-dd') AS schedule_date

  FROM sales_order_demand

  WHERE unix_timestamp(SCHEDULE_SHIP_DATE,'yyyy-MM-dd') >= $planning_start_date  limit 10"""
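
For context, the leading s is Scala's string interpolator: inside an s"..."
or s"""...""" literal, $name is substituted with the value of name. A tiny
illustration with made-up values:

val planning_start_date = "2016-01-01"
val stmt = s"SELECT count(*) FROM t WHERE d >= $planning_start_date"
// stmt: String = SELECT count(*) FROM t WHERE d >= 2016-01-01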

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 23 August 2016 at 07:31, Deepak Sharma  wrote:

>
> On Tue, Aug 23, 2016 at 10:32 AM, Deepak Sharma 
> wrote:
>
>> val df = sales_demand.join(product_master,
>> sales_demand.$"INVENTORY_ITEM_ID" === product_master.$"INVENTORY_ITEM_ID",
>> "inner")
>
>
> Ignore the last statement.
> It should look something like this:
> val df = sales_demand.join(product_master,
> $"sales_demand.INVENTORY_ITEM_ID" === $"product_master.INVENTORY_ITEM_ID",
> "inner")
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: Apply ML to grouped dataframe

2016-08-23 Thread Wen Pei Yu

Hi Mirmal

Filter works fine if I want to handle one of the grouped dataframes. But I
have multiple grouped dataframes, and I wish I could apply the ML algorithm
to all of them in one job, not in for loops.

Wenpei.



From:   Nirmal Fernando 
To: Wen Pei Yu/China/IBM@IBMCN
Cc: User 
Date:   08/23/2016 01:55 PM
Subject:Re: Apply ML to grouped dataframe





On Tue, Aug 23, 2016 at 10:56 AM, Wen Pei Yu  wrote:
  We can group a dataframe by one column like

  df.groupBy(df.col("gender"))



On top of this DF, use a filter that would enable you to extract the
grouped DF as separated DFs. Then you can apply ML on top of each DF.

eg: xyzDF.filter(col("x").equalTo(x))

  It like split a dataframe to multiple dataframe. Currently, we can only
  apply simple sql function to this GroupedData like agg, max etc.

  What we want is apply one ML algorithm to each group.

  Regards.


  From: Nirmal Fernando 
  To: Wen Pei Yu/China/IBM@IBMCN
  Cc: User 
  Date: 08/23/2016 01:14 PM



  Subject: Re: Apply ML to grouped dataframe



  Hi Wen,

  AFAIK Spark MLlib implements its machine learning algorithms on top of
  Spark dataframe API. What did you mean by a grouped dataframe?

  On Tue, Aug 23, 2016 at 10:42 AM, Wen Pei Yu  wrote:
Hi Nirmal

I didn't get your point.
Can you tell me more about how to use MLlib to grouped dataframe?

Regards.
Wenpei.



From: Nirmal Fernando 
To: Wen Pei Yu/China/IBM@IBMCN
Cc: User 
Date: 08/23/2016 10:26 AM
Subject: Re: Apply ML to grouped dataframe




You can use Spark MLlib

http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api


On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu 
wrote:
Hi

We have a dataframe, then want group it and apply a ML
algorithm or statistics(say t test) to each one. Is
there any efficient way for this situation?

Currently, we transfer to pyspark, use groupbykey and
apply numpy function to array. But this wasn't an
efficient way, right?

Regards.
Wenpei.




--

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/






  --

  Thanks & regards,
  Nirmal

  Team Lead - WSO2 Machine Learner
  Associate Technical Lead - Data Technologies Team, WSO2 Inc.
  Mobile: +94715779733
  Blog: http://nirmalfdo.blogspot.com/








--

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/





Re: Spark 2.0 - Join statement compile error

2016-08-23 Thread Deepak Sharma
On Tue, Aug 23, 2016 at 10:32 AM, Deepak Sharma 
wrote:

> val df = sales_demand.join(product_master,
> sales_demand.$"INVENTORY_ITEM_ID" === product_master.$"INVENTORY_ITEM_ID",
> "inner")


Ignore the last statement.
It should look something like this:
val df = sales_demand.join(product_master,
  $"sales_demand.INVENTORY_ITEM_ID" === $"product_master.INVENTORY_ITEM_ID",
  "inner")
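
An alternative sketch that avoids the string column paths altogether,
assuming sales_demand and product_master are plain DataFrames:

val df = sales_demand.join(product_master,
  sales_demand("INVENTORY_ITEM_ID") === product_master("INVENTORY_ITEM_ID"),
  "inner")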


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net