Re: Spark support for Complex Event Processing (CEP)

2016-04-19 Thread Mich Talebzadeh
Thanks a lot Mario. Will have a look.

Regards,


Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 20 April 2016 at 06:53, Mario Ds Briggs  wrote:

> Hi Mich,
>
> Info is here - https://issues.apache.org/jira/browse/SPARK-14745
>
> overview is in the pdf -
> https://issues.apache.org/jira/secure/attachment/12799670/SparkStreamingCEP.pdf
>
> Usage examples not in the best place for now (will make it better) -
> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>
> Your feedback is appreciated.
>
>
> thanks
> Mario
>
>
> From: Mich Talebzadeh 
> To: Mario Ds Briggs/India/IBM@IBMIN
> Cc: "user @spark" , Luciano Resende <
> luckbr1...@gmail.com>
> Date: 19/04/2016 12:45 am
> Subject: Re: Spark support for Complex Event Processing (CEP)
> --
>
>
>
> great stuff Mario. Much appreciated.
>
> Mich
>
> Dr Mich Talebzadeh
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 18 April 2016 at 20:08, Mario Ds Briggs <mario.bri...@in.ibm.com> wrote:
>
>Hey Mich, Luciano
>
>Will provide links with docs by tomorrow
>
>thanks
>Mario
>
>    - Message from Mich Talebzadeh <mich.talebza...@gmail.com> on Sun, 17 Apr 2016 19:17:38 +0100 -
>
>    To: Luciano Resende <luckbr1...@gmail.com>
>    cc: "user @spark" <user@spark.apache.org>
>    Subject: Re: Spark support for Complex Event Processing (CEP)
>
>    Thanks Luciano. Appreciated.
>
>Regards
>
>Dr Mich Talebzadeh
>
>    LinkedIn
>    https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>    http://talebzadehmich.wordpress.com
>
>
>
>    On 17 April 2016 at 17:32, Luciano Resende <luckbr1...@gmail.com> wrote:
>   Hi Mitch,
>
>  I know some folks that were investigating/prototyping on this
>  area, let me see if I can get them to reply here with more details.
>
>      On Sun, Apr 17, 2016 at 1:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>  Hi,
>
>  Has Spark got libraries for CEP using Spark Streaming with Kafka
>  by any chance?
>
>      I am looking at Flink, which is supposed to have these libraries for
>      CEP, but I find Flink itself very much a work in progress.
>
>  Thanks
>
>  Dr Mich Talebzadeh
>
>      LinkedIn
>      https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>      http://talebzadehmich.wordpress.com
>
>
>
>
>
>  --
>  Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>
>
>
>
>
>


Re: Spark support for Complex Event Processing (CEP)

2016-04-19 Thread Mario Ds Briggs

Hi Mich,

Info is here - https://issues.apache.org/jira/browse/SPARK-14745

overview is in the pdf -
https://issues.apache.org/jira/secure/attachment/12799670/SparkStreamingCEP.pdf

Usage examples not in the best place for now (will make it better) -
https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

Your feedback is appreciated.


thanks
Mario



From:   Mich Talebzadeh 
To: Mario Ds Briggs/India/IBM@IBMIN
Cc: "user @spark" , Luciano Resende

Date:   19/04/2016 12:45 am
Subject:Re: Spark support for Complex Event Processing (CEP)



great stuff Mario. Much appreciated.

Mich

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




On 18 April 2016 at 20:08, Mario Ds Briggs  wrote:
  Hey Mich, Luciano

  Will provide links with docs by tomorrow

  thanks
  Mario

  - Message from Mich Talebzadeh on Sun, 17 Apr 2016 19:17:38 +0100 -

  To: Luciano Resende
  cc: "user @spark"
  Subject: Re: Spark support for Complex Event Processing (CEP)

  Thanks Luciano. Appreciated.

  Regards

  Dr Mich Talebzadeh

  LinkedIn
  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


  http://talebzadehmich.wordpress.com




  On 17 April 2016 at 17:32, Luciano Resende  wrote:
Hi Mitch,

I know some folks that were investigating/prototyping on this area,
let me see if I can get them to reply here with more details.

On Sun, Apr 17, 2016 at 1:54 AM, Mich Talebzadeh <
mich.talebza...@gmail.com> wrote:
Hi,

Has Spark got libraries for CEP using Spark Streaming with Kafka by
any chance?

I am looking at Flink, which is supposed to have these libraries for CEP,
but I find Flink itself very much a work in progress.

Thanks

Dr Mich Talebzadeh

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


http://talebzadehmich.wordpress.com






--
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/













Re: Re: Why Spark having OutOfMemory Exception?

2016-04-19 Thread Jeff Zhang
It seems the OOM is on the driver side when fetching task results.

You can try increasing spark.driver.memory and spark.driver.maxResultSize.
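For example, a minimal sketch (placeholder values and script name; spark.driver.memory has to be set before the driver JVM starts, so pass it on the spark-submit command line or in spark-defaults.conf rather than in code):

    spark-submit \
      --driver-memory 8g \
      --conf spark.driver.maxResultSize=4g \
      my_job.py        # hypothetical application script

spark.driver.maxResultSize caps the total size of serialized task results collected to the driver; 0 means unlimited, which just trades this error for a plain driver OOM.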

On Tue, Apr 19, 2016 at 4:06 PM, 李明伟  wrote:

> Hi Zhan Zhang
>
>
> Please see the exception trace below. It is reporting a GC overhead limit
> error.
> I am not a Java or Scala developer, so it is hard for me to understand
> this information.
> Also, reading a core dump is too difficult for me.
>
> I am not sure if the way I am using Spark is correct. I understand that
> Spark can do batch or stream calculation, but my way is to set up a forever
> loop to handle continuously incoming data.
> I am not sure if that is the right way to use Spark.
>
>
> 16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread
> task-result-getter-2
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
> at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
> at
> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
> at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
> at
> org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
> at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
> at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
> at
> org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
> at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
> Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: GC
> overhead limit exceeded
> at
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
> at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
> at
> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
> at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
> at
> org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
> at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
> at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at
> 

Re: VectorAssembler handling null values

2016-04-19 Thread Nick Pentreath
Could you provide an example of what your input data looks like? Supporting
missing values in a sparse result vector makes sense.
On Tue, 19 Apr 2016 at 23:55, Andres Perez  wrote:

> Hi everyone. org.apache.spark.ml.feature.VectorAssembler currently cannot
> handle null values. This presents a problem for us as we wish to run a
> decision tree classifier on sometimes sparse data. Is there a particular
> reason VectorAssembler is implemented in this way, and can anyone recommend
> the best path for enabling VectorAssembler to build vectors for data that
> will contain empty values?
>
> Thanks!
>
> -Andres
>
>


Re:Re: Why very small work load cause GC overhead limit?

2016-04-19 Thread 李明伟
The memory parameters are: --executor-memory 8G --driver-memory 4G. Please note
that the data size is very small; the total size of the data is less than 10 MB.


As for jmap: it is a little hard for me to do so, since I am not a Java developer. I
will google jmap first, thanks.


Regards
Mingwei







At 2016-04-20 11:03:20, "Ted Yu"  wrote:
>Can you tell us the memory parameters you used ?
>
>If you can capture jmap output before the GC limit is exceeded, that would give us
>more clues.
>
>Thanks
>
>> On Apr 19, 2016, at 7:40 PM, "kramer2...@126.com"  wrote:
>> 
>> Hi All
>> 
>> I use spark doing some calculation. 
>> The situation is 
>> 1. New file will come into a folder periodically
>> 2. I turn the new files into data frame and insert it into an previous data
>> frame.
>> 
>> The code is like below :
>> 
>> 
>># Get the file list in the HDFS directory
>>client = InsecureClient('http://10.79.148.184:50070')
>>file_list = client.list('/test')
>> 
>>df_total = None
>>counter = 0
>>for file in file_list:
>>counter += 1
>> 
>># turn each file (CSV format) into data frame
>>lines = sc.textFile("/test/%s" % file)
>>parts = lines.map(lambda l: l.split(","))
>>rows = parts.map(lambda p: Row(router=p[0], interface=int(p[1]),
>> protocol=p[7],bit=int(p[10])))
>>df = sqlContext.createDataFrame(rows)
>> 
>># do some transform on the data frame
>>df_protocol =
>> df.groupBy(['protocol']).agg(func.sum('bit').alias('bit'))
>> 
>># add the current data frame to previous data frame set
>>if not df_total:
>>df_total = df_protocol
>>else:
>>df_total = df_total.unionAll(df_protocol)
>> 
>># cache the df_total
>>df_total.cache()
>>if counter % 5 == 0:
>>df_total.rdd.checkpoint()
>> 
>># get the df_total information
>>df_total.show()
>> 
>> 
>> I know that as time goes on, the df_total could be big. But actually, before
>> that time come, the above code already raise exception.
>> 
>> When the loop is about 30 loops. The code throw GC overhead limit exceeded
>> exception. The file is very small so even 300 loops the data size could only
>> be about a few MB. I do not know why it throw GC error.
>> 
>> The exception detail is below :
>> 
>>16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread
>> task-result-getter-2
>>java.lang.OutOfMemoryError: GC overhead limit exceeded
>>at
>> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
>>at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
>>at
>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>>at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>at java.lang.reflect.Method.invoke(Method.java:606)
>>at 
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>at 
>> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>>at
>> org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
>>at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
>>at 
>> org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
>>at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>>at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>at java.lang.reflect.Method.invoke(Method.java:606)
>>at 
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>at
>> org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
>>at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
>>at
>> org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
>>at 
>> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>>at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>>at 

Re: Why very small work load cause GC overhead limit?

2016-04-19 Thread Ted Yu
Can you tell us the memory parameters you used ?

If you can capture jmap output before the GC limit is exceeded, that would give us
more clues.
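For reference, the usual way to capture that (the pid is whatever jps reports for the driver process; the dump file name below is arbitrary):

    jps                                              # find the driver JVM pid
    jmap -histo:live <pid> | head -30                # histogram of live objects by class
    jmap -dump:live,format=b,file=heap.hprof <pid>   # full heap dump, viewable in jvisualvm / MAT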

Thanks

> On Apr 19, 2016, at 7:40 PM, "kramer2...@126.com"  wrote:
> 
> Hi All
> 
> I use spark doing some calculation. 
> The situation is 
> 1. New file will come into a folder periodically
> 2. I turn the new files into data frame and insert it into an previous data
> frame.
> 
> The code is like below :
> 
> 
># Get the file list in the HDFS directory
>client = InsecureClient('http://10.79.148.184:50070')
>file_list = client.list('/test')
> 
>df_total = None
>counter = 0
>for file in file_list:
>counter += 1
> 
># turn each file (CSV format) into data frame
>lines = sc.textFile("/test/%s" % file)
>parts = lines.map(lambda l: l.split(","))
>rows = parts.map(lambda p: Row(router=p[0], interface=int(p[1]),
> protocol=p[7],bit=int(p[10])))
>df = sqlContext.createDataFrame(rows)
> 
># do some transform on the data frame
>df_protocol =
> df.groupBy(['protocol']).agg(func.sum('bit').alias('bit'))
> 
># add the current data frame to previous data frame set
>if not df_total:
>df_total = df_protocol
>else:
>df_total = df_total.unionAll(df_protocol)
> 
># cache the df_total
>df_total.cache()
>if counter % 5 == 0:
>df_total.rdd.checkpoint()
> 
># get the df_total information
>df_total.show()
> 
> 
> I know that as time goes on, the df_total could be big. But actually, before
> that time come, the above code already raise exception.
> 
> When the loop is about 30 loops. The code throw GC overhead limit exceeded
> exception. The file is very small so even 300 loops the data size could only
> be about a few MB. I do not know why it throw GC error.
> 
> The exception detail is below :
> 
>16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread
> task-result-getter-2
>java.lang.OutOfMemoryError: GC overhead limit exceeded
>at
> scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
>at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
>at
> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:606)
>at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>at
> org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
>at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
>at 
> org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
>at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:606)
>at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>at
> org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
>at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
>at
> org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
>at 
> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
>Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: GC
> overhead limit exceeded
>at
> 

Re: How does .jsonFile() work?

2016-04-19 Thread Hyukjin Kwon
Hi,

I hope I understood correctly. This is a simplified description of the procedure.

Precondition

 - The JSON file is written line by line; each line is a separate JSON document.
 - A root-level array is also supported,
eg.
[{...},
{...},
{...}]

Procedures

- Schema inference (if a user schema is not given)

 1. Read a line (one JSON document).
 2. Parse it token by token with the Jackson parser (recursively if needed).
 3. From each value, find (or infer) an appropriate Spark type (eg.
DateType, StringType and DecimalType).
 4. Step 3 yields a StructType for the underlying single JSON document.
 5. Steps 1-4 over the whole input become an RDD, RDD[DataType].
 6. Aggregate each DataType (normally a StructType) into a single
StructType compatible across all the StructTypes.
 7. Use the aggregated StructType as the schema.

- Parse JSON data.

 1. Read a line (one JSON document).
 2. Parse it token by token with the Jackson parser (recursively if needed).
 3. Convert each value to the given type (above).
 4. Step 3 yields a Row for the underlying single JSON document.
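As a rough illustration of the two paths above (a minimal PySpark sketch; the file path and field names are made up):

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # 1) let Spark run the schema-inference pass described above
    df1 = sqlContext.read.json("/tmp/events.json")      # hypothetical path
    df1.printSchema()

    # 2) supply a schema up front, so only the parse phase runs
    schema = StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
    ])
    df2 = sqlContext.read.json("/tmp/events.json", schema=schema)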

Thanks!


2016-04-20 11:07 GMT+09:00 resonance :

> Hi, this is more of a theoretical question and I'm asking it here because I
> have no idea where to find documentation for this stuff.
>
> I am currently working with Spark SQL and am considering using data
> contained within JSON datasets. I am aware of the .jsonFile() method in
> Spark SQL.
>
> What is the general strategy used by Spark SQL .jsonFile() to parse/decode
> a
> JSON dataset?
>
> (For example, an answer I might be looking for is that the JSON file is
> read
> into an ETL pipeline and transformed into a predefined data structure.)
>
> I am deeply appreciative of any help provided.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-jsonFile-work-tp26802.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Why very small work load cause GC overhead limit?

2016-04-19 Thread kramer2...@126.com
Hi All

I use Spark to do some calculations.
The situation is:
1. New files come into a folder periodically.
2. I turn the new files into a data frame and union it into a previously built data
frame.

The code is like below :


# Assumed imports, not shown in the original mail:
#   from hdfs import InsecureClient        # WebHDFS client
#   from pyspark.sql import Row
#   import pyspark.sql.functions as func

# Get the file list in the HDFS directory
client = InsecureClient('http://10.79.148.184:50070')
file_list = client.list('/test')

df_total = None
counter = 0
for file in file_list:
    counter += 1

    # turn each file (CSV format) into a data frame
    lines = sc.textFile("/test/%s" % file)
    parts = lines.map(lambda l: l.split(","))
    rows = parts.map(lambda p: Row(router=p[0], interface=int(p[1]),
                                   protocol=p[7], bit=int(p[10])))
    df = sqlContext.createDataFrame(rows)

    # do some transform on the data frame
    df_protocol = df.groupBy(['protocol']).agg(func.sum('bit').alias('bit'))

    # add the current data frame to the previous data frame set
    if not df_total:
        df_total = df_protocol
    else:
        df_total = df_total.unionAll(df_protocol)

    # cache the df_total
    df_total.cache()
    if counter % 5 == 0:
        df_total.rdd.checkpoint()

    # get the df_total information
    df_total.show()
 

I know that as time goes on, df_total could get big. But before
that point is reached, the above code already raises an exception.

After about 30 iterations of the loop, the code throws a GC overhead limit exceeded
exception. Each file is very small, so even after 300 iterations the data size should only
be a few MB. I do not know why it throws a GC error.

The exception detail is below :

16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread
task-result-getter-2
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
at
scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at
org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at 
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at 
org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
at 
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at
org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
at 
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: GC
overhead limit exceeded
at
scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at 

How does .jsonFile() work?

2016-04-19 Thread resonance
Hi, this is more of a theoretical question and I'm asking it here because I
have no idea where to find documentation for this stuff. 

I am currently working with Spark SQL and am considering using data
contained within JSON datasets. I am aware of the .jsonFile() method in
Spark SQL.

What is the general strategy used by Spark SQL .jsonFile() to parse/decode a
JSON dataset?

(For example, an answer I might be looking for is that the JSON file is read
into an ETL pipeline and transformed into a predefined data structure.)

I am deeply appreciative of any help provided. 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-jsonFile-work-tp26802.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Any NLP lib could be used on spark?

2016-04-19 Thread Burak Yavuz
A quick search on spark-packages returns:
http://spark-packages.org/package/databricks/spark-corenlp.

You may need to build it locally and add it to your session by --jars.

On Tue, Apr 19, 2016 at 10:47 AM, Gavin Yue  wrote:

> Hey,
>
> Want to try the NLP on the spark. Could anyone recommend any easy to run
> NLP open source lib on spark?
>
> Also is there any recommended semantic network?
>
> Thanks a lot.
>


Re: Spark SQL Transaction

2016-04-19 Thread Andrés Ivaldi
I mean a local transaction. We ran a job that writes into SQL Server, then
we killed the Spark JVM just for testing purposes, and we realized that SQL Server
did a rollback.

Regards

On Tue, Apr 19, 2016 at 5:27 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> What do you mean by *without transaction*? do you mean forcing SQL Server
> to accept a non logged operation?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 19 April 2016 at 21:18, Andrés Ivaldi  wrote:
>
>> Hello, is possible to execute a SQL write without Transaction? we dont
>> need transactions to save our data and this adds an overhead to the
>> SQLServer.
>>
>> Regards.
>>
>> --
>> Ing. Ivaldi Andres
>>
>
>


-- 
Ing. Ivaldi Andres


Spark 1.6.1 DataFrame write to JDBC

2016-04-19 Thread Jonathan Gray
Hi,

I'm trying to write ~60 million rows from a DataFrame to a database using
JDBC using Spark 1.6.1, something similar to df.write().jdbc(...)

The write does not seem to perform well.  Profiling the application with
a master of local[*], it appears there is not much socket write activity and
also not much CPU usage.

I would expect there to be an almost continuous block of socket write
activity showing up somewhere in the profile.

I can see that the top hot method involves
apache.spark.unsafe.platform.CopyMemory all from calls within
JdbcUtils.savePartition(...).  However, the CPU doesn't seem particularly
stressed so I'm guessing this isn't the cause of the problem.

Are there any best practices, or has anyone come across a case like this
before where a write to a database seems to perform poorly?
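One knob that is commonly tried (a hedged PySpark-flavored sketch only, with placeholder connection details, not a guaranteed fix): raise the write parallelism by repartitioning before the JDBC save, since each partition writes over its own connection.

    # hypothetical connection details
    url = "jdbc:postgresql://dbhost:5432/mydb"
    props = {"user": "spark", "password": "secret", "driver": "org.postgresql.Driver"}

    # more partitions -> more concurrent insert streams (mind the DB's connection limit)
    df.repartition(32).write.jdbc(url=url, table="target_table", mode="append", properties=props)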

Thanks,
Jon


VectorAssembler handling null values

2016-04-19 Thread Andres Perez
Hi everyone. org.apache.spark.ml.feature.VectorAssembler currently cannot
handle null values. This presents a problem for us as we wish to run a
decision tree classifier on sometimes sparse data. Is there a particular
reason VectorAssembler is implemented in this way, and can anyone recommend
the best path for enabling VectorAssembler to build vectors for data that
will contain empty values?
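Not a change to VectorAssembler itself, but a hedged sketch of the usual interim workaround: impute/fill the null values before assembling (column names and fill value below are made up):

    from pyspark.ml.feature import VectorAssembler

    # replace nulls in the numeric feature columns with a default value first
    filled = df.na.fill(0.0, subset=["f1", "f2", "f3"])   # hypothetical columns

    assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features")
    output = assembler.transform(filled)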

Thanks!

-Andres


Re: Spark SQL Transaction

2016-04-19 Thread Mich Talebzadeh
Hi,

What do you mean by "without transaction"? Do you mean forcing SQL Server
to accept a non-logged operation?

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 19 April 2016 at 21:18, Andrés Ivaldi  wrote:

> Hello, is possible to execute a SQL write without Transaction? we dont
> need transactions to save our data and this adds an overhead to the
> SQLServer.
>
> Regards.
>
> --
> Ing. Ivaldi Andres
>


Spark SQL Transaction

2016-04-19 Thread Andrés Ivaldi
Hello, is it possible to execute a SQL write without a transaction? We don't need
transactions to save our data, and they add overhead to SQL Server.

Regards.

-- 
Ing. Ivaldi Andres


how to get weights of logistic regression model inside cross validator model?

2016-04-19 Thread Wei Chen
Hi All,

I am using the example of model selection via cross-validation from the
documentation here: http://spark.apache.org/docs/latest/ml-guide.html.
After I get the "cvModel", I would like to see the weights for each feature
for the best logistic regression model. I've been looking at the methods
and attributes of this "cvModel" and "cvModel.bestModel" and still can't
figure out where these weights are stored. They must be somewhere, since we
can use "cvModel" to transform a new dataframe. Your help is much
appreciated.
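A hedged PySpark sketch of where they usually live, assuming the estimator handed to CrossValidator was a LogisticRegression itself rather than a Pipeline (attribute names as in Spark 1.x ml):

    best = cvModel.bestModel        # a LogisticRegressionModel in that case
    print(best.weights)             # per-feature weights (named coefficients in later releases)
    print(best.intercept)

    # if the estimator was a Pipeline, the fitted model is inside the stages:
    # best = cvModel.bestModel.stages[-1]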


Thank you,
Wei


pyspark split pair rdd to multiple

2016-04-19 Thread pth001

Hi,

How can I turn a pair RDD [K, V] into a map [K, Array(V)] efficiently in PySpark?
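A minimal sketch of the usual approach, assuming the grouped values per key fit in driver memory once collected:

    pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])

    # group the values per key, then bring the result back as a dict {K: [V, ...]}
    grouped = pairs.groupByKey().mapValues(list)
    as_map = grouped.collectAsMap()     # {'a': [1, 2], 'b': [3]}

Note that groupByKey shuffles every value for a key to one partition, so for heavily skewed keys aggregateByKey/combineByKey can be kinder.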

Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
Let me be more detailed in my response:

Kafka works on “at least once” semantics. Therefore, given your assumption that 
Kafka "will be operational", we can assume that at least once semantics will 
hold.

At this point, it comes down to designing for consumer (really Spark Executor) 
resilience.

From a DC standpoint, you can use an in-memory data fabric, like the one provided by
InsightEdge (http://insightedge.io/docs/010/index.html). In this case, WAN replication out
to other DCs is available at the Data Grid layer. See here:
http://www.gigaspaces.com/Data-Replication.

Assuming that the consumers respect at least once semantics (that is: don’t 
attempt to keep track of the offset or any other state), then Spark can 
parallelize execution using Dumb Consumers. The backing data fabric can do what 
it does best, which is conflict resolution in the case that a DC goes down for 
a period of time.

One advantage of this architecture is that it can be used to load balance, 
reducing infrastructure costs.

Of course, the CAP theorem is still in play, so things like intra-DC latencies 
and consistency SLAs need to be considered. But in principle, you can balance 
competing concerns against one another based on business requirements.

HTH

> On Apr 19, 2016, at 10:53 AM, Erwan ALLAIN  wrote:
> 
> Cody, you're right that was an example. Target architecture would be 3 DCs :) 
> Good point on ZK, I'll have to check that.
> 
> About Spark, both instances will run at the same time but on different 
> topics. That would be quite useless to have to 2DCs working on the same set 
> of data.
> I just want, in case of crash, that the healthy spark works on all topics 
> (retrieve dead spark load). 
> 
> Does it seem an awkward design ?
> 
> On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger  > wrote:
> Maybe I'm missing something, but I don't see how you get a quorum in only 2 
> datacenters (without splitbrain problem, etc).  I also don't know how well ZK 
> will work cross-datacenter.
> 
> As far as the spark side of things goes, if it's idempotent, why not just run 
> both instances all the time.
> 
> 
> 
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN  > wrote:
> I'm describing a disaster recovery but it can be used to make one datacenter 
> offline for upgrade for instance.
> 
> From my point of view when DC2 crashes:
> 
> On Kafka side:
> - kafka cluster will lose one or more broker (partition leader and replica)
> - partition leader lost will be reelected in the remaining healthy DC
> 
> => if the number of in-sync replicas are above the minimum threshold, kafka 
> should be operational
> 
> On downstream datastore side (say Cassandra for instance):
> - deploy accross the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
> 
> => it should be ok (depends on replication factor)
> 
> On Spark:
> - treatment should be idempotent, it will allow us to restart from the last 
> commited offset
> 
> I understand that starting up a post crash job would work.
> 
> Question is: how can we detect when DC2 crashes to start a new job ?
> 
> dynamic topic partition (at each kafkaRDD creation for instance) + topic  
> subscription may be the answer ?
> 
> I appreciate your effort.
> 
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin  > wrote:
> It the main concern uptime or disaster recovery?
> 
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger > > wrote:
>> 
>> I think the bigger question is what happens to Kafka and your downstream 
>> data store when DC2 crashes.
>> 
>> From a Spark point of view, starting up a post-crash job in a new data 
>> center isn't really different from starting up a post-crash job in the 
>> original data center.
>> 
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN > > wrote:
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>> 
>> As I mentionned before, I'm planning to use one kafka cluster and 2 or more 
>> spark cluster distinct. 
>> 
>> Let's say we have the following DCs configuration in a nominal case. 
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>> 
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1
>>              Worker 1.2   my_group               P2
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>> 
>> I would like, in case of DC crash, a rebalancing of partition on the healthy 
>> DC, something as follow
>> 
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1, P3

Any NLP lib could be used on spark?

2016-04-19 Thread Gavin Yue
Hey,

I want to try NLP on Spark. Could anyone recommend an easy-to-run
open source NLP lib for Spark?

Also, is there any recommended semantic network?

Thanks a lot.


Re: prefix column Spark

2016-04-19 Thread Michael Armbrust
A few comments:
 - Each withColumnRenamed adds a new level to the logical plan.  We
have optimized this significantly in newer versions of Spark, but it is
still not free.
 - Transforming to an RDD is going to do fairly expensive conversion back
and forth between the internal binary format.
 - Probably the best way to accomplish this is to build up all the new
columns you want and pass them to a single select call.
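A hedged PySpark sketch of that single-select approach (every renamed column is built once and passed to one select call):

    from pyspark.sql.functions import col

    def prefix_df(df, prefix):
        # build all renamed columns up front and issue a single select,
        # instead of chaining hundreds of withColumnRenamed calls
        return df.select([col(c).alias("%s_%s" % (prefix, c)) for c in df.columns])

    prefixed = prefix_df(df, "t1")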


On Tue, Apr 19, 2016 at 3:04 AM, nihed mbarek  wrote:

> Hi
> thank you, it's the first solution and it took a long time to manage all
> my fields
>
> Regards,
>
> On Tue, Apr 19, 2016 at 11:29 AM, Ndjido Ardo BAR 
> wrote:
>
>>
>> This can help:
>>
>> import org.apache.spark.sql.DataFrame
>>
>> def prefixDf(dataFrame: DataFrame, prefix: String): DataFrame = {
>>   val colNames = dataFrame.columns
>>   colNames.foldLeft(dataFrame) { (df, colName) =>
>>     df.withColumnRenamed(colName, s"${prefix}_${colName}")
>>   }
>> }
>>
>> cheers,
>> Ardo
>>
>>
>> On Tue, Apr 19, 2016 at 10:53 AM, nihed mbarek  wrote:
>>
>>> Hi,
>>>
>>> I want to prefix a set of dataframes, and I tried two solutions:
>>> * a for loop calling withColumnRenamed based on columns()
>>> * transforming my DataFrame to an RDD, updating the old schema and
>>> recreating the dataframe.
>>>
>>>
>>> Both are working for me; the second one is faster with tables that
>>> contain 800 columns, but it has one extra transformation stage (toRDD).
>>>
>>> Is there any other solution?
>>>
>>> Thank you
>>>
>>> --
>>>
>>> M'BAREK Med Nihed,
>>> Fedora Ambassador, TUNISIA, Northern Africa
>>> http://www.nihed.com
>>>
>>> 
>>>
>>>
>>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
At that scale, it’s best not to do coordination at the application layer. 

How much of your data is transactional in nature {all, some, none}? By which I 
mean ACID-compliant.

> On Apr 19, 2016, at 10:53 AM, Erwan ALLAIN  wrote:
> 
> Cody, you're right that was an example. Target architecture would be 3 DCs :) 
> Good point on ZK, I'll have to check that.
> 
> About Spark, both instances will run at the same time but on different 
> topics. That would be quite useless to have to 2DCs working on the same set 
> of data.
> I just want, in case of crash, that the healthy spark works on all topics 
> (retrieve dead spark load). 
> 
> Does it seem an awkward design ?
> 
> On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger  > wrote:
> Maybe I'm missing something, but I don't see how you get a quorum in only 2 
> datacenters (without splitbrain problem, etc).  I also don't know how well ZK 
> will work cross-datacenter.
> 
> As far as the spark side of things goes, if it's idempotent, why not just run 
> both instances all the time.
> 
> 
> 
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN  > wrote:
> I'm describing a disaster recovery but it can be used to make one datacenter 
> offline for upgrade for instance.
> 
> From my point of view when DC2 crashes:
> 
> On Kafka side:
> - kafka cluster will lose one or more broker (partition leader and replica)
> - partition leader lost will be reelected in the remaining healthy DC
> 
> => if the number of in-sync replicas are above the minimum threshold, kafka 
> should be operational
> 
> On downstream datastore side (say Cassandra for instance):
> - deploy accross the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
> 
> => it should be ok (depends on replication factor)
> 
> On Spark:
> - treatment should be idempotent, it will allow us to restart from the last 
> commited offset
> 
> I understand that starting up a post crash job would work.
> 
> Question is: how can we detect when DC2 crashes to start a new job ?
> 
> dynamic topic partition (at each kafkaRDD creation for instance) + topic  
> subscription may be the answer ?
> 
> I appreciate your effort.
> 
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin  > wrote:
> It the main concern uptime or disaster recovery?
> 
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger > > wrote:
>> 
>> I think the bigger question is what happens to Kafka and your downstream 
>> data store when DC2 crashes.
>> 
>> From a Spark point of view, starting up a post-crash job in a new data 
>> center isn't really different from starting up a post-crash job in the 
>> original data center.
>> 
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN > > wrote:
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>> 
>> As I mentionned before, I'm planning to use one kafka cluster and 2 or more 
>> spark cluster distinct. 
>> 
>> Let's say we have the following DCs configuration in a nominal case. 
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>> 
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1
>>              Worker 1.2   my_group               P2
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>> 
>> I would like, in case of DC crash, a rebalancing of partition on the healthy 
>> DC, something as follow
>> 
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1, P3
>>              Worker 1.2   my_group               P2, P4
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>> 
>> I would like to know if it's possible:
>> - using consumer group ?
>> - using direct approach ? I prefer this one as I don't want to activate WAL.
>> 
>> Hope the explanation is better !
>> 
>> 
>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger > > wrote:
>> The current direct stream only handles exactly the partitions
>> specified at startup.  You'd have to restart the job if you changed
>> partitions.
>> 
>> https://issues.apache.org/jira/browse/SPARK-12177 
>>  has the ongoing work
>> towards using the kafka 0.10 consumer, which would allow for dynamic
>> topicparittions
>> 
>> Regarding your multi-DC questions, I'm not really clear on what you're 
>> saying.
>> 
>> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN > > wrote:
>> > Hello,
>> >
>> > I'm currently designing a solution where 2 distinct clusters Spark (2
>> > datacenters) share the same 

Re: Spark streaming batch time displayed is not current system time but it is processing current messages

2016-04-19 Thread Ted Yu
Using
http://www.ruddwire.com/handy-code/date-to-millisecond-calculators/#.VxZh3iMrKuo,
1460823008000 is shown to be 'Sat Apr 16 2016 09:10:08 GMT-0700'.

Can you clarify the 4 day difference ?

bq. 'right now April 14th'

The date of your email was Apr 16th.
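For reference, the batch times Spark Streaming prints are epoch milliseconds, so a quick check in Python:

    import datetime

    ms = 1460823008000
    print(datetime.datetime.utcfromtimestamp(ms / 1000.0))   # 2016-04-16 16:10:08 (UTC)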

On Sat, Apr 16, 2016 at 9:39 AM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> Can anyone help me in debugging  this issue please.
>
>
> On Thu, Apr 14, 2016 at 12:24 PM, Hemalatha A <
> hemalatha.amru...@googlemail.com> wrote:
>
>> Hi,
>>
>> I am facing a problem in Spark streaming.
>>
>
>
> Time: 1460823006000 ms
> ---
>
> ---
> Time: 1460823008000 ms
> ---
>
>
>
>
>> The time displayed in Spark streaming console as above is 4 days prior
>> i.e.,  April 10th, which is not current system time of the cluster  but the
>> job is processing current messages that is pushed right now April 14th.
>>
>> Can anyone please advice what time does Spark streaming display? Also,
>> when there  is scheduling delay of say 8 hours, what time does Spark
>> display- current rime or   hours behind?
>>
>> --
>>
>>
>> Regards
>> Hemalatha
>>
>
>
>
> --
>
>
> Regards
> Hemalatha
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
Cody, you're right that was an example. Target architecture would be 3 DCs
:) Good point on ZK, I'll have to check that.

About Spark, both instances will run at the same time but on different
topics. It would be quite useless to have the 2 DCs working on the same set
of data.
I just want, in case of crash, that the healthy spark works on all topics
(retrieve dead spark load).

Does it seem an awkward design ?

On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger  wrote:

> Maybe I'm missing something, but I don't see how you get a quorum in only
> 2 datacenters (without splitbrain problem, etc).  I also don't know how
> well ZK will work cross-datacenter.
>
> As far as the spark side of things goes, if it's idempotent, why not just
> run both instances all the time.
>
>
>
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN 
> wrote:
>
>> I'm describing a disaster recovery but it can be used to make one
>> datacenter offline for upgrade for instance.
>>
>> From my point of view when DC2 crashes:
>>
>> *On Kafka side:*
>> - kafka cluster will lose one or more broker (partition leader and
>> replica)
>> - partition leader lost will be reelected in the remaining healthy DC
>>
>> => if the number of in-sync replicas are above the minimum threshold,
>> kafka should be operational
>>
>> *On downstream datastore side (say Cassandra for instance):*
>> - deploy accross the 2 DCs in (QUORUM / QUORUM)
>> - idempotent write
>>
>> => it should be ok (depends on replication factor)
>>
>> *On Spark*:
>> - treatment should be idempotent, it will allow us to restart from the
>> last commited offset
>>
>> I understand that starting up a post crash job would work.
>>
>> Question is: how can we detect when DC2 crashes to start a new job ?
>>
>> dynamic topic partition (at each kafkaRDD creation for instance) + topic
>> subscription may be the answer ?
>>
>> I appreciate your effort.
>>
>> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin 
>> wrote:
>>
>>> It the main concern uptime or disaster recovery?
>>>
>>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
>>>
>>> I think the bigger question is what happens to Kafka and your downstream
>>> data store when DC2 crashes.
>>>
>>> From a Spark point of view, starting up a post-crash job in a new data
>>> center isn't really different from starting up a post-crash job in the
>>> original data center.
>>>
>>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
>>> wrote:
>>>
 Thanks Jason and Cody. I'll try to explain a bit better the Multi DC
 case.

 As I mentionned before, I'm planning to use one kafka cluster and 2 or
 more spark cluster distinct.

 Let's say we have the following DCs configuration in a nominal case.
 Kafka partitions are consumed uniformly by the 2 datacenters.

 DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
 DC 1 Master 1.1

 Worker 1.1 my_group P1
 Worker 1.2 my_group P2
 DC 2 Master 2.1

 Worker 2.1 my_group P3
 Worker 2.2 my_group P4
 I would like, in case of DC crash, a rebalancing of partition on the
 healthy DC, something as follow

 DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
 DC 1 Master 1.1

 Worker 1.1 my_group P1*, P3*
 Worker 1.2 my_group P2*, P4*
 DC 2 Master 2.1

 Worker 2.1 my_group P3
 Worker 2.2 my_group P4

 I would like to know if it's possible:
 - using consumer group ?
 - using direct approach ? I prefer this one as I don't want to activate
 WAL.

 Hope the explanation is better !


 On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
 wrote:

> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
>
> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
> towards using the kafka 0.10 consumer, which would allow for dynamic
> topicparittions
>
> Regarding your multi-DC questions, I'm not really clear on what you're
> saying.
>
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN <
> eallain.po...@gmail.com> wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct clusters Spark (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using kafka resiliency and consumer group
> mechanism
> > (or else ?)
> > - keeping consistent offset among replica (vs mirror maker,which
> does not
> > keep offset)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream which map one partition kafka with one
> spark. Is
> > it possible to 

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Cody Koeninger
Maybe I'm missing something, but I don't see how you get a quorum in only 2
datacenters (without splitbrain problem, etc).  I also don't know how well
ZK will work cross-datacenter.

As far as the spark side of things goes, if it's idempotent, why not just
run both instances all the time.



On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN 
wrote:

> I'm describing a disaster recovery but it can be used to make one
> datacenter offline for upgrade for instance.
>
> From my point of view when DC2 crashes:
>
> *On Kafka side:*
> - kafka cluster will lose one or more broker (partition leader and replica)
> - partition leader lost will be reelected in the remaining healthy DC
>
> => if the number of in-sync replicas are above the minimum threshold,
> kafka should be operational
>
> *On downstream datastore side (say Cassandra for instance):*
> - deploy accross the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
>
> => it should be ok (depends on replication factor)
>
> *On Spark*:
> - treatment should be idempotent, it will allow us to restart from the
> last commited offset
>
> I understand that starting up a post crash job would work.
>
> Question is: how can we detect when DC2 crashes to start a new job ?
>
> dynamic topic partition (at each kafkaRDD creation for instance) + topic
> subscription may be the answer ?
>
> I appreciate your effort.
>
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin 
> wrote:
>
>> It the main concern uptime or disaster recovery?
>>
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
>>
>> I think the bigger question is what happens to Kafka and your downstream
>> data store when DC2 crashes.
>>
>> From a Spark point of view, starting up a post-crash job in a new data
>> center isn't really different from starting up a post-crash job in the
>> original data center.
>>
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
>> wrote:
>>
>>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC
>>> case.
>>>
>>> As I mentionned before, I'm planning to use one kafka cluster and 2 or
>>> more spark cluster distinct.
>>>
>>> Let's say we have the following DCs configuration in a nominal case.
>>> Kafka partitions are consumed uniformly by the 2 datacenters.
>>>
>>> DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
>>> DC 1 Master 1.1
>>>
>>> Worker 1.1 my_group P1
>>> Worker 1.2 my_group P2
>>> DC 2 Master 2.1
>>>
>>> Worker 2.1 my_group P3
>>> Worker 2.2 my_group P4
>>> I would like, in case of DC crash, a rebalancing of partition on the
>>> healthy DC, something as follow
>>>
>>> DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
>>> DC 1 Master 1.1
>>>
>>> Worker 1.1 my_group P1*, P3*
>>> Worker 1.2 my_group P2*, P4*
>>> DC 2 Master 2.1
>>>
>>> Worker 2.1 my_group P3
>>> Worker 2.2 my_group P4
>>>
>>> I would like to know if it's possible:
>>> - using consumer group ?
>>> - using direct approach ? I prefer this one as I don't want to activate
>>> WAL.
>>>
>>> Hope the explanation is better !
>>>
>>>
>>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
>>> wrote:
>>>
 The current direct stream only handles exactly the partitions
 specified at startup.  You'd have to restart the job if you changed
 partitions.

 https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
 towards using the kafka 0.10 consumer, which would allow for dynamic
 topicparittions

 Regarding your multi-DC questions, I'm not really clear on what you're
 saying.

 On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
 wrote:
 > Hello,
 >
 > I'm currently designing a solution where 2 distinct clusters Spark (2
 > datacenters) share the same Kafka (Kafka rack aware or manual broker
 > repartition).
 > The aims are
 > - preventing DC crash: using kafka resiliency and consumer group
 mechanism
 > (or else ?)
 > - keeping consistent offset among replica (vs mirror maker,which does
 not
 > keep offset)
 >
 > I have several questions
 >
 > 1) Dynamic repartition (one or 2 DC)
 >
 > I'm using KafkaDirectStream which map one partition kafka with one
 spark. Is
 > it possible to handle new or removed partition ?
 > In the compute method, it looks like we are always using the
 currentOffset
 > map to query the next batch and therefore it's always the same number
 of
 > partition ? Can we request metadata at each batch ?
 >
 > 2) Multi DC Spark
 >
 > Using Direct approach, a way to achieve this would be
 > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
 > - only one is reading the partition (Check every x interval, "lock"
 stored
 > in cassandra for instance)
 >
 > => not sure if it works just an idea
 >
 > Using Consumer Group

Re: Cached Parquet file paths problem

2016-04-19 Thread Piotr Smoliński
Solved it.

The anonymous RDDs can be cached in the cacheManager in SQLContext.

In order to remove all the cached content use:
  sqlContext.clearCache()

The warning symptom about failed data frame registration is the following
entry in the log:

16/04/16 20:18:39 [tp439928219-110]  WARN CacheManager: Asked to cache
already cached data.


Piotr

On Wed, Mar 30, 2016 at 3:10 PM, psmolinski 
wrote:

> bumping up the topic.
>
> For the moment I stay with 1.5.2, but I would like to switch to 1.6.x and
> this issue is a blocker.
>
> Thanks,
> Piotr
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cached-Parquet-file-paths-problem-tp26576p26637.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
I'm describing a disaster recovery but it can be used to make one
datacenter offline for upgrade for instance.

From my point of view when DC2 crashes:

*On Kafka side:*
- kafka cluster will lose one or more broker (partition leader and replica)
- partition leader lost will be reelected in the remaining healthy DC

=> if the number of in-sync replicas are above the minimum threshold, kafka
should be operational

*On downstream datastore side (say Cassandra for instance):*
- deploy accross the 2 DCs in (QUORUM / QUORUM)
- idempotent write

=> it should be ok (depends on replication factor)

*On Spark*:
- treatment should be idempotent; it will allow us to restart from the last
committed offset

I understand that starting up a post crash job would work.

Question is: how can we detect when DC2 crashes to start a new job ?

dynamic topic partition (at each kafkaRDD creation for instance) + topic
subscription may be the answer ?

I appreciate your effort.

On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin 
wrote:

> It the main concern uptime or disaster recovery?
>
> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
>
> I think the bigger question is what happens to Kafka and your downstream
> data store when DC2 crashes.
>
> From a Spark point of view, starting up a post-crash job in a new data
> center isn't really different from starting up a post-crash job in the
> original data center.
>
> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
> wrote:
>
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>>
>> As I mentionned before, I'm planning to use one kafka cluster and 2 or
>> more spark cluster distinct.
>>
>> Let's say we have the following DCs configuration in a nominal case.
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>>
>> DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
>> DC 1 Master 1.1
>>
>> Worker 1.1 my_group P1
>> Worker 1.2 my_group P2
>> DC 2 Master 2.1
>>
>> Worker 2.1 my_group P3
>> Worker 2.2 my_group P4
>> I would like, in case of DC crash, a rebalancing of partition on the
>> healthy DC, something as follow
>>
>> DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
>> DC 1 Master 1.1
>>
>> Worker 1.1 my_group P1*, P3*
>> Worker 1.2 my_group P2*, P4*
>> DC 2 Master 2.1
>>
>> Worker 2.1 my_group P3
>> Worker 2.2 my_group P4
>>
>> I would like to know if it's possible:
>> - using consumer group ?
>> - using direct approach ? I prefer this one as I don't want to activate
>> WAL.
>>
>> Hope the explanation is better !
>>
>>
>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
>> wrote:
>>
>>> The current direct stream only handles exactly the partitions
>>> specified at startup.  You'd have to restart the job if you changed
>>> partitions.
>>>
>>> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
>>> towards using the kafka 0.10 consumer, which would allow for dynamic
>>> topicparittions
>>>
>>> Regarding your multi-DC questions, I'm not really clear on what you're
>>> saying.
>>>
>>> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
>>> wrote:
>>> > Hello,
>>> >
>>> > I'm currently designing a solution where 2 distinct clusters Spark (2
>>> > datacenters) share the same Kafka (Kafka rack aware or manual broker
>>> > repartition).
>>> > The aims are
>>> > - preventing DC crash: using kafka resiliency and consumer group
>>> mechanism
>>> > (or else ?)
>>> > - keeping consistent offset among replica (vs mirror maker,which does
>>> not
>>> > keep offset)
>>> >
>>> > I have several questions
>>> >
>>> > 1) Dynamic repartition (one or 2 DC)
>>> >
>>> > I'm using KafkaDirectStream which map one partition kafka with one
>>> spark. Is
>>> > it possible to handle new or removed partition ?
>>> > In the compute method, it looks like we are always using the
>>> currentOffset
>>> > map to query the next batch and therefore it's always the same number
>>> of
>>> > partition ? Can we request metadata at each batch ?
>>> >
>>> > 2) Multi DC Spark
>>> >
>>> > Using Direct approach, a way to achieve this would be
>>> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
>>> > - only one is reading the partition (Check every x interval, "lock"
>>> stored
>>> > in cassandra for instance)
>>> >
>>> > => not sure if it works just an idea
>>> >
>>> > Using Consumer Group
>>> > - CommitOffset manually at the end of the batch
>>> >
>>> > => Does spark handle partition rebalancing ?
>>> >
>>> > I'd appreciate any ideas ! Let me know if it's not clear.
>>> >
>>> > Erwan
>>> >
>>> >
>>>
>>
>>
>
>


Re: hbaseAdmin tableExists create catalogTracker for every call

2016-04-19 Thread Ted Yu
The CatalogTracker object may not be used by all the methods of HBaseAdmin.

Meaning, when HBaseAdmin is constructed, we don't need CatalogTracker.

On Tue, Apr 19, 2016 at 6:09 AM, WangYQ  wrote:

> in HBase 0.98.10, class "HBaseAdmin",
> line 303, method "tableExists", will create a CatalogTracker for
> every call
>
>
> we could let an HBaseAdmin object reuse a single CatalogTracker object, to reduce
> object creation, ZooKeeper connections, and so on
>
>
>
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
Is the main concern uptime or disaster recovery?

> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
> 
> I think the bigger question is what happens to Kafka and your downstream data 
> store when DC2 crashes.
> 
> From a Spark point of view, starting up a post-crash job in a new data center 
> isn't really different from starting up a post-crash job in the original data 
> center.
> 
> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN  > wrote:
> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
> 
> As I mentionned before, I'm planning to use one kafka cluster and 2 or more 
> spark cluster distinct. 
> 
> Let's say we have the following DCs configuration in a nominal case. 
> Kafka partitions are consumed uniformly by the 2 datacenters.
> 
> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
> DC 1         Master 1.1
>              Worker 1.1   my_group               P1
>              Worker 1.2   my_group               P2
> DC 2         Master 2.1
>              Worker 2.1   my_group               P3
>              Worker 2.2   my_group               P4
> 
> I would like, in case of DC crash, a rebalancing of partition on the healthy 
> DC, something as follow
> 
> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
> DC 1         Master 1.1
>              Worker 1.1   my_group               P1, P3
>              Worker 1.2   my_group               P2, P4
> DC 2         Master 2.1
>              Worker 2.1   my_group               P3
>              Worker 2.2   my_group               P4
> 
> I would like to know if it's possible:
> - using consumer group ?
> - using direct approach ? I prefer this one as I don't want to activate WAL.
> 
> Hope the explanation is better !
> 
> 
> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger  > wrote:
> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
> 
> https://issues.apache.org/jira/browse/SPARK-12177 
>  has the ongoing work
> towards using the kafka 0.10 consumer, which would allow for dynamic
> topicparittions
> 
> Regarding your multi-DC questions, I'm not really clear on what you're saying.
> 
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN  > wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct clusters Spark (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using kafka resiliency and consumer group mechanism
> > (or else ?)
> > - keeping consistent offset among replica (vs mirror maker,which does not
> > keep offset)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream which map one partition kafka with one spark. Is
> > it possible to handle new or removed partition ?
> > In the compute method, it looks like we are always using the currentOffset
> > map to query the next batch and therefore it's always the same number of
> > partition ? Can we request metadata at each batch ?
> >
> > 2) Multi DC Spark
> >
> > Using Direct approach, a way to achieve this would be
> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
> > - only one is reading the partition (Check every x interval, "lock" stored
> > in cassandra for instance)
> >
> > => not sure if it works just an idea
> >
> > Using Consumer Group
> > - CommitOffset manually at the end of the batch
> >
> > => Does spark handle partition rebalancing ?
> >
> > I'd appreciate any ideas ! Let me know if it's not clear.
> >
> > Erwan
> >
> >
> 
> 



Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Cody Koeninger
I think the bigger question is what happens to Kafka and your downstream
data store when DC2 crashes.

From a Spark point of view, starting up a post-crash job in a new data
center isn't really different from starting up a post-crash job in the
original data center.
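
For reference, a minimal sketch of the Spark 1.x Kafka direct API discussed in this
thread (broker and topic names are hypothetical placeholders). The topic set, and the
partitions resolved from it, are fixed when the stream is created, which is why a job
restart is currently needed when partitions change:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("direct-demo"), Seconds(5))

// Hypothetical broker list; the direct approach uses no receiver and no WAL.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// The topics (and the partitions resolved from them at creation time) are fixed here.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

stream.foreachRDD { rdd => println(s"records in batch: ${rdd.count()}") }

ssc.start()
ssc.awaitTermination()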

On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
wrote:

> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>
> As I mentionned before, I'm planning to use one kafka cluster and 2 or
> more spark cluster distinct.
>
> Let's say we have the following DCs configuration in a nominal case.
> Kafka partitions are consumed uniformly by the 2 datacenters.
>
> DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
> DC 1 Master 1.1
>
> Worker 1.1 my_group P1
> Worker 1.2 my_group P2
> DC 2 Master 2.1
>
> Worker 2.1 my_group P3
> Worker 2.2 my_group P4
> I would like, in case of DC crash, a rebalancing of partition on the
> healthy DC, something as follow
>
> DataCenter Spark Kafka Consumer Group Kafka partition (P1 to P4)
> DC 1 Master 1.1
>
> Worker 1.1 my_group P1*, P3*
> Worker 1.2 my_group P2*, P4*
> DC 2 Master 2.1
>
> Worker 2.1 my_group P3
> Worker 2.2 my_group P4
>
> I would like to know if it's possible:
> - using consumer group ?
> - using direct approach ? I prefer this one as I don't want to activate
> WAL.
>
> Hope the explanation is better !
>
>
> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
> wrote:
>
>> The current direct stream only handles exactly the partitions
>> specified at startup.  You'd have to restart the job if you changed
>> partitions.
>>
>> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
>> towards using the kafka 0.10 consumer, which would allow for dynamic
>> topicparittions
>>
>> Regarding your multi-DC questions, I'm not really clear on what you're
>> saying.
>>
>> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
>> wrote:
>> > Hello,
>> >
>> > I'm currently designing a solution where 2 distinct clusters Spark (2
>> > datacenters) share the same Kafka (Kafka rack aware or manual broker
>> > repartition).
>> > The aims are
>> > - preventing DC crash: using kafka resiliency and consumer group
>> mechanism
>> > (or else ?)
>> > - keeping consistent offset among replica (vs mirror maker,which does
>> not
>> > keep offset)
>> >
>> > I have several questions
>> >
>> > 1) Dynamic repartition (one or 2 DC)
>> >
>> > I'm using KafkaDirectStream which map one partition kafka with one
>> spark. Is
>> > it possible to handle new or removed partition ?
>> > In the compute method, it looks like we are always using the
>> currentOffset
>> > map to query the next batch and therefore it's always the same number of
>> > partition ? Can we request metadata at each batch ?
>> >
>> > 2) Multi DC Spark
>> >
>> > Using Direct approach, a way to achieve this would be
>> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
>> > - only one is reading the partition (Check every x interval, "lock"
>> stored
>> > in cassandra for instance)
>> >
>> > => not sure if it works just an idea
>> >
>> > Using Consumer Group
>> > - CommitOffset manually at the end of the batch
>> >
>> > => Does spark handle partition rebalancing ?
>> >
>> > I'd appreciate any ideas ! Let me know if it's not clear.
>> >
>> > Erwan
>> >
>> >
>>
>
>


Re: Processing millions of messages in milliseconds -- Architecture guide required

2016-04-19 Thread Arkadiusz Bicz
Sorry, I've found one error:

If you do NOT need any relational processing of your messages (based on
historical data, or joining with other messages) and message
processing is quite independent, Kafka plus Spark Streaming could be
overkill.

On Tue, Apr 19, 2016 at 1:54 PM, Arkadiusz Bicz
 wrote:
> Requirements looks like my previous project for smart metering. We
> finally did custom solution without Spark, Hadoop and Kafka but it was
> 4 years ago when I did not have experience with this technologies (
> some not existed or were not mature).
>
> If you do need any relational processing of your messages ( basing on
> historical data, or joining with other messages) and message
> processing is quite independent Kafka plus Spark Streaming could be
> overkill.
>
> The best to check if your data has natural index like timestamp in
> metering data which come in the same frequency (every second) and
> basing on it do access to your cache and disc. For cache for me  most
> promising looks  Alluxio.
>
> BR,
> Arkadiusz Bicz
>
> On Tue, Apr 19, 2016 at 6:01 AM, Deepak Sharma  wrote:
>> Hi all,
>> I am looking for an architecture to ingest 10 mils of messages in the micro
>> batches of seconds.
>> If anyone has worked on similar kind of architecture  , can you please point
>> me to any documentation around the same like what should be the architecture
>> , which all components/big data ecosystem tools should i consider etc.
>> The messages has to be in xml/json format , a preprocessor engine or message
>> enhancer and then finally a processor.
>> I thought about using data cache as well for serving the data
>> The data cache should have the capability to serve the historical  data in
>> milliseconds (may be upto 30 days of data)
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Processing millions of messages in milliseconds -- Architecture guide required

2016-04-19 Thread Arkadiusz Bicz
Requirements looks like my previous project for smart metering. We
finally did custom solution without Spark, Hadoop and Kafka but it was
4 years ago when I did not have experience with this technologies (
some not existed or were not mature).

If you do need any relational processing of your messages ( basing on
historical data, or joining with other messages) and message
processing is quite independent Kafka plus Spark Streaming could be
overkill.

The best to check if your data has natural index like timestamp in
metering data which come in the same frequency (every second) and
basing on it do access to your cache and disc. For cache for me  most
promising looks  Alluxio.

BR,
Arkadiusz Bicz

On Tue, Apr 19, 2016 at 6:01 AM, Deepak Sharma  wrote:
> Hi all,
> I am looking for an architecture to ingest 10 mils of messages in the micro
> batches of seconds.
> If anyone has worked on similar kind of architecture  , can you please point
> me to any documentation around the same like what should be the architecture
> , which all components/big data ecosystem tools should i consider etc.
> The messages has to be in xml/json format , a preprocessor engine or message
> enhancer and then finally a processor.
> I thought about using data cache as well for serving the data
> The data cache should have the capability to serve the historical  data in
> milliseconds (may be upto 30 days of data)
> --
> Thanks
> Deepak
> www.bigdatabig.com
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming batch time displayed is not current system time but it is processing current messages

2016-04-19 Thread Prashant Sharma
This can happen if the system time is not in sync. By default, streaming uses
SystemClock (it also supports ManualClock), which relies
on System.currentTimeMillis() for determining the start time.
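
As a rough illustration only (not the actual Spark code): batch times are multiples of
the batch interval derived from the driver's wall clock, so a skewed system clock shifts
the displayed "Time: ... ms" headers even though the job keeps processing whatever
messages arrive now.

// Sketch of how a displayed batch time relates to the driver's wall clock.
val batchIntervalMs = 2000L                    // e.g. a 2-second batch interval
val now = System.currentTimeMillis()           // a skewed clock gives skewed batch times
val batchTime = now - (now % batchIntervalMs)  // floor to a batch boundary
println(s"Time: $batchTime ms")                // matches the console header format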

Prashant Sharma



On Sat, Apr 16, 2016 at 10:09 PM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> Can anyone help me in debugging  this issue please.
>
>
> On Thu, Apr 14, 2016 at 12:24 PM, Hemalatha A <
> hemalatha.amru...@googlemail.com> wrote:
>
>> Hi,
>>
>> I am facing a problem in Spark streaming.
>>
>
>
> Time: 1460823006000 ms
> ---
>
> ---
> Time: 1460823008000 ms
> ---
>
>
>
>
>> The time displayed in Spark streaming console as above is 4 days prior
>> i.e.,  April 10th, which is not current system time of the cluster  but the
>> job is processing current messages that is pushed right now April 14th.
>>
>> Can anyone please advise what time Spark streaming displays? Also,
>> when there is a scheduling delay of say 8 hours, what time does Spark
>> display - the current time or   hours behind?
>>
>> --
>>
>>
>> Regards
>> Hemalatha
>>
>
>
>
> --
>
>
> Regards
> Hemalatha
>


Exceeding spark.akka.frameSize when saving Word2VecModel

2016-04-19 Thread Stefan Falk

Hello Sparklings!

I am trying to train a word vector model, but when I call
Word2VecModel#save() I get an org.apache.spark.SparkException
saying that this would exceed the frameSize limit (Stack Overflow
question [1]).


Increasing the frameSize would only help me in this particular case I 
guess, but as soon as the model exceeds 2GB the problem is going to 
occur again.
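
For what it's worth, a sketch of the usual stop-gap, assuming Spark 1.5.x: the value is
in MB, and 256 here is an arbitrary example rather than a recommendation.

import org.apache.spark.{SparkConf, SparkContext}

// Raise the Akka frame size used for task results and control messages.
val conf = new SparkConf()
  .setAppName("word2vec-save")          // hypothetical app name
  .set("spark.akka.frameSize", "256")   // MB
val sc = new SparkContext(conf)

// The same setting can be passed on the command line:
//   spark-submit --conf spark.akka.frameSize=256 ...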


I am not aware of my options here - what can I do to save a 
Word2VecModel of arbitrary size?


Thanks for any help!

BR; Stefan

[1] 
http://stackoverflow.com/questions/36692386/exceeding-spark-akka-framesize-when-saving-word2vecmodel


--
Stefan R. Falk

Know-Center Graz
Inffeldgasse 13 / 6. Stock
8010 Graz, Austria
Email : sf...@know-center.at
Tel: +43 316 873 30869
http://www.know-center.at



Re: Processing millions of messages in milliseconds -- Architecture guide required

2016-04-19 Thread Jörn Franke
I do not think there is a simple how-to for this. First you need to be clear about the 
volumes in storage, in transit and in processing. Then you need to be aware of 
what kind of queries you want to run. Your assumption of milliseconds for the 
expected data volumes currently seems to be unrealistic. However, you need to 
provide much more information on where the data comes from, what you need to do, 
etc.

Another thing: please do not use XML or JSON for big data. You waste hardware 
resources and time, and harm the environment.

> On 19 Apr 2016, at 07:01, Deepak Sharma  wrote:
> 
> Hi all,
> I am looking for an architecture to ingest 10 mils of messages in the micro 
> batches of seconds.
> If anyone has worked on similar kind of architecture  , can you please point 
> me to any documentation around the same like what should be the architecture 
> , which all components/big data ecosystem tools should i consider etc.
> The messages has to be in xml/json format , a preprocessor engine or message 
> enhancer and then finally a processor.
> I thought about using data cache as well for serving the data 
> The data cache should have the capability to serve the historical  data in 
> milliseconds (may be upto 30 days of data)
> -- 
> Thanks
> Deepak
> www.bigdatabig.com
> 


Re: prefix column Spark

2016-04-19 Thread nihed mbarek
Hi,
thank you. It was the first solution, and it took a long time to manage all my
fields.

Regards,

On Tue, Apr 19, 2016 at 11:29 AM, Ndjido Ardo BAR  wrote:

>
> This can help:
>
> import org.apache.spark.sql.DataFrame
>
> def prefixDf(dataFrame: DataFrame, prefix: String): DataFrame = {
>   val colNames = dataFrame.columns
>   colNames.foldLeft(dataFrame){
> (df, colName) => {
>   df.withColumnRenamed(colName, s"${prefix}_${colName}")
> }
> }
> }
>
> cheers,
> Ardo
>
>
> On Tue, Apr 19, 2016 at 10:53 AM, nihed mbarek  wrote:
>
>> Hi,
>>
>> I want to prefix a set of dataframes and I try two solutions:
>> * A for loop calling withColumnRename based on columns()
>> * transforming my Dataframe to and RDD, updating the old schema and
>> recreating the dataframe.
>>
>>
>> both are working for me, the second one is faster with tables that
>> contain 800 columns but have a more stage of transformation toRDD.
>>
>> Is there any other solution?
>>
>> Thank you
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>


-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




RE: Spark + HDFS

2016-04-19 Thread Ashic Mahtab
Spark will execute as a client for HDFS. In other words, it'll contact the 
NameNode of the HDFS cluster, which will return the block locations, and then 
the data will be fetched from the DataNodes.
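
A minimal sketch (hypothetical NameNode host and path): Spark only needs the HDFS URI;
block locations come back through the normal HDFS client and are exposed to the
scheduler as preferred locations for each partition.

// Read a file from a remote HDFS cluster; roughly one partition per HDFS block.
val rdd = sc.textFile("hdfs://namenode-host:8020/data/events.log")

println(rdd.partitions.length)                      // partition count
println(rdd.preferredLocations(rdd.partitions(0)))  // data-node hosts for the first block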

Date: Tue, 19 Apr 2016 14:00:31 +0530
Subject: Spark + HDFS
From: chaturvedich...@gmail.com
To: user@spark.apache.org

When I use spark and hdfs on two different clusters.How does spark workers know 
that which block of data is available in which hdfs node. Who basically caters 
to this.
Can someone throw light on this.  

SQL Driver

2016-04-19 Thread AlexModestov
Hello all,
I use a string when I'm launching the Sparkling-Water:
"--conf
spark.driver.extraClassPath='/SQLDrivers/sqljdbc_4.2/enu/sqljdbc41.jar"
and I get the error:
"
---
TypeError Traceback (most recent call last)
 in ()
  1 from pysparkling import *
> 2 hc = H2OContext(sc).start()

/tmp/modestov/spark/work/spark-5695a33c-905d-4af5-a719-88b7be0e0c45/userFiles-77e075c2-41cc-44d6-96fb-a2668b112133/pySparkling-1.6.1-py2.7.egg/pysparkling/context.py
in __init__(self, sparkContext)
 70 def __init__(self, sparkContext):
 71 try:
---> 72 self._do_init(sparkContext)
 73 # Hack H2OFrame from h2o package
 74 _monkey_patch_H2OFrame(self)

/tmp/modestov/spark/work/spark-5695a33c-905d-4af5-a719-88b7be0e0c45/userFiles-77e075c2-41cc-44d6-96fb-a2668b112133/pySparkling-1.6.1-py2.7.egg/pysparkling/context.py
in _do_init(self, sparkContext)
 94 gw = self._gw
 95 
---> 96 self._jhc =
jvm.org.apache.spark.h2o.H2OContext.getOrCreate(sc._jsc)
 97 self._client_ip = None
 98 self._client_port = None

TypeError: 'JavaPackage' object is not callable"
What does it mean?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Driver-tp26800.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: prefix column Spark

2016-04-19 Thread Ndjido Ardo BAR
This can help:

import org.apache.spark.sql.DataFrame

def prefixDf(dataFrame: DataFrame, prefix: String): DataFrame = {
  val colNames = dataFrame.columns
  colNames.foldLeft(dataFrame) { (df, colName) =>
    df.withColumnRenamed(colName, s"${prefix}_${colName}")
  }
}
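
An alternative sketch, in case the fold over 800 columns is slow to analyze: build the
renamed column list once and apply it in a single toDF call (standard DataFrame API,
reusing the DataFrame import above).

def prefixAll(df: DataFrame, prefix: String): DataFrame =
  df.toDF(df.columns.map(c => s"${prefix}_$c"): _*)   // one projection for all columns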

cheers,
Ardo


On Tue, Apr 19, 2016 at 10:53 AM, nihed mbarek  wrote:

> Hi,
>
> I want to prefix a set of dataframes and I try two solutions:
> * A for loop calling withColumnRename based on columns()
> * transforming my Dataframe to and RDD, updating the old schema and
> recreating the dataframe.
>
>
> both are working for me, the second one is faster with tables that contain
> 800 columns but have a more stage of transformation toRDD.
>
> Is there any other solution?
>
> Thank you
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> 
>
>


Re: spark sql on hive

2016-04-19 Thread Mich Talebzadeh
This is not a bug in Hive. Spark uses hive-site.xml to get the location of
the Hive metastore.

You cannot connect to the metastore database directly and interrogate it;
you would need to know the metastore schema.

  
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore
    client to connect to remote metastore.</description>
  </property>

That port 9083 (the default port) requires the metastore service to be up and running via

$HIVE_HOME/bin/hive --service metastore &

before any connection to the Hive metastore can be established.
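
Once the metastore service is up, a minimal sketch of the Spark side (assuming
hive-site.xml containing hive.metastore.uris is on the driver classpath, for example
under $SPARK_HOME/conf):

import org.apache.spark.sql.hive.HiveContext

// HiveContext reads hive-site.xml and talks to the thrift metastore service,
// not to the backing metastore database directly.
val hiveContext = new HiveContext(sc)
hiveContext.sql("show databases").show()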

Login     OS Proc/ID    Client Proc/ID  SID  SER#   HOST     PROGRAM           Logged/Hours
--------  ------------  --------------  ---  -----  -------  ----------------  ------------
HIVEUSER  oracle/11971  hduser/1234     245  30286  rhes564  JDBC Thin Client             1
HIVEUSER  oracle/11973  hduser/1234     284  14712  rhes564  JDBC Thin Client             1
HIVEUSER  oracle/15394  hduser/1234     167   7586  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15396  hduser/1234     208  15532  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15418  hduser/1234     246  20955  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15423  hduser/1234     286  19165  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15427  hduser/1234     325  44799  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15429  hduser/1234     369  20697  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15431  hduser/1234     408  39060  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15433  hduser/1234     446  10661  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15689  hduser/1234       7   6105  rhes564  JDBC Thin Client            70
HIVEUSER  oracle/15691  hduser/1234    4914162      rhes564  JDBC Thin Client            70

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 19 April 2016 at 06:53, Sea <261810...@qq.com> wrote:

> It's a bug of hive. Please use hive metastore service instead of visiting
> mysql directly.
> set hive.metastore.uris in hive-site.xml
>
>
>
> -- Original Message --
> *From:* "Jieliang Li";;
> *Sent:* Tuesday, 19 April 2016, 12:55 PM
> *To:* "user";
> *Subject:* spark sql on hive
>
> hi everyone.i use spark sql, but throw an exception:
> Retrying creating default database after error: Error creating
> transactional connection factory
> javax.jdo.JDOFatalInternalException: Error creating transactional
> connection factory
> at
> org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587)
> at
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788)
> at
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
> at
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
> at
> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
> at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
> at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
> at
> org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
> at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> at
> org.apache.hadoop.hive.metastore.RawStoreProxy.(RawStoreProxy.java:56)
> at
> 

prefix column Spark

2016-04-19 Thread nihed mbarek
Hi,

I want to prefix the column names of a set of dataframes and I tried two solutions:
* a for loop calling withColumnRenamed based on columns()
* transforming my DataFrame to an RDD, updating the old schema and
recreating the dataframe.


Both work for me; the second one is faster with tables that contain
800 columns, but it adds an extra stage of transformation to RDD.

Is there any other solution?

Thank you

-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.

As I mentionned before, I'm planning to use one kafka cluster and 2 or more
spark cluster distinct.

Let's say we have the following DCs configuration in a nominal case.
Kafka partitions are consumed uniformly by the 2 datacenters.

DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
DC 1         Master 1.1
             Worker 1.1   my_group               P1
             Worker 1.2   my_group               P2
DC 2         Master 2.1
             Worker 2.1   my_group               P3
             Worker 2.2   my_group               P4

I would like, in case of DC crash, a rebalancing of partition on the healthy
DC, something as follow

DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
DC 1         Master 1.1
             Worker 1.1   my_group               P1*, P3*
             Worker 1.2   my_group               P2*, P4*
DC 2         Master 2.1
             Worker 2.1   my_group               P3
             Worker 2.2   my_group               P4

I would like to know if it's possible:
- using consumer group ?
- using direct approach ? I prefer this one as I don't want to activate WAL.

Hope the explanation is better !


On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger  wrote:

> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
>
> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
> towards using the kafka 0.10 consumer, which would allow for dynamic
> topicparittions
>
> Regarding your multi-DC questions, I'm not really clear on what you're
> saying.
>
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
> wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct clusters Spark (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using kafka resiliency and consumer group
> mechanism
> > (or else ?)
> > - keeping consistent offset among replica (vs mirror maker,which does not
> > keep offset)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream which map one partition kafka with one
> spark. Is
> > it possible to handle new or removed partition ?
> > In the compute method, it looks like we are always using the
> currentOffset
> > map to query the next batch and therefore it's always the same number of
> > partition ? Can we request metadata at each batch ?
> >
> > 2) Multi DC Spark
> >
> > Using Direct approach, a way to achieve this would be
> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
> > - only one is reading the partition (Check every x interval, "lock"
> stored
> > in cassandra for instance)
> >
> > => not sure if it works just an idea
> >
> > Using Consumer Group
> > - CommitOffset manually at the end of the batch
> >
> > => Does spark handle partition rebalancing ?
> >
> > I'd appreciate any ideas ! Let me know if it's not clear.
> >
> > Erwan
> >
> >
>


Spark + HDFS

2016-04-19 Thread Chaturvedi Chola
When I use Spark and HDFS on two different clusters,
how do the Spark workers know which block of data is available on which
HDFS node?
Who basically caters to this?

Can someone throw light on this.


Re: Code optimization

2016-04-19 Thread Alonso Isidoro Roman
Hi Angel,

how about storing this:

k.filter(k("WT_ID")

in a val? I think you can avoid recomputing it, and do not forget to use
System.nanoTime to measure the improvement...
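
For illustration, a minimal sketch assuming Spark 1.5+ and the names from the quoted
code below (k, temp, WT_ID, Couple_time): the repeated filter/groupBy/show per table can
be collapsed into a single aggregation, which also answers how to compare the tables for
the maximum value.

import org.apache.spark.sql.functions.{max, desc}

// One pass instead of one job per WT_ID: histogram per (WT_ID, Couple_time),
// then the best bin per WT_ID, then a single sort to find the overall winner.
val maxPerId = k
  .filter(k("WT_ID").isin(temp: _*))    // keep only the candidate WT_IDs
  .groupBy("WT_ID", "Couple_time")
  .count()
  .groupBy("WT_ID")
  .agg(max("count").as("max_count"))

maxPerId.orderBy(desc("max_count")).show(1)   // WT_ID with the highest histogram bin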

Alonso Isidoro Roman.

Mis citas preferidas (de hoy) :
"Si depurar es el proceso de quitar los errores de software, entonces
programar debe ser el proceso de introducirlos..."
 -  Edsger Dijkstra

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting ..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"


2016-04-19 9:46 GMT+02:00 Angel Angel :

> Hello,
>
> I am writing the one spark application, it runs well but takes long
> execution time can anyone help me to optimize my query to increase the
> processing speed.
>
>
> I am writing one application in which i have to construct the histogram
> and compare the histograms in order to find the final candidate.
>
>
> My code in which i read the text file and matches the first field and
> subtract the second fild from the matched candidates and update the table.
>
> Here is my code, Please help me to optimize it.
>
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
>
> import sqlContext.implicits._
>
>
> val Array_Ele =
> sc.textFile("/root/Desktop/database_200/patch_time_All_20_modified_1.txt").flatMap(line=>line.split("
> ")).take(900)
>
>
> val df1=
> sqlContext.read.parquet("hdfs://hadoopm0:8020/tmp/input1/database_modified_No_name_400.parquet")
>
>
> var k = df1.filter(df1("Address").equalTo(Array_Ele(0) ))
>
> var a= 0
>
>
> for( a <-2 until 900 by 2){
>
> k=k.unionAll(
> df1.filter(df1("Address").equalTo(Array_Ele(a))).select(df1("Address"),df1("Couple_time")-Array_Ele(a+1),df1("WT_ID")))}
>
>
> k.cache()
>
>
> val WT_ID_Sort  = k.groupBy("WT_ID").count().sort(desc("count"))
>
>
> val temp = WT_ID_Sort.select("WT_ID").rdd.map(r=>r(0)).take(10)
>
>
> val Table0=
> k.filter(k("WT_ID").equalTo(temp(0))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table1=
> k.filter(k("WT_ID").equalTo(temp(1))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table2=
> k.filter(k("WT_ID").equalTo(temp(2))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table3=
> k.filter(k("WT_ID").equalTo(temp(3))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table4=
> k.filter(k("WT_ID").equalTo(temp(4))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table5=
> k.filter(k("WT_ID").equalTo(temp(5))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table6=
> k.filter(k("WT_ID").equalTo(temp(6))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table7=
> k.filter(k("WT_ID").equalTo(temp(7))).groupBy("Couple_time").count().select(max($"count")).show()
>
> val Table8=
> k.filter(k("WT_ID").equalTo(temp(8))).groupBy("Couple_time").count().select(max($"count")).show()
>
>
>
> val Table10=
> k.filter(k("WT_ID").equalTo(temp(10))).groupBy("Couple_time").count().select(max($"count")).show()
>
>
> val Table11=
> k.filter(k("WT_ID").equalTo(temp(11))).groupBy("Couple_time").count().select(max($"count")).show()
>
>
> and last one how can i compare the all this tables to find the maximum
> value.
>
>
>
>
> Thanks,
>
>
>


Re:Re: Why Spark having OutOfMemory Exception?

2016-04-19 Thread 李明伟
Hi Zhan Zhang




Please see the exception trace below. It is reporting a GC overhead limit error.
I am not a Java or Scala developer, so it is hard for me to understand this
information.
Also, reading the core dump is too difficult for me.


I am not sure if the way I am using Spark is correct. I understand that Spark
can do batch or stream calculation, but my approach is to set up a forever loop to
handle continuously incoming data.
Not sure if that is the right way to use Spark.




16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
at 
scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at 
org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: GC 
overhead limit exceeded
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
at 
scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at 
org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at 

How to know whether I'm in the first batch of spark streaming

2016-04-19 Thread Yu Xie
hi spark users

I'm running a spark streaming application with concurrentJobs > 1, so
more than one batch may run at the same time.

Now I would like to do some init work in the first batch, based on the
"time" of the first batch. So even if the second batch runs faster than the
first batch, I still need to do the init in the literal "first batch".

Then is there a way that I can know that?
Thank you


Minimum granularity of batch interval in Spark streaming

2016-04-19 Thread Mich Talebzadeh
Hi,

What is the minimum granularity of the micro-batch interval in Spark streaming,
if there is such a limit?

Is throughput within the window configurable? For example, what are the
parameters affecting the volume of streaming data being processed?

In other words, the throughput per second.

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: Processing millions of messages in milliseconds -- Architecture guide required

2016-04-19 Thread Alex Kozlov
This is too big of a topic.  For starters, what is the latency between when you
obtain the data and when the data is available for analysis?  Obviously, if this
is < 5 minutes, you probably need a streaming solution.  How fast do the
"micro batches of seconds" need to be available for analysis?  Can the
problem be easily partitioned, and how flexible are you in the # of machines
for your solution?  Are you OK with availability fat tails?

Another question: how big is an individual message in bytes?  XML/JSON are
extremely inefficient, and with "10 mils of messages" you might hit other
bottlenecks like the network unless you convert them into something more
machine-friendly such as Protobuf, Avro or Thrift.

From the top, look at Kafka, Flume, Storm.

To "serve the historical data in milliseconds (may be up to 30 days of
data)" you'll need to cache data in memory.  The question, again, is how
often the data changes.  You might look into Lambda architectures.

On Mon, Apr 18, 2016 at 10:21 PM, Prashant Sharma 
wrote:

> Hello Deepak,
>
> It is not clear what you want to do. Are you talking about spark streaming
> ? It is possible to process historical data in Spark batch mode too. You
> can add a timestamp field in xml/json. Spark documentation is at
> spark.apache.org. Spark has good inbuilt features to process json and
> xml[1] messages.
>
> Thanks,
> Prashant Sharma
>
> 1. https://github.com/databricks/spark-xml
>
> On Tue, Apr 19, 2016 at 10:31 AM, Deepak Sharma 
> wrote:
>
>> Hi all,
>> I am looking for an architecture to ingest 10 mils of messages in the
>> micro batches of seconds.
>> If anyone has worked on similar kind of architecture  , can you please
>> point me to any documentation around the same like what should be the
>> architecture , which all components/big data ecosystem tools should i
>> consider etc.
>> The messages has to be in xml/json format , a preprocessor engine or
>> message enhancer and then finally a processor.
>> I thought about using data cache as well for serving the data
>> The data cache should have the capability to serve the historical  data
>> in milliseconds (may be upto 30 days of data)
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>>
>>
--
Alex Kozlov
ale...@gmail.com


Code optimization

2016-04-19 Thread Angel Angel
Hello,

I am writing a Spark application; it runs well but takes a long
execution time. Can anyone help me optimize my query to increase the
processing speed?


I am writing an application in which I have to construct a histogram and
compare the histograms in order to find the final candidate.


In my code I read the text file, match the first field,
subtract the second field from the matched candidates, and update the table.

Here is my code. Please help me to optimize it.


val sqlContext = new org.apache.spark.sql.SQLContext(sc)


import sqlContext.implicits._


val Array_Ele =
sc.textFile("/root/Desktop/database_200/patch_time_All_20_modified_1.txt").flatMap(line=>line.split("
")).take(900)


val df1=
sqlContext.read.parquet("hdfs://hadoopm0:8020/tmp/input1/database_modified_No_name_400.parquet")


var k = df1.filter(df1("Address").equalTo(Array_Ele(0) ))

var a= 0


for( a <-2 until 900 by 2){

k=k.unionAll(
df1.filter(df1("Address").equalTo(Array_Ele(a))).select(df1("Address"),df1("Couple_time")-Array_Ele(a+1),df1("WT_ID")))}


k.cache()


val WT_ID_Sort  = k.groupBy("WT_ID").count().sort(desc("count"))


val temp = WT_ID_Sort.select("WT_ID").rdd.map(r=>r(0)).take(10)


val Table0=
k.filter(k("WT_ID").equalTo(temp(0))).groupBy("Couple_time").count().select(max($"count")).show()

val Table1=
k.filter(k("WT_ID").equalTo(temp(1))).groupBy("Couple_time").count().select(max($"count")).show()

val Table2=
k.filter(k("WT_ID").equalTo(temp(2))).groupBy("Couple_time").count().select(max($"count")).show()

val Table3=
k.filter(k("WT_ID").equalTo(temp(3))).groupBy("Couple_time").count().select(max($"count")).show()

val Table4=
k.filter(k("WT_ID").equalTo(temp(4))).groupBy("Couple_time").count().select(max($"count")).show()

val Table5=
k.filter(k("WT_ID").equalTo(temp(5))).groupBy("Couple_time").count().select(max($"count")).show()

val Table6=
k.filter(k("WT_ID").equalTo(temp(6))).groupBy("Couple_time").count().select(max($"count")).show()

val Table7=
k.filter(k("WT_ID").equalTo(temp(7))).groupBy("Couple_time").count().select(max($"count")).show()

val Table8=
k.filter(k("WT_ID").equalTo(temp(8))).groupBy("Couple_time").count().select(max($"count")).show()



val Table10=
k.filter(k("WT_ID").equalTo(temp(10))).groupBy("Couple_time").count().select(max($"count")).show()


val Table11=
k.filter(k("WT_ID").equalTo(temp(11))).groupBy("Couple_time").count().select(max($"count")).show()


And the last question: how can I compare all these tables to find the maximum
value?




Thanks,


Re: [Spark 1.5.2] Log4j Configuration for executors

2016-04-19 Thread Prashant Sharma
Maybe you can try creating it before running the app.