Re: Why do garbled Chinese characters appear when I use Spark textFile?

2017-04-05 Thread Yan Facai
Perhaps your file is not UTF-8 encoded.

I cannot reproduce it.

### HADOOP:
~/Downloads ❯❯❯ hdfs -cat hdfs:///test.txt
17/04/06 13:43:58 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
1.0 862910025238798 100733314   18_百度输入法:100733314
8919173c6d49abfab02853458247e5841:129:18_百度输入法:1.0%

### SPARK
scala> val t = sc.textFile("hdfs:///test.txt")
t: org.apache.spark.rdd.RDD[String] = hdfs:///test.txt
MapPartitionsRDD[3] at textFile at <console>:24

scala> t.first
res2: String = 1.0 862910025238798 100733314   18_百度输入法:100733314
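
If the file turns out not to be UTF-8, here is a minimal workaround sketch (not
from the thread; the GBK charset below is purely an assumed example): textFile
always decodes bytes as UTF-8, so read the raw Text values and decode them
explicitly.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Read (offset, Text) pairs and decode the bytes with an explicit charset.
val decoded = sc
  .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///test.txt")
  .map { case (_, text) => new String(text.getBytes, 0, text.getLength, "GBK") }
decoded.first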






On Thu, Apr 6, 2017 at 12:47 PM, Jone Zhang  wrote:

> var textFile = sc.textFile("xxx");
> textFile.first();
> res1: String = 1.0 100733314   18_?:100733314
> 8919173c6d49abfab02853458247e5841:129:18_?:1.0
>
>
> hadoop fs -cat xxx
> 1.0100733314   18_百度输入法:100733314 8919173c6d49abfab02853458247e584
>  1:129:18_百度输入法:1.0
>
> Why do garbled Chinese characters appear when I use Spark textFile?
> The encoding of the HDFS file is UTF-8.
>
>
> Thanks
>


Re: Read file and represent rows as Vectors

2017-04-05 Thread Yan Facai
You can try the `mapPartitions` method.

An example is here:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapPartitions
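
A minimal sketch of that approach (not from the thread; the file name, column
order, and the choice of the ml Vectors class are assumptions based on the
description below):

import org.apache.spark.ml.linalg.{Vector, Vectors}   // use mllib.linalg instead for the RDD-based PCA

val lines = sc.textFile("docword.txt.gz")             // gzip input is decompressed transparently
val vocabSize = lines.take(3)(1).trim.toInt           // 2nd header line = vocabulary size

val docVectors = lines
  .mapPartitionsWithIndex((idx, it) => if (idx == 0) it.drop(3) else it)  // skip the 3 header lines
  .map { line =>
    val Array(docId, wordId, count) = line.trim.split("\\s+")
    (docId.toInt, (wordId.toInt, count.toDouble))
  }
  .groupByKey()                                        // gather (wordId, count) pairs per document
  .mapValues(pairs => Vectors.sparse(vocabSize, pairs.toSeq))

val data: Array[Vector] = docVectors.values.collect()  // can then be passed to the PCA transformer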

On Mon, Apr 3, 2017 at 8:05 PM, Old-School 
wrote:

> I have a dataset that contains DocID, WordID and frequency (count) as shown
> below. Note that the first three numbers represent 1. the number of
> documents, 2. the number of words in the vocabulary and 3. the total number
> of words in the collection.
>
> 189
> 1430
> 12300
> 1 2 1
> 1 39 1
> 1 42 3
> 1 77 1
> 1 95 1
> 1 96 1
> 2 105 1
> 2 108 1
> 3 133 3
>
>
> What I want to do is to read the data (ignore the first three lines),
> combine the words per document and finally represent each document as a
> vector that contains the frequency of the wordID.
>
> Based on the above dataset the representation of documents 1, 2 and 3 will
> be (note that vocab_size can be extracted by the second line of the data):
>
> val data = Array(
> Vectors.sparse(vocab_size, Seq((2, 1.0), (39, 1.0), (42, 3.0), (77, 1.0),
> (95, 1.0), (96, 1.0))),
> Vectors.sparse(vocab_size, Seq((105, 1.0), (108, 1.0))),
> Vectors.sparse(vocab_size, Seq((133, 3.0
>
>
> The problem is that I am not quite sure how to read the .txt.gz file as RDD
> and create an Array of sparse vectors as described above. Please note that
> I
> actually want to pass the data array in the PCA transformer.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Read-file-and-represent-rows-as-Vectors-tp28562.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


How spark connects to Hive metastore?

2017-04-05 Thread infaelance
Hi all,
When using spark-shell, my understanding is that Spark connects to Hive through
the metastore.
The question I have is: does Spark connect to the metastore via JDBC?

Any good links to documentation on how Spark connects to Hive or other data
sources?
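
A minimal illustration (not from this thread, and simplified): spark-shell
builds a SparkSession with Hive support, and the metastore location is normally
taken from a hive-site.xml on the classpath. The Thrift URI below is an assumed
example, not a recommendation.

import org.apache.spark.sql.SparkSession

// Hypothetical example: the metastore URI and app name are placeholders.
val spark = SparkSession.builder()
  .appName("hive-metastore-example")
  .config("hive.metastore.uris", "thrift://metastore-host:9083") // usually supplied by hive-site.xml
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SHOW DATABASES").show()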



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-spark-connects-to-Hive-metastore-tp28574.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark and Hive connection

2017-04-05 Thread infa elance
Hi all,
When using spark-shell, my understanding is that Spark connects to Hive through
the metastore.
The question I have is: does Spark connect to the metastore via JDBC?

Thanks and Regards,
Ajay.


Consuming AWS Cloudwatch logs from Kinesis into Spark

2017-04-05 Thread Tim Smith
I am sharing this code snippet since I spent quite some time figuring it
out and I couldn't find any examples online. Between the Kinesis
documentation, tutorial on AWS site and other code snippets on the
Internet, I was confused about structure/format of the messages that Spark
fetches from Kinesis - base64 encoded, json, gzipped - which one first and
what order.

I tested this on EMR-5.4.0, Amazon Hadoop 2.7.3 and Spark 2.1.0. Hope it
helps others googling for similar info. I tried using Structured Streaming
but (1) it's in Alpha and (2) despite including what I thought were all the
dependencies, it complained of not finding DataSource.Kinesis. You probably
do not need all the libs but I am just too lazy to redact ones you don't
require for the snippet below :)

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import java.util.Base64
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
import org.apache.commons.math3.stat.descriptive._
import java.io.File
import java.net.InetAddress
import scala.util.control.NonFatal
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SaveMode
import java.util.Properties;
import org.json4s._
import org.json4s.jackson.JsonMethods._
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
import scala.util.Try


//sc.setLogLevel("INFO")

val ssc = new StreamingContext(sc, Seconds(30))

val kinesisStreams = (0 until 2).map { i => KinesisUtils.createStream(ssc,
"myApp", "cloudwatchlogs",
"https://kinesis.us-east-1.amazonaws.com","us-east-1;,
InitialPositionInStream.LATEST , Seconds(30),
StorageLevel.MEMORY_AND_DISK_2,"myId","mySecret") }

val unionStreams = ssc.union(kinesisStreams)
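// Each record's payload below is the raw bytes handed over by the Kinesis receiver;
// for a CloudWatch Logs subscription that is gzip-compressed JSON, so the block
// decompresses every record, reads it as a string, and re-renders it as compact
// JSON before passing the RDD[String] to read.json.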

unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
  if (rdd.count() > 0) {
    val json = rdd.map(input => {
      val inputStream = new GZIPInputStream(new ByteArrayInputStream(input))
      val record = scala.io.Source.fromInputStream(inputStream).mkString
      compact(render(parse(record)))
    })

    val df = spark.sqlContext.read.json(json)
    val preDF = df.select($"logGroup", explode($"logEvents").as("events_flat"))
    val penDF = preDF.select($"logGroup", $"events_flat.extractedFields")
    val finalDF = penDF.select($"logGroup".as("cluster"), $"extractedFields.*")
    finalDF.printSchema()
    finalDF.show()
  }
})

ssc.start



--
Thanks,

Tim


Why do garbled Chinese characters appear when I use Spark textFile?

2017-04-05 Thread Jone Zhang
var textFile = sc.textFile("xxx");
textFile.first();
res1: String = 1.0 100733314   18_?:100733314
8919173c6d49abfab02853458247e5841:129:18_?:1.0


hadoop fs -cat xxx
1.0100733314   18_百度输入法:100733314 8919173c6d49abfab02853458247e584
 1:129:18_百度输入法:1.0

Why do garbled Chinese characters appear when I use Spark textFile?
The encoding of the HDFS file is UTF-8.


Thanks


Why do garbled Chinese characters appear when I use Spark textFile?

2017-04-05 Thread JoneZhang
var textFile = sc.textFile("xxx"); 
textFile.first();
res1: String = 1.0  862910025238798 100733314   18_?:100733314 
8919173c6d49abfab02853458247e5841:129:18_?:1.0


hadoop fs -cat xxx 
1.0 862910025238798 100733314   18_百度输入法:100733314
8919173c6d49abfab02853458247e5841:129:18_百度输入法:1.0

Why do garbled Chinese characters appear when I use Spark textFile?
The encoding of the HDFS file is UTF-8.


Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-chinese-character-gash-appear-when-i-use-spark-textFile-tp28573.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: JSON lib works differently in spark-shell and IDE like intellij

2017-04-05 Thread Mungeol Heo
It will work with spark-submit if you put the relocation configuration below
under the maven-shade-plugin:

<relocations>
  <relocation>
    <pattern>net.minidev</pattern>
    <shadedPattern>shaded.net.minidev</shadedPattern>
  </relocation>
</relocations>

Still, I need a way to make it work with spark-shell for testing purposes.
Any idea would be great.

Thank you.

On Wed, Apr 5, 2017 at 6:52 PM, Mungeol Heo  wrote:
> Hello,
>
> I am using "minidev" which is a JSON lib to remove duplicated keys in
> JSON object.
>
> 
> minidev
>
> <dependency>
>   <groupId>net.minidev</groupId>
>   <artifactId>json-smart</artifactId>
>   <version>2.3</version>
> </dependency>
>
> 
> Test Code
> 
>
> import net.minidev.json.parser.JSONParser
> val badJson = "{\"keyA\":\"valueA\",\"keyB\":\"valueB\",\"keyA\":\"valueA\"}"
> val json = new 
> JSONParser(JSONParser.MODE_PERMISSIVE).parse(badJson.toLowerCase())
> println(json)
>
> 
>
> The source code placed above works at IDE like intellij.
> But, it gives error at spark-shell
>
> 
> Error
> 
>
> net.minidev.json.parser.ParseException: Unexpected duplicate key:keya
> at position 33.
>
> 
>
> BTW, both IDE and spark-shell using same version of scala which is 2.11.8.
> And, of course, same version of "minidev"
>
> Any help will be great.
> Thank you.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



run-time exception trying to train MultilayerPerceptronClassifier with DataFrame

2017-04-05 Thread Pete Prokopowicz
Hello,

I am trying to train a neural net using a dataframe constructed from an RDD
of LabeledPoints. The data frame's schema is:

[label: double, features: vector]

The actual feature values are SparseVectors. The runtime error I get when
I call


val labeledPoints: RDD[LabeledPoint] = 

val fields: Seq[StructField] = List[StructField] (
  StructField("label", DoubleType, nullable = false),
  StructField("features", VectorType, nullable = false))

val schema : StructType = StructType(fields)

val labeledPointsAsRowRDD = labeledPoints.map(point =>
 Row(point.label, point.features))
val trainingDataFrame =
 spark.createDataFrame(labeledPointsAsRowRDD, schema)

trainer.fit(trainingDataFrame)

is:

org.apache.spark.mllib.linalg.SparseVector is not a valid external type for
schema of vector

I'm not able to figure out whether the DataFrame doesn't conform to the
schema, or the schema doesn't conform to what the ml lib expects, or what.
Any suggestions would be very helpful.

 Also, I'm confused about why the MultilayerPerceptronClassifier doesn't
work directly with an RDD[LabeledPoint] as DecisionTree, RandomForest, etc
do.

Caused by: java.lang.RuntimeException: Error while encoding:
java.lang.RuntimeException: org.apache.spark.mllib.linalg.SparseVector is
not a valid external type for schema of vector
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, label),
DoubleType) AS label#0
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, label),
DoubleType)
   +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row,
true], top level row object), 0, label)
  +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object)
 +- input[0, org.apache.spark.sql.Row, true]

newInstance(class org.apache.spark.ml.linalg.VectorUDT).serialize AS
features#1
+- newInstance(class org.apache.spark.ml.linalg.VectorUDT).serialize
   :- newInstance(class org.apache.spark.ml.linalg.VectorUDT)
   +- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, features),
org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)
  +- getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, features)
 +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top
level row object)
+- input[0, org.apache.spark.sql.Row, true]
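
For what it's worth, a minimal sketch of one way to reconcile the two vector
types (an assumption based on the error message rather than a confirmed fix; it
reuses the names from the snippet above): the schema's vector type lives in
org.apache.spark.ml.linalg, while mllib's LabeledPoint carries the old mllib
vectors, and asML converts between them.

// Sketch: convert each mllib vector to the corresponding ml vector before building the Row.
val labeledPointsAsRowRDD = labeledPoints.map(point =>
  Row(point.label, point.features.asML))
val trainingDataFrame = spark.createDataFrame(labeledPointsAsRowRDD, schema)

trainer.fit(trainingDataFrame)

Alternatively, org.apache.spark.mllib.util.MLUtils.convertVectorColumnsToML can
convert an existing DataFrame's vector columns in place.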

-- 

*Pete Prokopowicz*Sr. Engineer - BEMOD!  Behavioral Modeling

600 W. Chicago Ave, Chicago, IL 60654
Mobile: 708-654-8137
Groupon

 II Grouponworks



Master-Worker communication on Standalone cluster issues

2017-04-05 Thread map reduced
Hi,

I was wondering how often a Worker pings the Master to check on the Master's
liveness. Or is it the Master (resource manager) that pings the Workers to
check on their liveness and spawns replacements if any workers are dead? Or is
it both?

Some info:
Standalone cluster
1 Master - 8core 12Gb
32 workers - each 8 core and 8 Gb

My main problem - Here's what happened:

Master M - running with 32 workers
Worker 1 and 2 died at 03:55:00 - so now the cluster is 30 workers

Worker 1' came up at 03:55:12.000 AM - it connected to M
Worker 2' came up at 03:55:16.000 AM - it connected to M

Master M *dies* at 03:56:00 AM
New master NM' comes up at 03:56:30 AM
Worker 1' and 2' - *DO NOT* connect to NM
Remaining 30 workers connect to NM.

So NM now has 30 workers.

I was wondering why those two won't connect to the new master NM even though
master M is definitely dead.

PS: I have an LB set up for the Master, which means that whenever a new master
comes up, the LB will start pointing to the new one.

Thanks,
KP


Re: Spark fair scheduler pools vs. YARN queues

2017-04-05 Thread Nicholas Chammas
Ah, that's why all the stuff about scheduler pools is under the section
"Scheduling Within an Application". I am so used to talking to my coworkers
about jobs in the sense of applications that I forgot your typical Spark
application submits multiple "jobs", each of which has multiple stages, etc.

So in my case I need to read up more closely about YARN queues,
since I want to share resources *across* applications. Thanks Mark!

On Wed, Apr 5, 2017 at 4:31 PM Mark Hamstra  wrote:

> `spark-submit` creates a new Application that will need to get resources
> from YARN. Spark's scheduler pools will determine how those resources are
> allocated among whatever Jobs run within the new Application.
>
> Spark's scheduler pools are only relevant when you are submitting multiple
> Jobs within a single Application (i.e., you are using the same SparkContext
> to launch multiple Jobs) and you have used SparkContext#setLocalProperty to
> set "spark.scheduler.pool" to something other than the default pool before
> a particular Job intended to use that pool is started via that SparkContext.
>
> On Wed, Apr 5, 2017 at 1:11 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
> Hmm, so when I submit an application with `spark-submit`, I need to
> guarantee it resources using YARN queues and not Spark's scheduler pools.
> Is that correct?
>
> When are Spark's scheduler pools relevant/useful in this context?
>
> On Wed, Apr 5, 2017 at 3:54 PM Mark Hamstra 
> wrote:
>
> grrr... s/your/you're/
>
> On Wed, Apr 5, 2017 at 12:54 PM, Mark Hamstra 
> wrote:
>
> Your mixing up different levels of scheduling. Spark's fair scheduler
> pools are about scheduling Jobs, not Applications; whereas YARN queues with
> Spark are about scheduling Applications, not Jobs.
>
> On Wed, Apr 5, 2017 at 12:27 PM, Nick Chammas 
> wrote:
>
> I'm having trouble understanding the difference between Spark fair
> scheduler pools
> 
> and YARN queues
> .
> Do they conflict? Does one override the other?
>
> I posted a more detailed question about an issue I'm having with this on
> Stack Overflow: http://stackoverflow.com/q/43239921/877069
>
> Nick
>
>
> --
> View this message in context: Spark fair scheduler pools vs. YARN queues
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>
>
>
>
>


Re: Spark fair scheduler pools vs. YARN queues

2017-04-05 Thread Mark Hamstra
`spark-submit` creates a new Application that will need to get resources
from YARN. Spark's scheduler pools will determine how those resources are
allocated among whatever Jobs run within the new Application.

Spark's scheduler pools are only relevant when you are submitting multiple
Jobs within a single Application (i.e., you are using the same SparkContext
to launch multiple Jobs) and you have used SparkContext#setLocalProperty to
set "spark.scheduler.pool" to something other than the default pool before
a particular Job intended to use that pool is started via that SparkContext.
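
For illustration, a minimal sketch of that mechanism (the pool names are made-up
examples, and the pools themselves would be defined in a fair-scheduler
allocation file):

// Sketch: two jobs submitted from the same SparkContext into different fair-scheduler pools.
sc.setLocalProperty("spark.scheduler.pool", "reports")
sc.parallelize(1 to 1000000).count()               // this job runs in the "reports" pool

sc.setLocalProperty("spark.scheduler.pool", "adhoc")
sc.parallelize(1 to 1000000).count()               // this job runs in the "adhoc" pool

sc.setLocalProperty("spark.scheduler.pool", null)  // revert to the default pool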

On Wed, Apr 5, 2017 at 1:11 PM, Nicholas Chammas  wrote:

> Hmm, so when I submit an application with `spark-submit`, I need to
> guarantee it resources using YARN queues and not Spark's scheduler pools.
> Is that correct?
>
> When are Spark's scheduler pools relevant/useful in this context?
>
> On Wed, Apr 5, 2017 at 3:54 PM Mark Hamstra 
> wrote:
>
>> grrr... s/your/you're/
>>
>> On Wed, Apr 5, 2017 at 12:54 PM, Mark Hamstra 
>> wrote:
>>
>> Your mixing up different levels of scheduling. Spark's fair scheduler
>> pools are about scheduling Jobs, not Applications; whereas YARN queues with
>> Spark are about scheduling Applications, not Jobs.
>>
>> On Wed, Apr 5, 2017 at 12:27 PM, Nick Chammas > > wrote:
>>
>> I'm having trouble understanding the difference between Spark fair
>> scheduler pools
>> 
>> and YARN queues
>> .
>> Do they conflict? Does one override the other?
>>
>> I posted a more detailed question about an issue I'm having with this on
>> Stack Overflow: http://stackoverflow.com/q/43239921/877069
>>
>> Nick
>>
>>
>> --
>> View this message in context: Spark fair scheduler pools vs. YARN queues
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>>
>>
>>


Re: Spark fair scheduler pools vs. YARN queues

2017-04-05 Thread Nicholas Chammas
Hmm, so when I submit an application with `spark-submit`, I need to
guarantee it resources using YARN queues and not Spark's scheduler pools.
Is that correct?

When are Spark's scheduler pools relevant/useful in this context?

On Wed, Apr 5, 2017 at 3:54 PM Mark Hamstra  wrote:

> grrr... s/your/you're/
>
> On Wed, Apr 5, 2017 at 12:54 PM, Mark Hamstra 
> wrote:
>
> Your mixing up different levels of scheduling. Spark's fair scheduler
> pools are about scheduling Jobs, not Applications; whereas YARN queues with
> Spark are about scheduling Applications, not Jobs.
>
> On Wed, Apr 5, 2017 at 12:27 PM, Nick Chammas 
> wrote:
>
> I'm having trouble understanding the difference between Spark fair
> scheduler pools
> 
> and YARN queues
> .
> Do they conflict? Does one override the other?
>
> I posted a more detailed question about an issue I'm having with this on
> Stack Overflow: http://stackoverflow.com/q/43239921/877069
>
> Nick
>
>
> --
> View this message in context: Spark fair scheduler pools vs. YARN queues
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>
>
>
>


Re: convert JavaRDD<List> to JavaRDD

2017-04-05 Thread Vinay Parekar
Sorry, I guess my Mac autocorrected it. Yeah, it's flatMap().

From: Jiang Jacky >
Date: Wednesday, April 5, 2017 at 12:10 PM
To: ostkadm >
Cc: Hamza HACHANI >, 
"user@spark.apache.org" 
>
Subject: Re: convert JavaRDD to JavaRDD

There is no flattop just flatMap

On Apr 5, 2017, at 12:24 PM, Vinay Parekar 
> wrote:

I think flattop() will be helpful in this case . Correct me if I am wrong.

From: Hamza HACHANI >
Date: Wednesday, April 5, 2017 at 3:43 AM
To: "user@spark.apache.org" 
>
Subject: convert JavaRDD to JavaRDD


I want to convert a JavaRDD<List<Object>> to a JavaRDD<Object>. For example, if
there are 3 elements in the List, 3 Objects would be created in my new
JavaRDD.

Does any one have an idea ?





Re: Spark fair scheduler pools vs. YARN queues

2017-04-05 Thread Mark Hamstra
grrr... s/your/you're/

On Wed, Apr 5, 2017 at 12:54 PM, Mark Hamstra 
wrote:

> Your mixing up different levels of scheduling. Spark's fair scheduler
> pools are about scheduling Jobs, not Applications; whereas YARN queues with
> Spark are about scheduling Applications, not Jobs.
>
> On Wed, Apr 5, 2017 at 12:27 PM, Nick Chammas 
> wrote:
>
>> I'm having trouble understanding the difference between Spark fair
>> scheduler pools
>> 
>> and YARN queues
>> .
>> Do they conflict? Does one override the other?
>>
>> I posted a more detailed question about an issue I'm having with this on
>> Stack Overflow: http://stackoverflow.com/q/43239921/877069
>>
>> Nick
>>
>>
>> --
>> View this message in context: Spark fair scheduler pools vs. YARN queues
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>
>


Re: Spark fair scheduler pools vs. YARN queues

2017-04-05 Thread Mark Hamstra
Your mixing up different levels of scheduling. Spark's fair scheduler pools
are about scheduling Jobs, not Applications; whereas YARN queues with Spark
are about scheduling Applications, not Jobs.

On Wed, Apr 5, 2017 at 12:27 PM, Nick Chammas 
wrote:

> I'm having trouble understanding the difference between Spark fair
> scheduler pools
> 
> and YARN queues
> .
> Do they conflict? Does one override the other?
>
> I posted a more detailed question about an issue I'm having with this on
> Stack Overflow: http://stackoverflow.com/q/43239921/877069
>
> Nick
>
>
> --
> View this message in context: Spark fair scheduler pools vs. YARN queues
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Spark fair scheduler pools vs. YARN queues

2017-04-05 Thread Nick Chammas
I'm having trouble understanding the difference between Spark fair
scheduler pools

and YARN queues
.
Do they conflict? Does one override the other?

I posted a more detailed question about an issue I'm having with this on
Stack Overflow: http://stackoverflow.com/q/43239921/877069

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-fair-scheduler-pools-vs-YARN-queues-tp28572.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: How to use ManualClock with Spark streaming

2017-04-05 Thread Mendelson, Assaf
You can try taking a look at this: 
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/
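
A minimal sketch of the approach from that post (not a supported API, and worth
double-checking against your Spark version): spark.streaming.clock is read when
the StreamingContext starts, so a test can swap in Spark's internal, test-only
ManualClock instead of the default SystemClock.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("manual-clock-test")
  .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")

val ssc = new StreamingContext(conf, Seconds(1))
// Advancing the clock from test code still needs a small wrapper compiled inside the
// org.apache.spark.streaming package (as the linked post shows), because the
// scheduler's clock field is not public.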

Thanks,
  Assaf.

From: Hemalatha A [mailto:hemalatha.amru...@googlemail.com]
Sent: Wednesday, April 05, 2017 1:59 PM
To: Saisai Shao; user@spark.apache.org
Subject: Re: How to use ManualClock with Spark streaming

Any updates on how I can use ManualClock, other than editing the Spark source
code?

On Wed, Mar 1, 2017 at 10:19 AM, Hemalatha A 
> 
wrote:
It is certainly possible through a hack.
I was referring to below post where TD says it is possible thru a hack. I 
wanted to know if there is  any way other than editing the Spark source code.

https://groups.google.com/forum/#!searchin/spark-users/manualclock%7Csort:relevance/spark-users/ES8X1l_xn5s/6PvGGRDfgnMJ

On Wed, Mar 1, 2017 at 7:09 AM, Saisai Shao 
> wrote:
I don't think using ManualClock is a right way to fix your problem here in 
Spark Streaming.

ManualClock in Spark is mainly used for unit test, it should manually advance 
the time to make the unit test work. The usage looks different compared to the 
scenario you mentioned.

Thanks
Jerry

On Tue, Feb 28, 2017 at 10:53 PM, Hemalatha A 
> 
wrote:

Hi,

I am running streaming application reading data from kafka and performing 
window operations on it. I have a usecase where  all incoming events have a 
fixed latency of 10s, which means data belonging to minute 10:00:00 will arrive 
10s late at 10:00:10.

I want to set the spark clock to "Manualclock" and set the time behind by 10s 
so that the batch calculation triggers at 10:00:10, during which time all the 
events for the previous minute has arrived.

But, I see that "spark.streaming.clock" is hardcoded to 
"org.apache.spark.util.SystemClock" in the code.

Is there a way to easily  hack this property to use Manual clock.
--


Regards
Hemalatha




--


Regards
Hemalatha



--


Regards
Hemalatha


Re: convert JavaRDD<List> to JavaRDD

2017-04-05 Thread Jiang Jacky
There is no flattop just flatMap 

> On Apr 5, 2017, at 12:24 PM, Vinay Parekar  wrote:
> 
> I think flattop() will be helpful in this case . Correct me if I am wrong.
> 
> From: Hamza HACHANI 
> Date: Wednesday, April 5, 2017 at 3:43 AM
> To: "user@spark.apache.org" 
> Subject: convert JavaRDD to JavaRDD
> 
> I want to convert a JavaRDD<List<Object>> to a JavaRDD<Object>. For example, if
> there are 3 elements in the List, 3 Objects would be created in my new
> JavaRDD.
> 
> Does any one have an idea ?
> 
> 
> 


Spark Streaming Kafka Job has strange behavior for certain tasks

2017-04-05 Thread Justin Miller
Greetings!

I've been running various spark streaming jobs to persist data from kafka 
topics and one persister in particular seems to have issues. I've verified that 
the number of messages is the same per partition (roughly of course) and the 
volume of data is a fraction of the volume of other persisters that appear to 
be working fine. 

The tasks appear to go fine until approximately tasks 74-80 (of 96),
and then the remaining tasks take a while. I'm using EMR/Spark 2.1.0/Kafka 
0.10.0.1/EMRFS (EMR's S3 solution). Any help would be greatly appreciated!

Here's the code I'm using to do the transformation:

val transformedData = transformer(sqlContext.createDataFrame(values, 
converter.schema))

transformedData
  .write
  .mode(Append)
  .partitionBy(persisterConfig.partitioning: _*)
  .format("parquet")
  .save(parquetPath)

Here's the output of the job as it's running (thrift -> parquet/snappy -> s3 is 
the flow), the files are roughly the same size (96 files per 10 minute window):

17/04/05 16:43:43 INFO TaskSetManager: Finished task 72.0 in stage 7.0 (TID 
722) in 10089 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 57) 
(1/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 58.0 in stage 7.0 (TID 
680) in 10099 ms on ip-172-20-218-229.us-west-2.compute.internal (executor 90) 
(2/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 81.0 in stage 7.0 (TID 
687) in 10244 ms on ip-172-20-218-144.us-west-2.compute.internal (executor 8) 
(3/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 23.0 in stage 7.0 (TID 
736) in 10236 ms on ip-172-20-209-248.us-west-2.compute.internal (executor 82) 
(4/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 52.0 in stage 7.0 (TID 
730) in 10275 ms on ip-172-20-218-144.us-west-2.compute.internal (executor 78) 
(5/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 45.0 in stage 7.0 (TID 
691) in 10289 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 41) 
(6/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 13.0 in stage 7.0 (TID 
712) in 10532 ms on ip-172-20-223-100.us-west-2.compute.internal (executor 65) 
(7/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 42.0 in stage 7.0 (TID 
694) in 10595 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 18) 
(8/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID 763) 
in 10623 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 74) (9/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 82.0 in stage 7.0 (TID 
727) in 10631 ms on ip-172-20-212-76.us-west-2.compute.internal (executor 72) 
(10/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 69.0 in stage 7.0 (TID 
729) in 10716 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 55) 
(11/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 65.0 in stage 7.0 (TID 
673) in 10733 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 67) 
(12/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 15.0 in stage 7.0 (TID 
684) in 10737 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 85) 
(13/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 27.0 in stage 7.0 (TID 
748) in 10747 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 10) 
(14/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 46.0 in stage 7.0 (TID 
699) in 10834 ms on ip-172-20-218-229.us-west-2.compute.internal (executor 48) 
(15/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 6.0 in stage 7.0 (TID 719) 
in 10838 ms on ip-172-20-211-125.us-west-2.compute.internal (executor 52) 
(16/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 11.0 in stage 7.0 (TID 
739) in 10892 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 83) 
(17/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 88.0 in stage 7.0 (TID 
697) in 10900 ms on ip-172-20-212-43.us-west-2.compute.internal (executor 70) 
(18/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 35.0 in stage 7.0 (TID 
678) in 10909 ms on ip-172-20-212-63.us-west-2.compute.internal (executor 77) 
(19/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 700) 
in 10906 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 46) 
(20/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 36.0 in stage 7.0 (TID 
732) in 10935 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 69) 
(21/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 19.0 in stage 7.0 (TID 
759) in 10948 ms on ip-172-20-223-100.us-west-2.compute.internal (executor 37) 
(22/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 41.0 in stage 7.0 (TID 
703) in 11013 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 81) 
(23/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 8.0 in stage 7.0 (TID 745) 
in 11007 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 13) 
(24/96)
17/04/05 16:43:44 INFO TaskSetManager: 

Re: how do i force unit test to do whole stage codegen

2017-04-05 Thread Jacek Laskowski
Thanks Koert for the kind words. That part however is easy to fix, and I
was surprised to have seen the old style referenced (!)

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
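
As a side note, a minimal way to look at the generated code itself from a test
(a sketch, assuming Spark 2.x and a SparkSession named spark):

import org.apache.spark.sql.execution.debug._
import spark.implicits._

// debugCodegen prints the Java source produced by whole-stage codegen for the
// physical plan, which is more explicit than the "*" markers in explain().
val df = spark.range(10).select('id as 'asId).where('id === 4)
df.debugCodegen()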


On Wed, Apr 5, 2017 at 6:14 PM, Koert Kuipers  wrote:
> its pretty much impossible to be fully up to date with spark given how fast
> it moves!
>
> the book is a very helpful reference
>
> On Wed, Apr 5, 2017 at 11:15 AM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> I'm very sorry for not being up to date with the current style (and
>> "promoting" the old style) and am going to review that part soon. I'm very
>> close to touch it again since I'm with Optimizer these days.
>>
>> Jacek
>>
>> On 5 Apr 2017 6:08 a.m., "Kazuaki Ishizaki"  wrote:
>>>
>>> Hi,
>>> The page in the URL explains the old style of physical plan output.
>>> The current style adds "*" as a prefix of each operation that the
>>> whole-stage codegen can be apply to.
>>>
>>> So, in your test case, whole-stage codegen has been already enabled!!
>>>
>>> FYI. I think that it is a good topic for d...@spark.apache.org.
>>>
>>> Kazuaki Ishizaki
>>>
>>>
>>>
>>> From:Koert Kuipers 
>>> To:"user@spark.apache.org" 
>>> Date:2017/04/05 05:12
>>> Subject:how do i force unit test to do whole stage codegen
>>> 
>>>
>>>
>>>
>>> i wrote my own expression with eval and doGenCode, but doGenCode never
>>> gets called in tests.
>>>
>>> also as a test i ran this in a unit test:
>>> spark.range(10).select('id as 'asId).where('id === 4).explain
>>> according to
>>>
>>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html
>>> this is supposed to show:
>>> == Physical Plan ==
>>> WholeStageCodegen
>>> :  +- Project [id#0L AS asId#3L]
>>> : +- Filter (id#0L = 4)
>>> :+- Range 0, 1, 8, 10, [id#0L]
>>>
>>> but it doesn't. instead it shows:
>>>
>>> == Physical Plan ==
>>> *Project [id#12L AS asId#15L]
>>> +- *Filter (id#12L = 4)
>>>   +- *Range (0, 10, step=1, splits=Some(4))
>>>
>>> so i am again missing the WholeStageCodegen. any idea why?
>>>
>>> i create spark session for unit tests simply as:
>>> val session = SparkSession.builder
>>>  .master("local[*]")
>>>  .appName("test")
>>>  .config("spark.sql.shuffle.partitions", 4)
>>>  .getOrCreate()
>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: bug with PYTHONHASHSEED

2017-04-05 Thread Paul Tremblay
I saw the bug fix. I am using the latest Spark available on AWS EMR, which I
think is 2.0.1. I am at work and can't check my home config. I don't think
AWS merged this fix in.

Henry

On Tue, Apr 4, 2017 at 4:42 PM, Jeff Zhang  wrote:

>
> It is fixed in https://issues.apache.org/jira/browse/SPARK-13330
>
>
>
> Holden Karau wrote on Wednesday, April 5, 2017 at 12:03 AM:
>
>> Which version of Spark is this (or is it a dev build)? We've recently
>> made some improvements with PYTHONHASHSEED propagation.
>>
>> On Tue, Apr 4, 2017 at 7:49 AM Eike von Seggern > cal.com> wrote:
>>
>> 2017-04-01 21:54 GMT+02:00 Paul Tremblay :
>>
>> When I try to to do a groupByKey() in my spark environment, I get the
>> error described here:
>>
>> http://stackoverflow.com/questions/36798833/what-does-
>> exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>>
>> In order to attempt to fix the problem, I set up my ipython environment
>> with the additional line:
>>
>> PYTHONHASHSEED=1
>>
>> When I fire up my ipython shell, and do:
>>
>> In [7]: hash("foo")
>> Out[7]: -2457967226571033580
>>
>> In [8]: hash("foo")
>> Out[8]: -2457967226571033580
>>
>> So my hash function is now seeded so it returns consistent values. But
>> when I do a groupByKey(), I get the same error:
>>
>>
>> Exception: Randomness of hash of string should be disabled via
>> PYTHONHASHSEED
>>
>> Anyone know how to fix this problem in python 3.4?
>>
>>
>> Independent of the python version, you have to ensure that Python on
>> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
>> adding it to the environment of the spark processes.
>>
>> Best
>>
>> Eike
>>
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>


-- 
Paul Henry Tremblay
Robert Half Technology


Re: convert JavaRDD<List> to JavaRDD

2017-04-05 Thread Vinay Parekar
I think flattop() will be helpful in this case . Correct me if I am wrong.

From: Hamza HACHANI >
Date: Wednesday, April 5, 2017 at 3:43 AM
To: "user@spark.apache.org" 
>
Subject: convert JavaRDD to JavaRDD


I want to convert a JavaRDD<List<Object>> to a JavaRDD<Object>. For example, if
there are 3 elements in the List, 3 Objects would be created in my new
JavaRDD.

Does any one have an idea ?





Re: how do i force unit test to do whole stage codegen

2017-04-05 Thread Koert Kuipers
its pretty much impossible to be fully up to date with spark given how fast
it moves!

the book is a very helpful reference

On Wed, Apr 5, 2017 at 11:15 AM, Jacek Laskowski  wrote:

> Hi,
>
> I'm very sorry for not being up to date with the current style (and
> "promoting" the old style) and am going to review that part soon. I'm very
> close to touch it again since I'm with Optimizer these days.
>
> Jacek
>
> On 5 Apr 2017 6:08 a.m., "Kazuaki Ishizaki"  wrote:
>
>> Hi,
>> The page in the URL explains the old style of physical plan output.
>> The current style adds "*" as a prefix of each operation that the
>> whole-stage codegen can be apply to.
>>
>> So, in your test case, whole-stage codegen has been already enabled!!
>>
>> FYI. I think that it is a good topic for d...@spark.apache.org.
>>
>> Kazuaki Ishizaki
>>
>>
>>
>> From:Koert Kuipers 
>> To:"user@spark.apache.org" 
>> Date:2017/04/05 05:12
>> Subject:how do i force unit test to do whole stage codegen
>> --
>>
>>
>>
>> i wrote my own expression with eval and doGenCode, but doGenCode never
>> gets called in tests.
>>
>> also as a test i ran this in a unit test:
>> spark.range(10).select('id as 'asId).where('id === 4).explain
>> according to
>>
>> *https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html*
>> 
>> this is supposed to show:
>> == Physical Plan ==
>> WholeStageCodegen
>> :  +- Project [id#0L AS asId#3L]
>> : +- Filter (id#0L = 4)
>> :+- Range 0, 1, 8, 10, [id#0L]
>>
>> but it doesn't. instead it shows:
>>
>> == Physical Plan ==
>> *Project [id#12L AS asId#15L]
>> +- *Filter (id#12L = 4)
>>   +- *Range (0, 10, step=1, splits=Some(4))
>>
>> so i am again missing the WholeStageCodegen. any idea why?
>>
>> i create spark session for unit tests simply as:
>> val session = SparkSession.builder
>>  .master("local[*]")
>>  .appName("test")
>>  .config("spark.sql.shuffle.partitions", 4)
>>  .getOrCreate()
>>
>>
>>


Re: Why do we ever run out of memory in Spark Structured Streaming?

2017-04-05 Thread kant kodali
Actually, I want to reset my counters every 24 hours, so shouldn't the
window and slide intervals both be 24 hours? If so, how do I send updates to a
real-time dashboard every second? Isn't the trigger interval the same as the
slide interval?
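
For what it's worth, a rough sketch of how a 24-hour window with a watermark
might look (the input DataFrame and column names are made up, not from the
thread):

import org.apache.spark.sql.functions._

// Sketch only: `events` is assumed to be a streaming DataFrame with an
// eventTime timestamp column and a numeric amount column.
val counts = events
  .withWatermark("eventTime", "1 hour")               // lets Spark drop old window state
  .groupBy(window(col("eventTime"), "24 hours"))      // one bucket per 24-hour window
  .agg(count("*").as("txns"), sum("amount").as("total"))

The dashboard refresh rate would then come from the output trigger interval on
the writeStream query rather than from the window's slide interval.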

On Wed, Apr 5, 2017 at 7:17 AM, kant kodali  wrote:

> One of our requirement is that we need to maintain counter for a 24 hour
> period such as number of transactions processed in the past 24 hours. After
> each day these counters can start from zero again so we just need to
> maintain a running count during the 24 hour period. Also since we want to
> show these stats on a real time dashboard we want those charts to be
> updated every second so I guess this would translate to window interval of
> 24 hours and a slide/trigger interval of 1 second. First of all, Is this
> okay ?
>
> Secondly, we push about 5000 JSON messages/sec into spark streaming and
> each message is about 2KB. we just need to parse those messages and compute
> say sum of certain fields  from each message and the result needs to be
> stored somewhere such that each run will take its result and add it up to
>  the previous run and this state have to be maintained for 24 hours and
> then we can reset it back to zero. so any advice on how to best approach
> this scenario?
>
> Thanks much!
>
> On Wed, Apr 5, 2017 at 12:39 AM, kant kodali  wrote:
>
>> Hi!
>>
>> I am talking about "stateful operations like aggregations". Does this
>> happen on heap or off heap by default? I came across a article where I saw
>> both on and off heap are possible but I am not sure what happens by default
>> and when Spark or Spark Structured Streaming decides to store off heap?
>>
>> I don't even know what mapGroupsWithState does since It's not part of
>> spark 2.1 which is what we currently use. Any pointers would be great.
>>
>> Thanks!
>>
>>
>>
>>
>>
>> On Tue, Apr 4, 2017 at 5:34 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Are you referring to the memory usage of stateful operations like
>>> aggregations, or the new mapGroupsWithState?
>>> The current implementation of the internal state store (that maintains
>>> the stateful aggregates) is such that it keeps all the data in memory of
>>> the executor. It does use HDFS-compatible file system for checkpointing,
>>> but as of now, it currently keeps all the data in memory of the executor.
>>> This is something we will improve in the future.
>>>
>>> That said, you can enabled watermarking in your query that would
>>> automatically clear old, unnecessary state thus limiting the total memory
>>> used for stateful operations.
>>> Furthermore, you can also monitor the size of the state and get alerts
>>> if the state is growing too large.
>>>
>>> Read more in the programming guide.
>>> Watermarking - http://spark.apache.org/docs
>>> /latest/structured-streaming-programming-guide.html#handling
>>> -late-data-and-watermarking
>>> Monitoring - http://spark.apache.org/docs/latest/structured-streaming-p
>>> rogramming-guide.html#monitoring-streaming-queries
>>>
>>> In case you were referring to something else, please give us more
>>> context details - what query, what are the symptoms you are observing.
>>>
>>> On Tue, Apr 4, 2017 at 5:17 PM, kant kodali  wrote:
>>>
 Why do we ever run out of memory in Spark Structured Streaming
 especially when Memory can always spill to disk ? until the disk is full we
 shouldn't be out of memory.isn't it? sure thrashing will happen more
 frequently and degrades performance but we do we ever run out Memory even
 in case of maintaining a state for 6 months or a year?

 Thanks!

>>>
>>>
>>
>


Re: how do i force unit test to do whole stage codegen

2017-04-05 Thread Jacek Laskowski
Hi,

I'm very sorry for not being up to date with the current style (and
"promoting" the old style) and am going to review that part soon. I'm very
close to touch it again since I'm with Optimizer these days.

Jacek

On 5 Apr 2017 6:08 a.m., "Kazuaki Ishizaki"  wrote:

> Hi,
> The page in the URL explains the old style of physical plan output.
> The current style adds "*" as a prefix of each operation that the
> whole-stage codegen can be apply to.
>
> So, in your test case, whole-stage codegen has been already enabled!!
>
> FYI. I think that it is a good topic for d...@spark.apache.org.
>
> Kazuaki Ishizaki
>
>
>
> From:Koert Kuipers 
> To:"user@spark.apache.org" 
> Date:2017/04/05 05:12
> Subject:how do i force unit test to do whole stage codegen
> --
>
>
>
> i wrote my own expression with eval and doGenCode, but doGenCode never
> gets called in tests.
>
> also as a test i ran this in a unit test:
> spark.range(10).select('id as 'asId).where('id === 4).explain
> according to
>
> *https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html*
> 
> this is supposed to show:
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0L AS asId#3L]
> : +- Filter (id#0L = 4)
> :+- Range 0, 1, 8, 10, [id#0L]
>
> but it doesn't. instead it shows:
>
> == Physical Plan ==
> *Project [id#12L AS asId#15L]
> +- *Filter (id#12L = 4)
>   +- *Range (0, 10, step=1, splits=Some(4))
>
> so i am again missing the WholeStageCodegen. any idea why?
>
> i create spark session for unit tests simply as:
> val session = SparkSession.builder
>  .master("local[*]")
>  .appName("test")
>  .config("spark.sql.shuffle.partitions", 4)
>  .getOrCreate()
>
>
>


Re: With Twitter4j API, why am I not able to pull tweets with certain keywords?

2017-04-05 Thread Ian.Maloney
I think the twitter4j API only pulls some publicly available data. To get
the full dataset you might need to use a vendor like Radian6 or Gnip.

See below:
https://brightplanet.com/2013/06/twitter-firehose-vs-twitter-api-whats-the-difference-and-why-should-you-care/










On 4/5/17, 12:02 AM, "Gaurav1809"  wrote:

>I am using Spark Streaming to with twitter4j API to pull tweets.
>
>I am able to pull tweets for some keywords but not for others. If I
>explicitly tweet with those keywords, even then API does not pull them.
>For
>some it is smooth. Has anyone encountered this issue before? Please
>suggest
>solution. Thanks.
>
>
>
>--
>View this message in context:
>https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dspark-2Duser-
>2Dlist.1001560.n3.nabble.com_With-2DTwitter4j-2DAPI-2Dwhy-2Dam-2DI-2Dnot-2
>Dable-2Dto-2Dpull-2Dtweets-2Dwith-2Dcertain-2Dkeywords-2Dtp28567.html=Dw
>ICAg=nulvIAQnC0yOOjC0e0NVa8TOcyq9jNhjZ156R-JJU10=CxpqDYMuQy-1uNI-UOyUb
>aX6BMPCZXH8d8evuCoP_OA=zlAQzIz3G2wl_qkcOuR4x8NUcK7NdtBqOmhDGUo2oh8=8-7
>kJMLut_isR3eziillolR8QNyhFmUb3hWjfy3btlY=
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Why do we ever run out of memory in Spark Structured Streaming?

2017-04-05 Thread kant kodali
One of our requirements is that we need to maintain a counter for a 24-hour
period, such as the number of transactions processed in the past 24 hours. After
each day these counters can start from zero again, so we just need to
maintain a running count during the 24-hour period. Also, since we want to
show these stats on a real-time dashboard, we want those charts to be
updated every second, so I guess this would translate to a window interval of
24 hours and a slide/trigger interval of 1 second. First of all, is this
okay?

Secondly, we push about 5000 JSON messages/sec into spark streaming and
each message is about 2 KB. We just need to parse those messages and compute,
say, the sum of certain fields from each message, and the result needs to be
stored somewhere such that each run adds its result to the previous run's
total. This state has to be maintained for 24 hours and then reset back to
zero. Any advice on how best to approach this scenario?

Thanks much!

On Wed, Apr 5, 2017 at 12:39 AM, kant kodali  wrote:

> Hi!
>
> I am talking about "stateful operations like aggregations". Does this
> happen on heap or off heap by default? I came across a article where I saw
> both on and off heap are possible but I am not sure what happens by default
> and when Spark or Spark Structured Streaming decides to store off heap?
>
> I don't even know what mapGroupsWithState does since It's not part of
> spark 2.1 which is what we currently use. Any pointers would be great.
>
> Thanks!
>
>
>
>
>
> On Tue, Apr 4, 2017 at 5:34 PM, Tathagata Das  > wrote:
>
>> Are you referring to the memory usage of stateful operations like
>> aggregations, or the new mapGroupsWithState?
>> The current implementation of the internal state store (that maintains
>> the stateful aggregates) is such that it keeps all the data in memory of
>> the executor. It does use HDFS-compatible file system for checkpointing,
>> but as of now, it currently keeps all the data in memory of the executor.
>> This is something we will improve in the future.
>>
>> That said, you can enabled watermarking in your query that would
>> automatically clear old, unnecessary state thus limiting the total memory
>> used for stateful operations.
>> Furthermore, you can also monitor the size of the state and get alerts if
>> the state is growing too large.
>>
>> Read more in the programming guide.
>> Watermarking - http://spark.apache.org/docs/latest/structured-streaming-
>> programming-guide.html#handling-late-data-and-watermarking
>> Monitoring - http://spark.apache.org/docs/latest/structured-streaming-
>> programming-guide.html#monitoring-streaming-queries
>>
>> In case you were referring to something else, please give us more context
>> details - what query, what are the symptoms you are observing.
>>
>> On Tue, Apr 4, 2017 at 5:17 PM, kant kodali  wrote:
>>
>>> Why do we ever run out of memory in Spark Structured Streaming
>>> especially when Memory can always spill to disk ? until the disk is full we
>>> shouldn't be out of memory.isn't it? sure thrashing will happen more
>>> frequently and degrades performance but we do we ever run out Memory even
>>> in case of maintaining a state for 6 months or a year?
>>>
>>> Thanks!
>>>
>>
>>
>


Re: unit testing in spark

2017-04-05 Thread Shiva Ramagopal
Hi,

I've been following this thread for a while.

I'm trying to bring in a test strategy in my team to test a number of data
pipelines before production. I have watched Lars' presentation and find it
great. However I'm debating whether unit tests are worth the effort if
there are good job-level and pipeline-level tests. Does anybody have any
experiences benefitting from unit-tests in such a case?

Cheers,
Shiv

On Mon, Dec 12, 2016 at 6:00 AM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi all,
>
> I would also would like to participate on that.
>
> Greetings,
>
> Juan
>
> On Fri, Dec 9, 2016 at 6:03 AM, Michael Stratton  komodohealth.com> wrote:
>
>> That sounds great, please include me so I can get involved.
>>
>> On Fri, Dec 9, 2016 at 7:39 AM, Marco Mistroni 
>> wrote:
>>
>>> Me too as I spent most of my time writing unit/integ tests  pls
>>> advise on where I  can start
>>> Kr
>>>
>>> On 9 Dec 2016 12:15 am, "Miguel Morales" 
>>> wrote:
>>>
 I would be interested in contributing.  Ive created my own library for
 this as well.  In my blog post I talk about testing with Spark in RSpec
 style:
 https://medium.com/@therevoltingx/test-driven-development-w-
 apache-spark-746082b44941

 Sent from my iPhone

 On Dec 8, 2016, at 4:09 PM, Holden Karau  wrote:

 There are also libraries designed to simplify testing Spark in the
 various platforms, spark-testing-base
  for Scala/Java/Python
 (& video https://www.youtube.com/watch?v=f69gSGSLGrY), sscheck
  (scala focused property based),
 pyspark.test (python focused with py.test instead of unittest2) (&
 blog post from nextdoor https://engblog.nextd
 oor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b#.jw3bdcej9
  )

 Good luck on your Spark Adventures :)

 P.S.

 If anyone is interested in helping improve spark testing libraries I'm
 always looking for more people to be involved with spark-testing-base
 because I'm lazy :p

 On Thu, Dec 8, 2016 at 2:05 PM, Lars Albertsson 
 wrote:

> I wrote some advice in a previous post on the list:
> http://markmail.org/message/bbs5acrnksjxsrrs
>
> It does not mention python, but the strategy advice is the same. Just
> replace JUnit/Scalatest with pytest, unittest, or your favourite
> python test framework.
>
>
> I recently held a presentation on the subject. There is a video
> recording at https://vimeo.com/192429554 and slides at
> http://www.slideshare.net/lallea/test-strategies-for-data-pr
> ocessing-pipelines-67244458
>
> You can find more material on test strategies at
> http://www.mapflat.com/lands/resources/reading-list/index.html
>
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> https://twitter.com/lalleal
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
>
>
> On Thu, Dec 8, 2016 at 4:14 PM, pseudo oduesp 
> wrote:
> > somone can tell me how i can make unit test on pyspark ?
> > (book, tutorial ...)
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


 --
 Cell : 425-233-8271 <(425)%20233-8271>
 Twitter: https://twitter.com/holdenkarau


>>
>


Re: How to use ManualClock with Spark streaming

2017-04-05 Thread Hemalatha A
Any updates on how I can use ManualClock, other than editing the Spark
source code?

On Wed, Mar 1, 2017 at 10:19 AM, Hemalatha A <
hemalatha.amru...@googlemail.com> wrote:

> It is certainly possible through a hack.
> I was referring to below post where TD says it is possible thru a hack. I
> wanted to know if there is  any way other than editing the Spark source
> code.
>
> https://groups.google.com/forum/#!searchin/spark-users/manua
> lclock%7Csort:relevance/spark-users/ES8X1l_xn5s/6PvGGRDfgnMJ
>
> On Wed, Mar 1, 2017 at 7:09 AM, Saisai Shao 
> wrote:
>
>> I don't think using ManualClock is a right way to fix your problem here
>> in Spark Streaming.
>>
>> ManualClock in Spark is mainly used for unit test, it should manually
>> advance the time to make the unit test work. The usage looks different
>> compared to the scenario you mentioned.
>>
>> Thanks
>> Jerry
>>
>> On Tue, Feb 28, 2017 at 10:53 PM, Hemalatha A <
>> hemalatha.amru...@googlemail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I am running streaming application reading data from kafka and
>>> performing window operations on it. I have a usecase where  all incoming
>>> events have a fixed latency of 10s, which means data belonging to minute
>>> 10:00:00 will arrive 10s late at 10:00:10.
>>>
>>> I want to set the spark clock to "Manualclock" and set the time behind
>>> by 10s so that the batch calculation triggers at 10:00:10, during which
>>> time all the events for the previous minute has arrived.
>>>
>>> But, I see that "spark.streaming.clock" is hardcoded to "
>>> org.apache.spark.util.SystemClock" in the code.
>>>
>>> Is there a way to easily  hack this property to use Manual clock.
>>> --
>>>
>>>
>>> Regards
>>> Hemalatha
>>>
>>
>>
>
>
> --
>
>
> Regards
> Hemalatha
>



-- 


Regards
Hemalatha


JSON lib works differently in spark-shell and IDE like intellij

2017-04-05 Thread Mungeol Heo
Hello,

I am using "minidev" which is a JSON lib to remove duplicated keys in
JSON object.


minidev

<dependency>
  <groupId>net.minidev</groupId>
  <artifactId>json-smart</artifactId>
  <version>2.3</version>
</dependency>


Test Code


import net.minidev.json.parser.JSONParser
val badJson = "{\"keyA\":\"valueA\",\"keyB\":\"valueB\",\"keyA\":\"valueA\"}"
val json = new 
JSONParser(JSONParser.MODE_PERMISSIVE).parse(badJson.toLowerCase())
println(json)



The source code placed above works at IDE like intellij.
But, it gives error at spark-shell


Error


net.minidev.json.parser.ParseException: Unexpected duplicate key:keya
at position 33.



BTW, both IDE and spark-shell using same version of scala which is 2.11.8.
And, of course, same version of "minidev"

Any help will be great.
Thank you.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: convert JavaRDD<List> to JavaRDD

2017-04-05 Thread hosur narahari
Use the flatMap function on the JavaRDD.

On 5 Apr 2017 3:13 p.m., "Hamza HACHANI"  wrote:

> I want to convert a JavaRDD<List<Object>> to a JavaRDD<Object>. For example,
> if there are 3 elements in the List, 3 Objects would be created in my new
> JavaRDD.
>
> Does any one have an idea ?
>


convert JavaRDD<List> to JavaRDD

2017-04-05 Thread Hamza HACHANI
I want to convert a JavaRDD<List<Object>> to a JavaRDD<Object>. For example, if
there are 3 elements in the List, 3 Objects would be created in my new
JavaRDD.

Does any one have an idea ?


Re: Market Basket Analysis by deploying FP Growth algorithm

2017-04-05 Thread Patrick Plaatje
Hi Arun,

We have been running into the same issue (having only 1000 unique items, in 
100MM transactions), but have not investigated the root cause of this. We 
decided to run this on a cluster instead (4*16 / 64GB Ram), after which the OOM 
issue went away. However, we ran into the issue that the FPGrowth 
implementation starts spilling over to disk, and we had to increase the /tmp 
partition.

Hope it helps.

BR,
-patrick
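
For reference, a minimal sketch of the MLlib FP-Growth API on Spark 1.6 (the
data and parameter values are illustrative only; setNumPartitions is usually
the first knob to turn when the conditional FP-trees stop fitting on a single
worker):

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

// Toy input: one basket of product IDs per record.
val transactions: RDD[Array[String]] = sc.parallelize(Seq(
  Array("a", "b", "c"),
  Array("a", "b"),
  Array("b", "c"),
  Array("a", "c")))

val model = new FPGrowth()
  .setMinSupport(0.3)        // example threshold; real data would likely use a much smaller value
  .setNumPartitions(4)       // example value; scale with the cluster
  .run(transactions)

model.generateAssociationRules(0.5)   // example confidence threshold
  .take(10)
  .foreach(println)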



On 05/04/2017, 10:29, "asethia"  wrote:

Hi,

We are currently working on a Market Basket Analysis by deploying FP Growth
algorithm on Spark to generate association rules for product recommendation.
We are running on close to 24 million invoices over an assortment of more
than 100k products. However, whenever we relax the support threshold below a
certain level, the stack overflows. We are using Spark 1.6.2 but can somehow
invoke 1.6.3 to counter this error. The problem though is even when we
invoke Spark 1.6.3 and increase the stack size to 100M we are running out of
memory. We believe the tree grows exponentially and is stored in memory
which causes this problem. Can anyone suggest a solution to this issue
please?

Thanks
Arun



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Market-Basket-Analysis-by-deploying-FP-Growth-algorithm-tp28569.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Market Basket Analysis by deploying FP Growth algorithm

2017-04-05 Thread asethia
Hi,

We are currently working on a Market Basket Analysis by deploying FP Growth
algorithm on Spark to generate association rules for product recommendation.
We are running on close to 24 million invoices over an assortment of more
than 100k products. However, whenever we relax the support threshold below a
certain level, the stack overflows. We are using Spark 1.6.2 but can somehow
invoke 1.6.3 to counter this error. The problem though is even when we
invoke Spark 1.6.3 and increase the stack size to 100M we are running out of
memory. We believe the tree grows exponentially and is stored in memory
which causes this problem. Can anyone suggest a solution to this issue
please?

Thanks
Arun



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Market-Basket-Analysis-by-deploying-FP-Growth-algorithm-tp28569.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Why do we ever run out of memory in Spark Structured Streaming?

2017-04-05 Thread kant kodali
Hi!

I am talking about "stateful operations like aggregations". Does this
happen on heap or off heap by default? I came across an article where I saw
both on-heap and off-heap are possible, but I am not sure what happens by
default, or when Spark or Spark Structured Streaming decides to store off heap.

I don't even know what mapGroupsWithState does, since it's not part of Spark
2.1, which is what we currently use. Any pointers would be great.

Thanks!





On Tue, Apr 4, 2017 at 5:34 PM, Tathagata Das 
wrote:

> Are you referring to the memory usage of stateful operations like
> aggregations, or the new mapGroupsWithState?
> The current implementation of the internal state store (that maintains the
> stateful aggregates) is such that it keeps all the data in memory of the
> executor. It does use HDFS-compatible file system for checkpointing, but as
> of now, it currently keeps all the data in memory of the executor. This is
> something we will improve in the future.
>
> That said, you can enabled watermarking in your query that would
> automatically clear old, unnecessary state thus limiting the total memory
> used for stateful operations.
> Furthermore, you can also monitor the size of the state and get alerts if
> the state is growing too large.
>
> Read more in the programming guide.
> Watermarking - http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#handling-late-data-and-watermarking
> Monitoring - http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#monitoring-streaming-queries
>
> In case you were referring to something else, please give us more context
> details - what query, what are the symptoms you are observing.
>
> On Tue, Apr 4, 2017 at 5:17 PM, kant kodali  wrote:
>
>> Why do we ever run out of memory in Spark Structured Streaming especially
>> when Memory can always spill to disk ? until the disk is full we shouldn't
>> be out of memory.isn't it? sure thrashing will happen more frequently and
>> degrades performance but we do we ever run out Memory even in case of
>> maintaining a state for 6 months or a year?
>>
>> Thanks!
>>
>
>


reading binary file in spark-kafka streaming

2017-04-05 Thread Yogesh Vyas
Hi,

I have a binary file which I read in a Kafka producer and send to the
message queue. I then read it in the Spark-Kafka consumer as a streaming job,
but it is giving me the following error:

UnicodeDecodeError: 'utf8' codec can't decode byte 0xa9 in position 112:
invalid start byte

Can anyone please tell me why that error occurs and how to fix it?

Regards,
Yogesh