Kafka stream offset management question

2016-11-08 Thread Haopu Wang
I'm using the Kafka direct stream (auto.offset.reset = earliest) with Spark
Streaming's checkpointing enabled.

 

The application starts and consumes messages correctly. Then I stop the
application and clean the checkpoint folder.

 

I restart the application and expect it to consume the old messages again. But it
doesn't consume any data, and I see logs like the one below:

 

[org.apache.spark.streaming.kafka010.KafkaRDD] (Executor task launch worker-0;) Beginning offset 25 is the same as ending offset skipping aa 0

 

So it seems the offsets are stored not only in the checkpoint but also in
Kafka, right?

Is it because I'm using the same group.id? How can I delete the consumer
group manually?
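
For reference, this is roughly how I create the stream after cleaning the checkpoint folder (a minimal sketch: the broker address and checkpoint path are placeholders, the topic "aa" is the one from the log, and my assumption is that offsets already committed under the same group.id take precedence over auto.offset.reset):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(new SparkConf().setAppName("offset-test"), Seconds(10))
ssc.checkpoint("/tmp/checkpoint")                     // placeholder checkpoint folder

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",              // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-app-group",                       // reusing this id may pick up offsets committed to Kafka
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("aa"), kafkaParams))

If the old messages really must be replayed, switching to a brand-new group.id (or assigning explicit offsets) looks safer than relying on auto.offset.reset alone.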

 

Thanks again for any help!

 



RE: InvalidClassException when load KafkaDirectStream from checkpoint (Spark 2.0.0)

2016-11-08 Thread Haopu Wang
It turned out to be a bug in the application code. Thank you!

 



From: Haopu Wang 
Sent: November 4, 2016 17:23
To: user@spark.apache.org; Cody Koeninger
Subject: InvalidClassException when load KafkaDirectStream from checkpoint 
(Spark 2.0.0)

 

When I load the Spark checkpoint, I get the error below. Do you have any idea?

Much thanks!

 

*

 

2016-11-04 17:12:19,582 INFO  [org.apache.spark.streaming.CheckpointReader] (main;) Checkpoint files found: file:/d:/temp/checkpoint/checkpoint-147825070,file:/d:/temp/checkpoint/checkpoint-147825070.bk,file:/d:/temp/checkpoint/checkpoint-147825069,file:/d:/temp/checkpoint/checkpoint-147825069.bk,file:/d:/temp/checkpoint/checkpoint-147825068,file:/d:/temp/checkpoint/checkpoint-147825068.bk,file:/d:/temp/checkpoint/checkpoint-147825067,file:/d:/temp/checkpoint/checkpoint-147825067.bk
2016-11-04 17:12:19,584 INFO  [org.apache.spark.streaming.CheckpointReader] (main;) Attempting to load checkpoint from file file:/d:/temp/checkpoint/checkpoint-147825070
2016-11-04 17:12:19,640 DEBUG [org.apache.spark.streaming.DStreamGraph] (main;) DStreamGraph.readObject used
2016-11-04 17:12:19,661 DEBUG [org.apache.spark.streaming.kafka010.DirectKafkaInputDStream] (main;) DirectKafkaInputDStream.readObject used
2016-11-04 17:12:19,664 DEBUG [org.apache.spark.streaming.dstream.DStreamCheckpointData] (main;) DStreamCheckpointData.readObject used
2016-11-04 17:12:19,679 DEBUG [org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData] (main;) DirectKafkaInputDStreamCheckpointData.readObject used
2016-11-04 17:12:19,685 ERROR [org.apache.spark.util.Utils] (main;) Exception encountered

java.io.InvalidClassException: scala.collection.convert.Wrappers$MutableMapWrapper; no valid constructor
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1772)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:193)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStreamGraph.scala:189)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStreamGraph.scala:189)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
    at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:189)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798

RE: expected behavior of Kafka dynamic topic subscription

2016-11-06 Thread Haopu Wang
Cody, thanks for the response. Do you think it's a Spark issue or a Kafka issue?
Could you please let me know the JIRA ticket number?

-Original Message-
From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: November 4, 2016 22:35
To: Haopu Wang
Cc: user@spark.apache.org
Subject: Re: expected behavior of Kafka dynamic topic subscription

That's not what I would expect from the underlying kafka consumer, no.

But this particular case (no matching topics, then add a topic after
SubscribePattern stream starts) actually isn't part of unit tests for
either the DStream or the structured stream.

I'll make a jira ticket.

On Thu, Nov 3, 2016 at 9:43 PM, Haopu Wang  wrote:
> I'm using the Kafka 0.10 integration API to create a DStream using the
> SubscribePattern consumer strategy.
>
> The specified topic doesn't exist when I start the application.
>
> Then I create the topic and publish some test messages. I can see them in
> the console subscriber.
>
> But the spark application doesn't seem to get the messages.
>
> I think this is not expected, right? What should I check to resolve it?
>
> Thank you very much!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



InvalidClassException when load KafkaDirectStream from checkpoint (Spark 2.0.0)

2016-11-04 Thread Haopu Wang
When I load the Spark checkpoint, I get the error below. Do you have any idea?

Much thanks!

 

*

 

2016-11-04 17:12:19,582 INFO  [org.apache.spark.streaming.CheckpointReader] (main;) Checkpoint files found: file:/d:/temp/checkpoint/checkpoint-147825070,file:/d:/temp/checkpoint/checkpoint-147825070.bk,file:/d:/temp/checkpoint/checkpoint-147825069,file:/d:/temp/checkpoint/checkpoint-147825069.bk,file:/d:/temp/checkpoint/checkpoint-147825068,file:/d:/temp/checkpoint/checkpoint-147825068.bk,file:/d:/temp/checkpoint/checkpoint-147825067,file:/d:/temp/checkpoint/checkpoint-147825067.bk
2016-11-04 17:12:19,584 INFO  [org.apache.spark.streaming.CheckpointReader] (main;) Attempting to load checkpoint from file file:/d:/temp/checkpoint/checkpoint-147825070
2016-11-04 17:12:19,640 DEBUG [org.apache.spark.streaming.DStreamGraph] (main;) DStreamGraph.readObject used
2016-11-04 17:12:19,661 DEBUG [org.apache.spark.streaming.kafka010.DirectKafkaInputDStream] (main;) DirectKafkaInputDStream.readObject used
2016-11-04 17:12:19,664 DEBUG [org.apache.spark.streaming.dstream.DStreamCheckpointData] (main;) DStreamCheckpointData.readObject used
2016-11-04 17:12:19,679 DEBUG [org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData] (main;) DirectKafkaInputDStreamCheckpointData.readObject used
2016-11-04 17:12:19,685 ERROR [org.apache.spark.util.Utils] (main;) Exception encountered

java.io.InvalidClassException: scala.collection.convert.Wrappers$MutableMapWrapper; no valid constructor
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1772)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:193)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStreamGraph.scala:189)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStreamGraph.scala:189)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
    at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:189)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.streaming.Checkpoint$$anonfun$deserialize$2.apply(Checkpoint.scala:164)
    at org.apache.spark.st

expected behavior of Kafka dynamic topic subscription

2016-11-03 Thread Haopu Wang
I'm using the Kafka 0.10 integration API to create a DStream with the
SubscribePattern consumer strategy.

The specified topic doesn't exist when I start the application.

Then I create the topic and publish some test messages. I can see them
in the console subscriber.

But the Spark application doesn't seem to receive the messages.

This is not expected, right? What should I check to resolve it?
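
For reference, this is roughly how the stream is created (a minimal sketch: the pattern and broker address are placeholders, and the comment about metadata refresh is my assumption rather than verified behavior):

import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(new SparkConf().setAppName("pattern-test"), Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",              // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pattern-test",
  "auto.offset.reset" -> "earliest",
  // Assumption: the underlying consumer only re-checks topic metadata every
  // metadata.max.age.ms, so a topic created after start-up may take a while to show up.
  "metadata.max.age.ms" -> "5000"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("mytopic.*"), kafkaParams))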

Thank you very much!



RE: Kafka integration: get existing Kafka messages?

2016-10-14 Thread Haopu Wang
Cody, the link is helpful, but I still have issues in my test.

I set "auto.offset.reset" to "earliest" and then create a KafkaRDD using an
OffsetRange that is out of range.

According to Kafka's documentation, I expect to get the earliest offset of that
partition.

But I get the exception below, and it looks like "auto.offset.reset" is ignored
entirely. Please help, thanks again!

==
16/10/14 15:45:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-null mytopic2 0 2 after polling for 512
==
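
For completeness, this is roughly what the test does (a minimal sketch: the broker is a placeholder, the topic and partition are the ones from the error, and probing the real beginning offset first is just one way I assume the requested range can be kept valid, since per Cody's earlier reply the executor consumers run with auto.offset.reset forced to "none"):

import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}
import scala.collection.JavaConverters._

val sc = new SparkContext(new SparkConf().setAppName("kafka-rdd-test"))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",              // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "rdd-test",
  "auto.offset.reset" -> "earliest")                  // apparently not honored for an explicit range

// Ask Kafka for the actual beginning offset instead of guessing.
val tp = new TopicPartition("mytopic2", 0)
val probe = new KafkaConsumer[String, String](kafkaParams.asJava)
probe.assign(Collections.singletonList(tp))
probe.seekToBeginning(Collections.singletonList(tp))
val earliest = probe.position(tp)
probe.close()

val range = OffsetRange("mytopic2", 0, earliest, earliest + 100)   // 100 records as an example
val rdd = KafkaUtils.createRDD[String, String](
  sc, kafkaParams.asJava, Array(range), LocationStrategies.PreferConsistent)
println(rdd.map(_.value).take(5).mkString("\n"))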

-Original Message-
From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: October 13, 2016 9:31
To: Haopu Wang
Cc: user@spark.apache.org
Subject: Re: Kafka integration: get existing Kafka messages?

Look at the presentation and blog post linked from

https://github.com/koeninger/kafka-exactly-once

They refer to the kafka 0.8 version of the direct stream but the basic
idea is the same

On Wed, Oct 12, 2016 at 7:35 PM, Haopu Wang  wrote:
> Cody, thanks for the response.
>
>
>
> So Kafka direct stream actually has consumer on both the driver and
> executor? Can you please provide more details? Thank you very much!
>
>
>
> 
>
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: October 12, 2016 20:10
> To: Haopu Wang
> Cc: user@spark.apache.org
> Subject: Re: Kafka integration: get existing Kafka messages?
>
>
>
> its set to none for the executors, because otherwise they wont do exactly
> what the driver told them to do.
>
>
>
> you should be able to set up the driver consumer to determine batches
> however you want, though.
>
> On Wednesday, October 12, 2016, Haopu Wang  wrote:
>
> Hi,
>
>
>
> I want to read the existing Kafka messages and then subscribe new stream
> messages.
>
> But I find "auto.offset.reset" property is always set to "none" in
> KafkaUtils. Does that mean I cannot specify "earliest" property value when
> create direct stream?
>
> Thank you!
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: Kafka integration: get existing Kafka messages?

2016-10-12 Thread Haopu Wang
Cody, thanks for the response.

 

So the Kafka direct stream actually has consumers on both the driver and the executors?
Can you please provide more details? Thank you very much!

 



From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: October 12, 2016 20:10
To: Haopu Wang
Cc: user@spark.apache.org
Subject: Re: Kafka integration: get existing Kafka messages?

 

It's set to none for the executors, because otherwise they won't do exactly what
the driver told them to do.

 

You should be able to set up the driver consumer to determine batches however
you want, though.

On Wednesday, October 12, 2016, Haopu Wang  wrote:

Hi,

 

I want to read the existing Kafka messages and then subscribe to new streaming
messages.

But I find the "auto.offset.reset" property is always set to "none" in KafkaUtils.
Does that mean I cannot specify the "earliest" value when creating a direct
stream?

Thank you!

 



Kafka integration: get existing Kafka messages?

2016-10-12 Thread Haopu Wang
Hi,

 

I want to read the existing Kafka messages and then subscribe to new streaming
messages.

But I find the "auto.offset.reset" property is always set to "none" in
KafkaUtils. Does that mean I cannot specify the "earliest" value when creating
a direct stream?

Thank you!

 



Can Spark Streaming 2.0 work with Kafka 0.10?

2016-09-26 Thread Haopu Wang
Hi, in the official integration guide, it says "Spark Streaming 2.0.0 is
compatible with Kafka 0.8.2.1."

 

However, in the Maven repository I can find "spark-streaming-kafka-0-10_2.11",
which depends on Kafka 0.10.0.0.

Is this artifact stable enough? Thank you!
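
For reference, the dependency I'm pulling in (sbt syntax; the versions are simply the ones under discussion):

// build.sbt -- a minimal sketch; adjust the Scala/Spark versions to the actual build
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"
)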

 

 



[Spark 1.6.1] Beeline cannot start on Windows7

2016-06-27 Thread Haopu Wang
I see the stack trace below when trying to run the beeline command. I'm using
JDK 7.

Anything wrong? Much thanks!

 

==

D:\spark\download\spark-1.6.1-bin-hadoop2.4>bin\beeline

Beeline version 1.6.1 by Apache Hive

Exception in thread "main" java.lang.NoSuchMethodError: org.fusesource.jansi.internal.Kernel32.GetConsoleOutputCP()I
    at jline.WindowsTerminal.getConsoleOutputCodepage(WindowsTerminal.java:293)
    at jline.WindowsTerminal.getOutputEncoding(WindowsTerminal.java:186)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:230)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:221)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:209)
    at org.apache.hive.beeline.BeeLine.getConsoleReader(BeeLine.java:834)
    at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:770)
    at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:484)
    at org.apache.hive.beeline.BeeLine.main(BeeLine.java:467)



RE: Can I control the execution of Spark jobs?

2016-06-16 Thread Haopu Wang
Jacek,

For example, one ETL job saves raw events and updates a file.
The other job uses that file's content to process the data set.

In this case, the first job has to finish before the second one starts. That's what I
mean by dependency; a concrete sketch is below. Any suggestions/comments are appreciated.
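
A minimal sketch of what I mean (the paths and transformations are placeholders). Within a single application a job is submitted when an action runs on the driver, and the action blocks until the job finishes, so ordering the actions orders the jobs:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("etl-ordering"))

// Job 1: save the raw events and produce the file the second job depends on.
val rawEvents = sc.textFile("hdfs:///data/incoming")            // placeholder input
rawEvents.saveAsTextFile("hdfs:///data/etl/raw-out")            // action: job 1 runs and completes here

// Job 2: only submitted after job 1 has returned, because the driver does not
// reach this line until saveAsTextFile has finished.
val reference = sc.textFile("hdfs:///data/etl/raw-out")
reference.map(_.toUpperCase).saveAsTextFile("hdfs:///data/etl/processed")   // action: job 2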

-Original Message-
From: Jacek Laskowski [mailto:ja...@japila.pl] 
Sent: June 16, 2016 19:09
To: user
Subject: Re: Can I control the execution of Spark jobs?

Hi,

When you say "several ETL types of things", what is this exactly? What
would an example of "dependency between these jobs" be?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jun 16, 2016 at 11:36 AM, Haopu Wang  wrote:
> Hi,
>
>
>
> Suppose I have a spark application which is doing several ETL types of
> things.
>
> I understand Spark can analyze and generate several jobs to execute.
>
> The question is: is it possible to control the dependency between these
> jobs?
>
>
>
> Thanks!
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can I control the execution of Spark jobs?

2016-06-16 Thread Haopu Wang
Hi,

 

Suppose I have a Spark application which is doing several ETL-type
things.

I understand Spark can analyze it and generate several jobs to execute.

The question is: is it possible to control the dependencies between these
jobs?

 

Thanks!

 



RE: Should I avoid "state" in a Spark application?

2016-06-12 Thread Haopu Wang
Can someone look at my questions? Thanks again!

 



From: Haopu Wang 
Sent: June 12, 2016 16:40
To: user@spark.apache.org
Subject: Should I avoid "state" in a Spark application?

 

I have a Spark application whose structure is below:

 

var ts: Long = 0L

dstream1.foreachRDD { (x, time) =>
  ts = time
  x.do_something()...
}
..
process_data(dstream2, ts, ..)

 

I assume the foreachRDD call can update the "ts" variable, which is then used
in the Spark tasks of the "process_data" function.

 

From my test result on a standalone Spark cluster, it is working. But should I
be concerned if I switch to YARN?

 

And I saw some articles recommending avoiding mutable state in Scala programming.
Without the state variable, how could that be done?

 

Any comments or suggestions are appreciated.

 

Thanks,

Haopu



Should I avoid "state" in a Spark application?

2016-06-12 Thread Haopu Wang
I have a Spark application whose structure is below:

 

var ts: Long = 0L

dstream1.foreachRDD { (x, time) =>
  ts = time
  x.do_something()...
}
..
process_data(dstream2, ts, ..)

 

I assume the foreachRDD call can update the "ts" variable, which is then
used in the Spark tasks of the "process_data" function.

 

From my test result on a standalone Spark cluster, it is working. But
should I be concerned if I switch to YARN?

 

And I saw some articles recommending avoiding mutable state in Scala
programming. Without the state variable, how could that be done?
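
For example (a minimal sketch, based on my own reading of the API rather than any recommendation): instead of capturing the batch time in a driver-side var, the time can travel with the records, since transform receives the batch Time directly; the bodies below stand in for do_something / process_data.

// dstream1: DStream[String] from the real application
val withTime = dstream1.transform { (rdd, time) =>
  rdd.map(record => (time.milliseconds, record))     // pair each record with its batch time
}

withTime.foreachRDD { rdd =>
  rdd.foreach { case (ts, record) =>
    println(s"$ts -> $record")                       // stand-in for the real per-record processing
  }
}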

 

Any comments or suggestions are appreciated.

 

Thanks,

Haopu



How to get the list of running applications and Cores/Memory in use?

2015-12-06 Thread Haopu Wang
Hi,

 

I have a Spark 1.5.2 standalone cluster running.

 

I want to get all of the running applications and Cores/Memory in use.

 

Besides the Master UI, is there any other way to do that?

 

I tried to send an HTTP request to a URL like this:
"http://node1:6066/v1/applications"

 

The response is like below:

 

==

 

{
  "action" : "ErrorResponse",
  "highestProtocolVersion" : "v1",
  "message" : "Unknown protocol version 'v1'. Please submit requests through http://[host]:[port]/v1/submissions/...",
  "serverSparkVersion" : "1.5.2"
}
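
One more thing I may try (an assumption on my side, not verified: the standalone Master web UI also serves its status as JSON under /json on the UI port, e.g. 8080, rather than on the REST submission port 6066):

import scala.io.Source

val masterStatusUrl = "http://node1:8080/json"       // hypothetical endpoint on the Master UI port
val body = Source.fromURL(masterStatusUrl).mkString
// If the endpoint exists, the JSON should list the active applications together with
// the cores and memory they use; it can then be parsed with any JSON library.
println(body)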

 

 



RE: RE: Spark or Storm

2015-06-19 Thread Haopu Wang
My question is not directly related: about the "exactly-once" semantics, the
documentation (copied below) says Spark Streaming gives exactly-once
semantics, but from my test results, with checkpointing enabled, the
application always re-processes the files of the last batch after a
graceful restart.

 

==


Semantics of Received Data


Different input sources provide different guarantees, ranging from
at-least once to exactly once. Read for more details.


With Files


If all of the input data is already present in a fault-tolerant files
system like HDFS, Spark Streaming can always recover from any failure
and process all the data. This gives exactly-once semantics, that all
the data will be processed exactly once no matter what fails.

 

 



From: Enno Shioji [mailto:eshi...@gmail.com] 
Sent: Friday, June 19, 2015 5:29 PM
To: Tathagata Das
Cc: prajod.vettiyat...@wipro.com; Cody Koeninger; bit1...@163.com;
Jordan Pilat; Will Briggs; Ashish Soni; ayan guha;
user@spark.apache.org; Sateesh Kavuri; Spark Enthusiast; Sabarish
Sasidharan
Subject: Re: RE: Spark or Storm

 

Fair enough, on second thought, just saying that it should be idempotent
is indeed more confusing.

 

I guess the crux of the confusion comes from the fact that people tend
to assume the work you described (store batch id and skip etc.) is
handled by the framework, perhaps partly because Storm Trident does
handle it (you just need to let Storm know if the output operation has
succeeded or not, and it handles the batch id storing & skipping
business). Whenever I explain people that one needs to do this
additional work you described to get end-to-end exactly-once semantics,
it usually takes a while to convince them. In my limited experience,
they tend to interpret "transactional" in that sentence to mean that you
just have to write to a transactional storage like ACID RDB. Pointing
them to "Semantics of output operations" is usually sufficient though.

 

Maybe others like @Ashish can weigh on this; did you interpret it in
this way?

 

What if we change the statement into:

"end-to-end exactly-once semantics (if your updates to downstream
systems are idempotent or transactional). To learn how to make your
updates idempotent or transactional, see the "Semantics of output
operations" section in this chapter
 "

 

That way, it's clear that it's not sufficient to merely write to a
"transactional storage" like ACID store.

 

 

 

 

 

 

 

On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das 
wrote:

If the current documentation is confusing, we can definitely improve the
documentation. However, I do not understand why the term
"transactional" is confusing. If your output operation has to add 5, then
the user has to implement the following mechanism:

 

1. If the unique id of the batch of data is already present in the
store, then skip the update

2. Otherwise atomically do both, the update operation as well as store
the unique id of the batch. This is pretty much the definition of a
transaction. The user has to be aware of the transactional semantics of
the data store while implementing this functionality. 
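
A minimal sketch of that two-step mechanism in a Spark Streaming output operation (the store API, alreadyApplied / applyAtomically, is hypothetical and stands in for whatever transactional data store is used):

import org.apache.spark.streaming.dstream.DStream

trait BatchStore {
  def alreadyApplied(batchId: Long): Boolean              // has this batch's update been recorded?
  def applyAtomically(batchId: Long, delta: Long): Unit   // do the update AND record the batch id in one transaction
}

val store: BatchStore = ???                 // stands in for the real transactional data store
val stream: DStream[String] = ???           // the application's DStream

stream.foreachRDD { (rdd, time) =>
  val delta = rdd.count()                   // stand-in for the "add 5" style update
  val batchId = time.milliseconds           // unique id of the batch
  if (!store.alreadyApplied(batchId))       // 1. skip if this batch id is already present in the store
    store.applyAtomically(batchId, delta)   // 2. otherwise do the update and store the id atomically
}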

 

You CAN argue that this effectively makes the whole update sort of
idempotent, as even if you try doing it multiple times, it will update
only once. But that is not what is generally considered idempotent.
Writing a fixed count, not an increment, is usually what is called
idempotent. And so just mentioning that the output operation must be
idempotent is, in my opinion, more confusing.

 

To take a page out of the Storm / Trident guide, even they call this
exact conditional updating of Trident State as "transactional"
operation. See "transactional spout" in the Trident State guide -
https://storm.apache.org/documentation/Trident-state

 

In the end, I am totally open the suggestions and PRs on how to make the
programming guide easier to understand. :)

 

TD

 

On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji  wrote:

Tbh I find the doc around this a bit confusing. If it says "end-to-end
exactly-once semantics (if your updates to downstream systems are
idempotent or transactional)", I think most people will interpret it
that as long as you use a storage which has atomicity (like
MySQL/Postgres etc.), a successful output operation for a given batch
(let's say "+ 5") is going to be issued exactly-once against the
storage.

 

However, as I understand it that's not what this statement means. What
it is saying is, it will always issue "+5" and never, say "+6", because
it makes sure a message is processed exactly-once internally. However,
it *may* issue "+5" more than once for a given batch, and it is up to
the developer to deal with this by either making the output operation
idempotent (e.g. "set 5"), or "transactional" (e.g. keep track of batch
IDs and skip already applied batches etc.).

 

I wonder if it makes m

RE: If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-18 Thread Haopu Wang
Akhil,

From my test, I can see the files in the last batch will always be
reprocessed upon restarting from the checkpoint, even after a graceful shutdown.

I think usually a file is expected to be processed only once. Maybe
this is a bug in fileStream? Or do you know any approach to work around
it?
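
One workaround I'm considering (just a sketch under my own assumptions: wordCounts and the output location are placeholders, and the default file system is assumed to be HDFS) is to write each batch to a directory derived from the batch time and clear it before writing, so that re-processing a batch overwrites instead of duplicating:

import org.apache.hadoop.fs.{FileSystem, Path}

// wordCounts: DStream[(String, Long)] produced by the real application
wordCounts.foreachRDD { (rdd, time) =>
  val out = s"/output/wordcount-${time.milliseconds}"            // one directory per batch
  val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
  fs.delete(new Path(out), true)     // drop any partial output from a previous attempt of this batch
  rdd.saveAsTextFile(out)            // re-running the batch now rewrites the same directory
}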

 

Much thanks!

 



From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Tuesday, June 16, 2015 3:26 PM
To: Haopu Wang
Cc: user
Subject: Re: If not stop StreamingContext gracefully, will checkpoint
data be consistent?

 

Good question, with fileStream or textFileStream basically it will only
take in the files whose timestamp is > the current timestamp
<https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172>
and when checkpointing is enabled
<https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L324>
it would restore the latest filenames from the checkpoint directory, which I believe
will kind of reprocess some files.




Thanks

Best Regards

 

On Mon, Jun 15, 2015 at 2:49 PM, Haopu Wang  wrote:

Akhil, thank you for the response. I want to explore more.

 

Suppose the application is just monitoring an HDFS folder and writing the word
count of each streaming batch back to HDFS.

When I kill the application _before_ Spark takes a checkpoint, then after
recovery Spark will resume processing from the timestamp of the latest
checkpoint. That means some files will be processed twice and duplicate
results are generated.

Please correct me if my understanding is wrong, thanks again!

 



From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, June 15, 2015 3:48 PM
To: Haopu Wang
Cc: user
Subject: Re: If not stop StreamingContext gracefully, will checkpoint
data be consistent?

 

I think it should be fine, that's the whole point of check-pointing (in
case of driver failure etc).




Thanks

Best Regards

 

On Mon, Jun 15, 2015 at 6:54 AM, Haopu Wang  wrote:

Hi, can someone help to confirm the behavior? Thank you!


-Original Message-
From: Haopu Wang
Sent: Friday, June 12, 2015 4:57 PM
To: user
Subject: If not stop StreamingContext gracefully, will checkpoint data
be consistent?

This is a quick question about Checkpoint. The question is: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or should I always shut down the application gracefully in order to
use the checkpoint?

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 

 



RE: [SparkStreaming] NPE in DStreamCheckPointData.scala:125

2015-06-17 Thread Haopu Wang
Can someone help? Thank you!



From: Haopu Wang 
Sent: Monday, June 15, 2015 3:36 PM
To: user; d...@spark.apache.org
Subject: [SparkStreaming] NPE in DStreamCheckPointData.scala:125



I use the attached program to test checkpointing. It's quite simple.

When I run the program a second time, it loads the checkpoint data as
expected; however, I see an NPE in the driver log.

Do you have any idea about the issue? I'm on Spark 1.4.0, thank you very
much!

 

== logs ==

 

15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restoring checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435313 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435314 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435315 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435316 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435317 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435318 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435319 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO ForEachDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO DStreamGraph: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] ERROR StreamingContext: Error starting the context, marking it as stopped

java.io.IOException: java.lang.NullPointerException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:498)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:493)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1

RE: If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-15 Thread Haopu Wang
Akhil, thank you for the response. I want to explore more.

 

Suppose the application is just monitoring an HDFS folder and writing the word
count of each streaming batch back to HDFS.

When I kill the application _before_ Spark takes a checkpoint, then after
recovery Spark will resume processing from the timestamp of the latest
checkpoint. That means some files will be processed twice and duplicate
results are generated.

Please correct me if my understanding is wrong, thanks again!

 



From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, June 15, 2015 3:48 PM
To: Haopu Wang
Cc: user
Subject: Re: If not stop StreamingContext gracefully, will checkpoint
data be consistent?

 

I think it should be fine, that's the whole point of check-pointing (in
case of driver failure etc).




Thanks

Best Regards

 

On Mon, Jun 15, 2015 at 6:54 AM, Haopu Wang  wrote:

Hi, can someone help to confirm the behavior? Thank you!


-Original Message-
From: Haopu Wang
Sent: Friday, June 12, 2015 4:57 PM
To: user
Subject: If not stop StreamingContext gracefully, will checkpoint data
be consistent?

This is a quick question about Checkpoint. The question is: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or should I always shut down the application gracefully in order to
use the checkpoint?

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



[SparkStreaming] NPE in DStreamCheckPointData.scala:125

2015-06-15 Thread Haopu Wang
I use the attached program to test checkpointing. It's quite simple.

When I run the program a second time, it loads the checkpoint data as
expected; however, I see an NPE in the driver log.

Do you have any idea about the issue? I'm on Spark 1.4.0, thank you very
much!

 

== logs ==

 

15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restoring checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435313 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435314 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435315 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435316 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435317 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435318 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream$FileInputDStreamCheckpointData: Restoring files for time 143435319 ms - []
15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO ForEachDStream: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] INFO DStreamGraph: Restored checkpoint data
15/06/15 15:27:17 [THREAD ID=main] ERROR StreamingContext: Error starting the context, marking it as stopped

java.io.IOException: java.lang.NullPointerException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:498)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:493)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.Delega

RE: If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-14 Thread Haopu Wang
Hi, can someone help to confirm the behavior? Thank you!

-Original Message-
From: Haopu Wang 
Sent: Friday, June 12, 2015 4:57 PM
To: user
Subject: If not stop StreamingContext gracefully, will checkpoint data
be consistent?

This is a quick question about Checkpoint. The question is: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or should I always shut down the application gracefully in order to
use the checkpoint?

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-12 Thread Haopu Wang
This is a quick question about Checkpoint. The question is: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or should I always shut down the application gracefully in order to
use the checkpoint?

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

2015-06-09 Thread Haopu Wang
Jerry, I agree with you.

 

However, in my case, I kept monitoring the "blockmanager" folder. I do
see the number of files decrease sometimes, but the folder's size keeps
increasing.

Below is a screenshot of the folder. You can see some old files are
somehow not deleted.

 

 

 

-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting
"spark.cleaner.ttl"

 

From the stack I think this problem may be due to the deletion of the
broadcast variable. Since you set spark.cleaner.ttl, after this timeout
limit the old broadcast variable is deleted, and you will hit this
exception when you try to use it again after that time limit.

 

Basically I think you don't need to use this configuration: Spark
Streaming will automatically delete the old, unused data, and Spark
itself will delete this metadata using weak references. Also, this
configuration will be deprecated in the coming release.

 

Thanks

Jerry

 

-Original Message-

From: Haopu Wang [mailto:hw...@qilinsoft.com] 

Sent: Tuesday, June 9, 2015 3:30 PM

To: user

Subject: [SparkStreaming 1.3.0] Broadcast failure after setting
"spark.cleaner.ttl"

 

When I ran a Spark Streaming application for longer, I noticed the local
directory's size kept increasing.

I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata.

The Spark Streaming batch duration is 10 seconds and the checkpoint duration
is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!

 



 

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apa

[SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

2015-06-09 Thread Haopu Wang
When I ran a Spark Streaming application for longer, I noticed the local
directory's size kept increasing.

I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata.

The Spark Streaming batch duration is 10 seconds and the checkpoint duration
is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!



15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
    ... 25 more

15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-09 Thread Haopu Wang
Cheng,

yes, it works. I set the property in SparkConf before initializing the
SparkContext. The property name is "spark.hadoop.dfs.replication".
Thanks for the help!
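
For the record, a minimal sketch of what worked for me (the replication factor 2 is just an example value):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("parquet-replication")
  // spark.hadoop.* keys are copied into the Hadoop Configuration used when writing files
  .set("spark.hadoop.dfs.replication", "2")

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
// DataFrames persisted through this context now produce HDFS files with replication 2
// instead of the cluster default.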

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com] 
Sent: Monday, June 08, 2015 6:41 PM
To: Haopu Wang; user
Subject: Re: SparkSQL: How to specify replication factor on the
persisted parquet files?

Then one possible workaround is to set "dfs.replication" in 
"sc.hadoopConfiguration".

However, this configuration is shared by all Spark jobs issued within 
the same application. Since different Spark jobs can be issued from 
different threads, you need to pay attention to synchronization.

Cheng

On 6/8/15 2:46 PM, Haopu Wang wrote:
> Cheng, thanks for the response.
>
> Yes, I was using HiveContext.setConf() to set "dfs.replication".
> However, I cannot change the value in Hadoop core-site.xml because
that
> will change every HDFS file.
> I only want to change the replication factor of some specific files.
>
> -Original Message-
> From: Cheng Lian [mailto:lian.cs@gmail.com]
> Sent: Sunday, June 07, 2015 10:17 PM
> To: Haopu Wang; user
> Subject: Re: SparkSQL: How to specify replication factor on the
> persisted parquet files?
>
> Were you using HiveContext.setConf()?
>
> "dfs.replication" is a Hadoop configuration, but setConf() is only
used
> to set Spark SQL specific configurations. You may either set it in
your
> Hadoop core-site.xml.
>
> Cheng
>
>
> On 6/2/15 2:28 PM, Haopu Wang wrote:
>> Hi,
>>
>> I'm trying to save SparkSQL DataFrame to a persistent Hive table
using
>> the default parquet data source.
>>
>> I don't know how to change the replication factor of the generated
>> parquet files on HDFS.
>>
>> I tried to set "dfs.replication" on HiveContext but that didn't work.
>> Any suggestions are appreciated very much!
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-07 Thread Haopu Wang
Cheng, thanks for the response.

Yes, I was using HiveContext.setConf() to set "dfs.replication".
However, I cannot change the value in Hadoop core-site.xml because that
will change every HDFS file.
I only want to change the replication factor of some specific files.

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com] 
Sent: Sunday, June 07, 2015 10:17 PM
To: Haopu Wang; user
Subject: Re: SparkSQL: How to specify replication factor on the
persisted parquet files?

Were you using HiveContext.setConf()?

"dfs.replication" is a Hadoop configuration, but setConf() is only used 
to set Spark SQL specific configurations. You may either set it in your 
Hadoop core-site.xml.

Cheng


On 6/2/15 2:28 PM, Haopu Wang wrote:
> Hi,
>
> I'm trying to save SparkSQL DataFrame to a persistent Hive table using
> the default parquet data source.
>
> I don't know how to change the replication factor of the generated
> parquet files on HDFS.
>
> I tried to set "dfs.replication" on HiveContext but that didn't work.
> Any suggestions are appreciated very much!
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL: How to specify replication factor on the persisted parquet files?

2015-06-01 Thread Haopu Wang
Hi,

I'm trying to save SparkSQL DataFrame to a persistent Hive table using
the default parquet data source.

I don't know how to change the replication factor of the generated
parquet files on HDFS.

I tried to set "dfs.replication" on HiveContext but that didn't work.
Any suggestions are appreciated very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.3.0: how to let Spark history load old records?

2015-06-01 Thread Haopu Wang
When I start the Spark master process, the old records are not shown in
the monitoring UI.

How can I show the old records? Thank you very much!
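
One thing I plan to try (my assumption, not verified: completed applications are served by the history server from event logs rather than by a freshly started Master) is to enable event logging in the application and point the history server at the same directory:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("history-enabled-app")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-event-logs")   // placeholder location
val sc = new SparkContext(conf)

// The history server (sbin/start-history-server.sh) would then be configured with
// spark.history.fs.logDirectory=hdfs:///spark-event-logs so it can show the old records.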


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Haopu Wang
I want to use a file stream as input. Looking at the Spark Streaming
documentation again, it says a file stream doesn't need a receiver at all.

So I'm wondering how I can control a specific DStream instance.
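
One idea I may try (a sketch based on my assumption that the transform function is evaluated on the driver for every batch): keep the file stream running but drop its batches until the special event has happened.

// ssc: an existing StreamingContext; the input path is a placeholder.
var referenceDataReady = false        // driver-side flag flipped once the reference data is initialized

val lines = ssc.textFileStream("hdfs:///input/folder")
val gated = lines.transform { rdd =>
  if (referenceDataReady) rdd else rdd.sparkContext.emptyRDD[String]
}
gated.foreachRDD { rdd =>
  println(s"batch size = ${rdd.count()}")   // placeholder for the real processing
}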

 



From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:39 AM
To: 'Akhil Das'; Haopu Wang
Cc: 'user'
Subject: RE: [SparkStreaming] Is it possible to delay the start of some
DStream in the application?

 

You can make ANY standard receiver sleep by implementing a custom
Message Deserializer class with sleep method inside it. 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:29 PM
To: Haopu Wang
Cc: user
Subject: Re: [SparkStreaming] Is it possible to delay the start of some
DStream in the application?

 

Why not just trigger your batch job with that event?

 

If you really need streaming, then you can create a custom receiver and
make the receiver sleep till the event has happened. That will obviously
run your streaming pipelines without having any data to process.




Thanks

Best Regards

 

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang  wrote:

In my application, I want to start a DStream computation only after a
special event has happened (for example, I want to start the receiver
only after the reference data has been properly initialized).

My question is: it looks like the DStream will be started right after
the StreamingContext has been started. Is it possible to delay the start
of a specific DStream?

Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



RE: Is it feasible to keep millions of keys in state of Spark Streaming job for two months?

2015-05-14 Thread Haopu Wang
Hi TD, regarding to the performance of updateStateByKey, do you have a
JIRA for that so we can watch it? Thank you!

 



From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Wednesday, April 15, 2015 8:09 AM
To: Krzysztof Zarzycki
Cc: user
Subject: Re: Is it feasible to keep millions of keys in state of Spark
Streaming job for two months?

 

Fundamentally, stream processing systems are designed for processing
streams of data, not for storing large volumes of data for a long period
of time. So if you have to maintain that much state for months, then it's
best to use another system that is designed for long-term storage (like
Cassandra) which has proper support for making all that state
fault-tolerant, high-performance, etc. So yes, the best option is to use
Cassandra for the state, with the Spark Streaming jobs accessing the state
from Cassandra. There are a number of optimizations that can be done.
It's not too hard to build a simple on-demand populated cache (a singleton
hash map, for example) that speeds up access from Cassandra, with all
updates written through the cache. This is a common use of Spark
Streaming + Cassandra/HBase. 
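
A minimal sketch of such a read-through/write-through singleton cache, with fetchFromStore and writeToStore standing in for the real Cassandra/HBase calls:

  import java.util.concurrent.ConcurrentHashMap

  object StateCache {
    // One instance per executor JVM; populated on demand.
    private val cache = new ConcurrentHashMap[String, List[(String, String)]]()

    def get(user: String,
            fetchFromStore: String => List[(String, String)]): List[(String, String)] = {
      val cached = cache.get(user)
      if (cached != null) cached
      else {
        val fetched = fetchFromStore(user)   // read-through on a cache miss
        cache.putIfAbsent(user, fetched)
        fetched
      }
    }

    def update(user: String, events: List[(String, String)],
               writeToStore: (String, List[(String, String)]) => Unit): Unit = {
      cache.put(user, events)     // keep the cache in sync...
      writeToStore(user, events)  // ...and write through to the external store
    }
  }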

 

Regarding the performance of updateStateByKey, we are aware of the
limitations, and we will improve it soon :)

 

TD

 

 

On Tue, Apr 14, 2015 at 12:34 PM, Krzysztof Zarzycki
 wrote:

Hey guys, could you please help me with a question I asked on
Stackoverflow:
https://stackoverflow.com/questions/29635681/is-it-feasible-to-keep-mill
ions-of-keys-in-state-of-spark-streaming-job-for-two ?  I'll be really
grateful for your help!

I'm also pasting the question below:

I'm trying to solve a (simplified here) problem in Spark Streaming:
Let's say I have a log of events made by users, where each event is a
tuple (user name, activity, time), e.g.:

("user1", "view", "2015-04-14T21:04Z") ("user1", "click",
"2015-04-14T21:05Z")

Now I would like to gather events by user to do some analysis of that.
Let's say that output is some analysis of:

("user1", List(("view", "2015-04-14T21:04Z"),("click",
"2015-04-14T21:05Z"))

The events should be kept for even 2 months. During that time there
might be around 500 million such events, and millions of unique users,
which are the keys here.

My questions are:

*  Is it feasible to do such a thing with
updateStateByKey on DStream, when I have millions of keys stored?

*  Am I right that DStream.window is no use here, when I
have 2 months length window and would like to have a slide of few
seconds?

P.S. I found out that updateStateByKey is called on all the keys on
every slide, which means it will be called millions of times every few
seconds. That makes me doubt this design, and I'm rather thinking
about alternative solutions like:

*  using Cassandra for state

*  using Trident state (with Cassandra probably)

*  using Samza with its state management.

 



RE: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-14 Thread Haopu Wang
Thank you, should I open a JIRA for this issue?

 



From: Olivier Girardot [mailto:ssab...@gmail.com] 
Sent: Tuesday, May 12, 2015 5:12 AM
To: Reynold Xin
Cc: Haopu Wang; user
Subject: Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

 

I'll look into it - not sure yet what I can get out of exprs :p 

 

On Mon, May 11, 2015 at 22:35, Reynold Xin wrote:

Thanks for catching this. I didn't read carefully enough.

 

It'd make sense to have the udaf result be non-nullable, if the exprs are 
indeed non-nullable.

 

On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot  wrote:

Hi Haopu, 
actually here `key` is nullable because this is your input's schema : 

scala> result.printSchema

root 
|-- key: string (nullable = true) 
|-- SUM(value): long (nullable = true) 

scala> df.printSchema 
root 
|-- key: string (nullable = true) 
|-- value: long (nullable = false)

 

I tried it with a schema where the key is not flagged as nullable, and the 
schema is actually respected. What you can argue however is that SUM(value) 
should also be not nullable since value is not nullable.
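
For reference, a small sketch of that experiment as described above (spark-shell style, assuming sc and sqlContext are in scope):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions.sum
  import org.apache.spark.sql.types._

  val rows = sc.parallelize(Seq(Row("k1", 2L), Row("k1", 1L)))
  val schema = StructType(Seq(
    StructField("key", StringType, nullable = false),
    StructField("value", LongType, nullable = false)))

  val df = sqlContext.createDataFrame(rows, schema)
  // The grouping column keeps nullable = false from the input schema here.
  df.groupBy("key").agg(sum("value")).printSchema()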

 

@rxin do you think it would be reasonable to flag the Sum aggregation function 
as nullable (or not) depending on the input expression's schema ?

 

Regards, 

 

Olivier.

On Mon, May 11, 2015 at 22:07, Reynold Xin wrote:

Not by design. Would you be interested in submitting a pull request?

 

On Mon, May 11, 2015 at 1:48 AM, Haopu Wang  wrote:

I'm trying to get the result schema of aggregate functions using the DataFrame
API.

However, I find the result fields of groupBy columns are always nullable
even when the source field is not nullable.

I want to know if this is by design, thank you! Below is the simple code
to show the issue.

==

  import sqlContext.implicits._
  import org.apache.spark.sql.functions._
  case class Test(key: String, value: Long)
  val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF

  val result = df.groupBy("key").agg($"key", sum("value"))

  // From the output, you can see the "key" column is nullable, why??
  result.printSchema
//root
// |-- key: string (nullable = true)
// |-- SUM(value): long (nullable = true)


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 

 



[SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-14 Thread Haopu Wang
In my application, I want to start a DStream computation only after an
special event has happened (for example, I want to start the receiver
only after the reference data has been properly initialized).

My question is: it looks like the DStream will be started right after
the StreaminContext has been started. Is it possible to delay the start
of specific DStream?

Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-11 Thread Haopu Wang
I'm trying to get the result schema of aggregate functions using the DataFrame
API.

However, I find the result fields of groupBy columns are always nullable
even when the source field is not nullable.

I want to know if this is by design, thank you! Below is the simple code
to show the issue.

==

  import sqlContext.implicits._
  import org.apache.spark.sql.functions._
  case class Test(key: String, value: Long)
  val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF
  
  val result = df.groupBy("key").agg($"key", sum("value"))
  
  // From the output, you can see the "key" column is nullable, why??
  result.printSchema
//root
// |-- key: string (nullable = true)
// |-- SUM(value): long (nullable = true)


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [SparkSQL] cannot filter by a DateType column

2015-05-10 Thread Haopu Wang
Sorry, I was using Spark 1.3.x.

 

I cannot reproduce it in master.

 

But should I still open a JIRA so that I can request a backport to the
1.3.x branch? Thanks again!

 



From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Saturday, May 09, 2015 2:41 AM
To: Haopu Wang
Cc: user; d...@spark.apache.org
Subject: Re: [SparkSQL] cannot filter by a DateType column

 

What version of Spark are you using?  It appears that at least in master
we are doing the conversion correctly, but its possible older versions
of applySchema do not.  If you can reproduce the same bug in master, can
you open a JIRA?

 

On Fri, May 8, 2015 at 1:36 AM, Haopu Wang  wrote:

I want to filter a DataFrame based on a Date column. 

 

If the DataFrame object is constructed from a scala case class, it's
working (either compare as String or Date). But if the DataFrame is
generated by specifying a Schema to an RDD, it doesn't work. Below is
the exception and test code.

 

Do you have any idea about the error? Thank you very much!

 

exception=

java.lang.ClassCastException: java.sql.Date cannot be cast to
java.lang.Integer

at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)

at
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$
anonfun$apply$6.apply(Cast.scala:116)

at
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$cata
lyst$expressions$Cast$$buildCast(Cast.scala:111)

at
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.a
pply(Cast.scala:116)

at
org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)

at
org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predic
ates.scala:305)

at
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$
apply$1.apply(predicates.scala:30)

at
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$
apply$1.apply(predicates.scala:30)

at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 

code=

 

val conf = new
SparkConf().setAppName("DFTest").setMaster("local[*]")

val sc = new SparkContext(conf)

val sqlCtx = new HiveContext(sc)

import sqlCtx.implicits._



case class Test(dt: java.sql.Date)

 

val df = sc.makeRDD(Seq(Test(new java.sql.Date(115,4,7)))).toDF



var r = df.filter("dt >= '2015-05-06'")

r.explain(true)

r.show

println("==")

var r2 = df.filter("dt >= cast('2015-05-06' as DATE)")

r2.explain(true)

r2.show

println("==")

 

// "df2" doesn't do filter correct!!

val rdd2 = sc.makeRDD(Seq((Row(new java.sql.Date(115,4,7)))))



val schema = StructType(Array(StructField("dt", DateType, false)))



val df2 = sqlCtx.applySchema(rdd2, schema) 



r = df2.filter("dt >= '2015-05-06'")

r.explain(true)

r.show

println("==")



r2 = df2.filter("dt >= cast('2015-05-06' as DATE)")

r2.explain(true)

r2.show

 

 



[SparkSQL] cannot filter by a DateType column

2015-05-08 Thread Haopu Wang
I want to filter a DataFrame based on a Date column. 

 

If the DataFrame object is constructed from a scala case class, it's
working (either compare as String or Date). But if the DataFrame is
generated by specifying a Schema to an RDD, it doesn't work. Below is
the exception and test code.

 

Do you have any idea about the error? Thank you very much!

 

exception=

java.lang.ClassCastException: java.sql.Date cannot be cast to
java.lang.Integer

at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)

at
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2$$
anonfun$apply$6.apply(Cast.scala:116)

at
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$cata
lyst$expressions$Cast$$buildCast(Cast.scala:111)

at
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$2.a
pply(Cast.scala:116)

at
org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:426)

at
org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.eval(predic
ates.scala:305)

at
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$
apply$1.apply(predicates.scala:30)

at
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$
apply$1.apply(predicates.scala:30)

at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

 

code=

 

val conf = new
SparkConf().setAppName("DFTest").setMaster("local[*]")

val sc = new SparkContext(conf)

val sqlCtx = new HiveContext(sc)

import sqlCtx.implicits._



case class Test(dt: java.sql.Date)

 

val df = sc.makeRDD(Seq(Test(new java.sql.Date(115,4,7)))).toDF



var r = df.filter("dt >= '2015-05-06'")

r.explain(true)

r.show

println("==")

var r2 = df.filter("dt >= cast('2015-05-06' as DATE)")

r2.explain(true)

r2.show

println("==")

 

// "df2" doesn't do filter correct!!

val rdd2 = sc.makeRDD(Seq((Row(new java.sql.Date(115,4,7)))))



val schema = StructType(Array(StructField("dt", DateType, false)))



val df2 = sqlCtx.applySchema(rdd2, schema) 



r = df2.filter("dt >= '2015-05-06'")

r.explain(true)

r.show

println("==")



r2 = df2.filter("dt >= cast('2015-05-06' as DATE)")

r2.explain(true)

r2.show

 



RE: Spark does not delete temporary directories

2015-05-07 Thread Haopu Wang
I think the temporary folders are used to store blocks and shuffles.
That doesn't depend on the cluster manager.

Ideally they should be removed after the application has been
terminated.

Can you check if there are contents under those folders?

 



From: Taeyun Kim [mailto:taeyun@innowireless.com] 
Sent: Friday, May 08, 2015 9:42 AM
To: 'Ted Yu'; 'Todd Nist'; user@spark.apache.org
Subject: RE: Spark does not delete temporary directories

 

Thanks, but it seems that the option is for Spark standalone mode only.

I've (lightly) tested the options with local mode and yarn-client mode,
and the 'temp' directories were not deleted.

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Thursday, May 07, 2015 10:47 PM
To: Todd Nist
Cc: Taeyun Kim; user@spark.apache.org
Subject: Re: Spark does not delete temporary directories

 

Default value for spark.worker.cleanup.enabled is false:


private val CLEANUP_ENABLED =
conf.getBoolean("spark.worker.cleanup.enabled", false)

 

I wonder if the default should be set as true.

 

Cheers

 

On Thu, May 7, 2015 at 6:19 AM, Todd Nist  wrote:

Have you tried to set the following?

spark.worker.cleanup.enabled=true 
spark.worker.cleanup.appDataTtl="

 

 

On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim 
wrote:

Hi,

 

After a Spark program completes, there are 3 temporary directories
remaining in the temp directory.

The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7

 

And when the Spark program runs on Windows, a snappy DLL file also remains in
the temp directory.

The file name is like this:
snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava

 

They are created every time the Spark program runs. So the number of
files and directories keeps growing.

 

How can I have them deleted?

 

Spark version is 1.3.1 with Hadoop 2.6.

 

Thanks.

 

 

 

 



RE: Is SQLContext thread-safe?

2015-04-30 Thread Haopu Wang
Hi, in a test on Spark SQL 1.3.0, multiple threads are doing selects on the
same SQLContext instance, but the exception below is thrown, so it looks
like SQLContext is NOT thread-safe? I think this is not the desired
behavior.

==

java.lang.RuntimeException: [1.1] failure: ``insert'' expected but
identifier select found

select id ,ext.d from UNIT_TEST
^
 at scala.sys.package$.error(package.scala:27)
 at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSpark
SQLParser.scala:40)
 at
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
 at
org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:130)
 at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkS
QLParser$$others$1.apply(SparkSQLParser.scala:96)
 at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkS
QLParser$$others$1.apply(SparkSQLParser.scala:95)
 at
scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at
scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parser
s.scala:242)
 at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parser
s.scala:242)
 at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$
apply$2.apply(Parsers.scala:254)
 at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$
apply$2.apply(Parsers.scala:254)
 at
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Par
sers.scala:254)
 at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Par
sers.scala:254)
 at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Pa
rsers.scala:891)
 at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Pa
rsers.scala:891)
 at
scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParser
s.scala:110)
 at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSpark
SQLParser.scala:38)
 at
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.sca
la:134)
 at
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.sca
la:134)
 at scala.Option.getOrElse(Option.scala:120)
 at
org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:134)
 at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:915)
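
A hedged workaround sketch, assuming (from the parse failure above) that the shared SQL parser is the piece being tripped up concurrently: either serialize the sql() calls with a lock, or give each thread its own SQLContext over the shared SparkContext.

  import org.apache.spark.sql.{DataFrame, SQLContext}

  object SerializedSql {
    private val lock = new Object
    // Serializes only the call into sqlContext.sql (where the parse above failed).
    def sql(sqlContext: SQLContext, statement: String): DataFrame =
      lock.synchronized { sqlContext.sql(statement) }
  }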

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Monday, March 02, 2015 9:05 PM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Yes it is thread safe, at least it's supposed to be.

-Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do Select operations in
different threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [SparkSQL 1.3.0] Cannot resolve column name "SUM('p.q)" among (k, SUM('p.q));

2015-04-02 Thread Haopu Wang
Michael, thanks for the response; looking forward to trying 1.3.1.

 



From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Friday, April 03, 2015 6:52 AM
To: Haopu Wang
Cc: user
Subject: Re: [SparkSQL 1.3.0] Cannot resolve column name "SUM('p.q)"
among (k, SUM('p.q));

 

Thanks for reporting.  The root cause is (SPARK-5632
<https://issues.apache.org/jira/browse/SPARK-5632> ), which is actually
pretty hard to fix.  Fortunately, for this particular case there is an
easy workaround: https://github.com/apache/spark/pull/5337

 

We can try to include this in 1.3.1.
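
Separately, a possible user-level workaround sketch (untested against this exact build, reusing the names from the reproduction below): give the aggregate a name up front with .as(...) so the later rename via toDF("key", "total") is not needed.

  import org.apache.spark.sql.functions._

  val target = source.groupBy('k).agg($"k", sum("p.q").as("total"))
  target.printSchema()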

 

On Thu, Apr 2, 2015 at 3:29 AM, Haopu Wang  wrote:

Hi, I want to rename an aggregation field using the DataFrame API. The
aggregation is done on a nested field. But I got the exception below.

Do you see the same issue, and is there any workaround? Thank you very much!

 

==

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Cannot resolve column name "SUM('p.q)" among (k, SUM('p.q));

at
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:
162)

at
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:
162)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:161)

at org.apache.spark.sql.DataFrame.col(DataFrame.scala:436)

at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:426)

at
org.apache.spark.sql.DataFrame$$anonfun$3.apply(DataFrame.scala:244)

at
org.apache.spark.sql.DataFrame$$anonfun$3.apply(DataFrame.scala:243)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.s
cala:33)

at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:243)

==

 

And this code can be used to reproduce the issue:

 

  case class ChildClass(q: Long)

  case class ParentClass(k: String, p: ChildClass)

 

  def main(args: Array[String]): Unit = {



val conf = new
SparkConf().setAppName("DFTest").setMaster("local[*]")

val ctx = new SparkContext(conf)

val sqlCtx = new HiveContext(ctx)

 

import sqlCtx.implicits._

 

val source = ctx.makeRDD(Seq(ParentClass("c1",
ChildClass(100)))).toDF

 

import org.apache.spark.sql.functions._

 

val target = source.groupBy('k).agg('k, sum("p.q"))



// This line prints the correct contents

// k  SUM('p.q)

// c1 100

target.show



// But this line triggers the exception

target.toDF("key", "total")

 

==

 



[SparkSQL 1.3.0] Cannot resolve column name "SUM('p.q)" among (k, SUM('p.q));

2015-04-02 Thread Haopu Wang
Hi, I want to rename an aggregation field using the DataFrame API. The
aggregation is done on a nested field. But I got the exception below.

Do you see the same issue, and is there any workaround? Thank you very much!

 

==

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Cannot resolve column name "SUM('p.q)" among (k, SUM('p.q));

at
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:
162)

at
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:
162)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:161)

at org.apache.spark.sql.DataFrame.col(DataFrame.scala:436)

at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:426)

at
org.apache.spark.sql.DataFrame$$anonfun$3.apply(DataFrame.scala:244)

at
org.apache.spark.sql.DataFrame$$anonfun$3.apply(DataFrame.scala:243)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.s
cala:33)

at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:243)

==

 

And this code can be used to reproduce the issue:

 

  case class ChildClass(q: Long)

  case class ParentClass(k: String, p: ChildClass)

 

  def main(args: Array[String]): Unit = {



val conf = new
SparkConf().setAppName("DFTest").setMaster("local[*]")

val ctx = new SparkContext(conf)

val sqlCtx = new HiveContext(ctx)

 

import sqlCtx.implicits._

 

val source = ctx.makeRDD(Seq(ParentClass("c1",
ChildClass(100)))).toDF

 

import org.apache.spark.sql.functions._

 

val target = source.groupBy('k).agg('k, sum("p.q"))



// This line prints the correct contents

// k  SUM('p.q)

// c1 100

target.show



// But this line triggers the exception

target.toDF("key", "total")

 

==



RE: Can I call aggregate UDF in DataFrame?

2015-04-01 Thread Haopu Wang
Great! Thank you!

 



From: Reynold Xin [mailto:r...@databricks.com] 
Sent: Thursday, April 02, 2015 8:11 AM
To: Haopu Wang
Cc: user; d...@spark.apache.org
Subject: Re: Can I call aggregate UDF in DataFrame?

 

You totally can.

 

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/
apache/spark/sql/DataFrame.scala#L792

 

There is also an attempt at adding stddev here already:
https://github.com/apache/spark/pull/5228

 

 

 

On Thu, Mar 26, 2015 at 12:37 AM, Haopu Wang 
wrote:

Specifically there are only 5 aggregate functions in class
org.apache.spark.sql.GroupedData: sum/max/min/mean/count.

Can I plug in a function to calculate stddev?

Thank you!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Can I call aggregate UDF in DataFrame?

2015-03-26 Thread Haopu Wang
Specifically there are only 5 aggregate functions in class
org.apache.spark.sql.GroupedData: sum/max/min/mean/count.

Can I plug in a function to calculate stddev?

Thank you!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[SparkSQL] How to calculate stddev on a DataFrame?

2015-03-25 Thread Haopu Wang
Hi,

 

I have a DataFrame object and I want to do various types of aggregations like
count, sum, variance, stddev, etc.

 

DataFrame has DSL to do simple aggregations like count and sum.

 

How about variance and stddev?

 

Thank you for any suggestions!
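
For reference, a hedged sketch of getting variance/stddev out of the existing aggregates, assuming a DataFrame df with a numeric Double column named "v" (the column name is just an example):

  import org.apache.spark.sql.functions.{count, sum}

  val row = df.agg(count(df("v")), sum(df("v")), sum(df("v") * df("v"))).first()
  val n  = row.getLong(0).toDouble
  val s  = row.getDouble(1)
  val sq = row.getDouble(2)
  val variance = sq / n - (s / n) * (s / n)   // population variance
  val stddev   = math.sqrt(variance)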

 



RE: [SparkSQL] Reuse HiveContext to different Hive warehouse?

2015-03-10 Thread Haopu Wang
Hao, thanks for the response.

 

For Q1, in my case I have a tool on the Spark shell which serves multiple
users, and they may use different Hive installations. I took a look at
the code of HiveContext. It looks like I cannot do that today because
the "catalog" field cannot be changed after initialization.

 

  /* A catalyst metadata catalog that points to the Hive Metastore. */

  @transient

  override protected[sql] lazy val catalog = new
HiveMetastoreCatalog(this) with OverrideCatalog

 

For Q2, I checked HDFS and it is running as a cluster. I can run the DDL
from the Spark shell with HiveContext as well. To reproduce the exception, I
just run the script below. It happens in the last step.

 

15/03/11 14:24:48 INFO SparkILoop: Created sql context (with Hive
support)..

SQL context available as sqlContext.

scala> sqlContext.sql("SET
hive.metastore.warehouse.dir=hdfs://server:8020/space/warehouse")

scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src(key INT, value
STRING)")

scala> sqlContext.sql("LOAD DATA LOCAL INPATH
'examples/src/main/resources/kv1.txt' INTO TABLE src")

scala> var output = sqlContext.sql("SELECT key,value FROM src")

scala> output.saveAsTable("outputtable")
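
As a hedged fallback I may try instead of switching hive.metastore.warehouse.dir at runtime: write the result to an explicit HDFS path and register it directly (the path below is just an example):

  val output2 = sqlContext.sql("SELECT key, value FROM src")
  output2.saveAsParquetFile("hdfs://server:8020/space/warehouse2/outputtable")

  // Read it back and make it queryable without touching the metastore.
  sqlContext.parquetFile("hdfs://server:8020/space/warehouse2/outputtable")
    .registerTempTable("outputtable2")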

 



From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Wednesday, March 11, 2015 8:25 AM
To: Haopu Wang; user; d...@spark.apache.org
Subject: RE: [SparkSQL] Reuse HiveContext to different Hive warehouse?

 

I am not so sure whether Hive supports changing the metastore after
initialization; I guess not. Spark SQL relies entirely on the Hive metastore in
HiveContext, which is probably why it doesn't work as expected for Q1.

 

BTW, in most cases people configure the metastore settings in
hive-site.xml and never change them afterwards; is there any reason
you want to change them at runtime?

 

For Q2, probably something is wrong in the configuration; it seems HDFS is
running in pseudo/single-node mode. Can you double-check that? Or can you
run the DDL (like creating a table) from the Spark shell with HiveContext?


 

From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, March 10, 2015 6:38 PM
To: user; d...@spark.apache.org
Subject: [SparkSQL] Reuse HiveContext to different Hive warehouse?

 

I'm using Spark 1.3.0 RC3 build with Hive support.

 

In the Spark shell, I want to reuse the HiveContext instance with different
warehouse locations. Below are the steps for my test (assume I have
loaded a file into table "src").

 

==

15/03/10 18:22:59 INFO SparkILoop: Created sql context (with Hive
support)..

SQL context available as sqlContext.

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table1")

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w2")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table2")

==

After these steps, the tables are stored in "/test/w" only. I expect
"table2" to be stored in "/test/w2" folder.

 

Another question is: if I set "hive.metastore.warehouse.dir" to a HDFS
folder, I cannot use saveAsTable()? Is this by design? Exception stack
trace is below:

==

15/03/10 18:35:28 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0

15/03/10 18:35:28 INFO SparkContext: Created broadcast 0 from broadcast
at TableReader.scala:74

java.lang.IllegalArgumentException: Wrong FS:
hdfs://server:8020/space/warehouse/table2, expected: file:///
 

at
org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)

at
org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:463)

at
org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.jav
a:118)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.a
pply(newParquet.scala:252)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.a
pply(newParquet.scala:251)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at scala.collection.immutable.List.foreach(List.scala:318)

at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at
scala.collection.AbstractTraversable.map(Traversable.scala:105)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newP
arquet.scala:251)

at
org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:370)

at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.sca
la:96)

at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.sca
la:125)

at
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)

  

[SparkSQL] Reuse HiveContext to different Hive warehouse?

2015-03-10 Thread Haopu Wang
I'm using Spark 1.3.0 RC3 build with Hive support.

 

In the Spark shell, I want to reuse the HiveContext instance with different
warehouse locations. Below are the steps for my test (assume I have
loaded a file into table "src").

 

==

15/03/10 18:22:59 INFO SparkILoop: Created sql context (with Hive
support)..

SQL context available as sqlContext.

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table1")

scala> sqlContext.sql("SET hive.metastore.warehouse.dir=/test/w2")

scala> sqlContext.sql("SELECT * from src").saveAsTable("table2")

==

After these steps, the tables are stored in "/test/w" only. I expect
"table2" to be stored in "/test/w2" folder.

 

Another question is: if I set "hive.metastore.warehouse.dir" to a HDFS
folder, I cannot use saveAsTable()? Is this by design? Exception stack
trace is below:

==

15/03/10 18:35:28 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0

15/03/10 18:35:28 INFO SparkContext: Created broadcast 0 from broadcast
at TableReader.scala:74

java.lang.IllegalArgumentException: Wrong FS:
hdfs://server:8020/space/warehouse/table2, expected: file:///

at
org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)

at
org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:463)

at
org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.jav
a:118)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.a
pply(newParquet.scala:252)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.a
pply(newParquet.scala:251)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
ala:244)

at scala.collection.immutable.List.foreach(List.scala:318)

at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at
scala.collection.AbstractTraversable.map(Traversable.scala:105)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newP
arquet.scala:251)

at
org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:370)

at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.sca
la:96)

at
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.sca
la:125)

at
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308)

at
org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.ru
n(commands.scala:217)

at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompu
te(commands.scala:55)

at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands
.scala:55)

at
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65
)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLConte
xt.scala:1088)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:10
88)

at
org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1048)

at
org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:998)

at
org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:964)

at
org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:942)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)

at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)

at $iwC$$iwC$$iwC$$iwC.<init>(<console>:31)

at $iwC$$iwC$$iwC.<init>(<console>:33)

at $iwC$$iwC.<init>(<console>:35)

at $iwC.<init>(<console>:37)

at <init>(<console>:39)

 

Thank you very much!

 



RE: Where can I find more information about the R interface forSpark?

2015-03-04 Thread Haopu Wang
Thanks, it's an active project.

 

Will it be released with Spark 1.3.0?

 



From: 鹰 [mailto:980548...@qq.com] 
Sent: Thursday, March 05, 2015 11:19 AM
To: Haopu Wang; user
Subject: Re: Where can I find more information about the R interface forSpark?

 

You can search for SparkR on Google or find it on GitHub. 



Spark Streaming and SchemaRDD usage

2015-03-04 Thread Haopu Wang
Hi, in the roadmap of Spark in 2015 (link:
http://files.meetup.com/3138542/Spark%20in%202015%20Talk%20-%20Wendell.p
ptx), I saw SchemaRDD is designed to be the basis of BOTH Spark
Streaming and Spark SQL.

My question is: what's the typical usage of SchemaRDD in a Spark
Streaming application? Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Is SQLContext thread-safe?

2015-03-02 Thread Haopu Wang
Hao, thank you so much for the reply!

Do you already have some JIRA for the discussion?

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Tuesday, March 03, 2015 8:23 AM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Currently, each SQLContext has its own configuration, e.g. shuffle
partition number, codegen, etc., and it will be shared among the multiple
threads running.

We actually had some internal discussions on this; we will probably provide
a "thread-local" configuration in the future for a single SQLContext
instance.

-Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, March 3, 2015 7:56 AM
To: Cheng, Hao; user
Subject: RE: Is SQLContext thread-safe?

Thanks for the response.

Then I have another question: when will we want to create multiple
SQLContext instances from the same SparkContext? What's the benefit?

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Monday, March 02, 2015 9:05 PM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Yes it is thread safe, at least it's supposed to be.

-Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do Select operations in
different threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Is SQLContext thread-safe?

2015-03-02 Thread Haopu Wang
Thanks for the response.

Then I have another question: when will we want to create multiple
SQLContext instances from the same SparkContext? What's the benefit?

-Original Message-
From: Cheng, Hao [mailto:hao.ch...@intel.com] 
Sent: Monday, March 02, 2015 9:05 PM
To: Haopu Wang; user
Subject: RE: Is SQLContext thread-safe?

Yes it is thread safe, at least it's supposed to be.

-Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Monday, March 2, 2015 4:43 PM
To: user
Subject: Is SQLContext thread-safe?

Hi, is it safe to use the same SQLContext to do Select operations in
different threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is SQLContext thread-safe?

2015-03-02 Thread Haopu Wang
Hi, is it safe to use the same SQLContext to do Select operations in
different threads at the same time? Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming and SQL checkpoint error: (java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf)

2015-02-16 Thread Haopu Wang
I have a streaming application which registers a temp table on a
HiveContext for each batch duration.

The application runs well in Spark 1.1.0, but I get the error below with
1.1.1.

Do you have any suggestions to resolve it? Thank you!

 

java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf

- field (class "scala.Tuple2", name: "_1", type: "class
java.lang.Object")

- object (class "scala.Tuple2", (Configuration: core-default.xml,
core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml,
yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23,org.apa
che.hadoop.hive.ql.session.SessionState@49b6eef9))

- field (class "org.apache.spark.sql.hive.HiveContext", name: "x$3",
type: "class scala.Tuple2")

- object (class "org.apache.spark.sql.hive.HiveContext",
org.apache.spark.sql.hive.HiveContext@4e6e66a4)

- field (class
"com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$regi
sterTempTable$2", name: "sqlContext$1", type: "class
org.apache.spark.sql.SQLContext")

   - object (class
"com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$regi
sterTempTable$2", )

- field (class
"org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1",
name: "foreachFunc$1", type: "interface scala.Function1")

- object (class
"org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1",
)

- field (class "org.apache.spark.streaming.dstream.ForEachDStream",
name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
type: "interface scala.Function2")

- object (class "org.apache.spark.streaming.dstream.ForEachDStream",
org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)

- element of array (index: 0)

- array (class "[Ljava.lang.Object;", size: 16)

- field (class "scala.collection.mutable.ArrayBuffer", name:
"array", type: "class [Ljava.lang.Object;")

- object (class "scala.collection.mutable.ArrayBuffer",
ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))

- field (class "org.apache.spark.streaming.DStreamGraph", name:
"outputStreams", type: "class scala.collection.mutable.ArrayBuffer")

- custom writeObject data (class
"org.apache.spark.streaming.DStreamGraph")

- object (class "org.apache.spark.streaming.DStreamGraph",
org.apache.spark.streaming.DStreamGraph@776ae7da)

- field (class "org.apache.spark.streaming.Checkpoint", name:
"graph", type: "class org.apache.spark.streaming.DStreamGraph")

- root object (class "org.apache.spark.streaming.Checkpoint",
org.apache.spark.streaming.Checkpoint@5eade065)

at java.io.ObjectOutputStream.writeObject0(Unknown Source)
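
A hedged sketch of the pattern that usually avoids serializing the SQL/Hive context into the checkpointed DStream graph: keep it out of the closure and recreate it lazily per JVM (a HiveContext can be substituted for SQLContext here):

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.SQLContext

  object SQLContextSingleton {
    @transient private var instance: SQLContext = _
    def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
      if (instance == null) instance = new SQLContext(sparkContext)
      instance
    }
  }

  // Inside the streaming code, resolve it per batch instead of capturing it:
  // dstream.foreachRDD { rdd =>
  //   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  //   // build the SchemaRDD and register the temp table with this instance
  // }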

 



Do you know any Spark modeling tool?

2014-12-25 Thread Haopu Wang
Hi, I think a modeling tool may be helpful because sometimes it's
hard/tricky to program Spark. I don't know if there is already such a
tool.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

2014-12-19 Thread Haopu Wang
Hi Sean,

 

I changed Spark to a provided dependency and declared hadoop-client 2.5.1 as a
compile dependency.

Now I see this error when doing “mvn package”. Do you know what could be the
reason?

 

[INFO] --- scala-maven-plugin:3.1.3:compile (default) @ testspark ---

[WARNING]  Expected all dependencies to require Scala version: 2.10.0

[WARNING]  com.vitria:testspark:0.0.1-SNAPSHOT requires scala version: 2.10.0

[WARNING]  org.specs2:scalaz-core_2.10:7.0.0 requires scala version: 2.10.0

[WARNING]  org.specs2:scalaz-effect_2.10:7.0.0 requires scala version: 2.10.0

[WARNING]  org.specs2:scalaz-concurrent_2.10:7.0.0 requires scala version: 
2.10.0

[WARNING]  org.scalatest:scalatest_2.10:2.0 requires scala version: 2.10.0

[WARNING]  org.scala-lang:scala-reflect:2.10.0 requires scala version: 2.10.0

[WARNING]  com.twitter:chill_2.10:0.3.6 requires scala version: 2.10.3

[WARNING] Multiple versions of scala libraries detected!

[INFO] D:\spark\workspace\testspark\src\main\scala:-1: info: compiling

[INFO] Compiling 1 source files to D:\spark\workspace\testspark\target\classes 
at 1419035872083

[ERROR] error: error while loading , invalid CEN header (bad signature)

[ERROR] error: scala.reflect.internal.MissingRequirementError: object 
scala.runtime in compiler mirror not found.

[ERROR] at 
scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)

[ERROR] at 
scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)

……

 



From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Saturday, December 20, 2014 8:12 AM
To: Haopu Wang
Cc: user@spark.apache.org; Raghavendra Pandey
Subject: RE: Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

 

That's exactly the problem. You should mark Spark as a provided dependency 
only, and must declare your direct dependency on the correct version of 
hadoop-client. 
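
For what it's worth, the sbt-flavoured equivalent of that advice looks roughly like the sketch below (in a Maven pom.xml the same idea is <scope>provided</scope> on the Spark artifacts plus an explicit hadoop-client 2.5.1); the versions are just the ones from this thread:

  // build.sbt sketch: Spark marked "provided", hadoop-client declared explicitly.
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"      % "1.1.0" % "provided",
    "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
    "org.apache.hadoop" % "hadoop-client"   % "2.5.1"
  )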

On Dec 20, 2014 12:04 AM, "Haopu Wang"  wrote:

My application doesn’t depend on hadoop-client directly.

It only depends on spark-core_2.10, which depends on hadoop-client 1.0.4. This
can be checked in the Maven repository at
http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.1.0

 

That’s strange; how can I work around the issue? Thanks for any suggestions.

 



From: Raghavendra Pandey [mailto:raghavendra.pan...@gmail.com] 
Sent: Saturday, December 20, 2014 12:08 AM
To: Sean Owen; Haopu Wang
Cc: user@spark.apache.org
Subject: Re: Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

 

It seems there is hadoop 1 somewhere in the path. 

On Fri, Dec 19, 2014, 21:24 Sean Owen  wrote:

Yes, but your error indicates that your application is actually using
Hadoop 1.x of some kind. Check your dependencies, especially
hadoop-client.

On Fri, Dec 19, 2014 at 2:11 PM, Haopu Wang  wrote:
> I’m using Spark 1.1.0 built for HDFS 2.4.
>
> My application enables check-point (to HDFS 2.5.1) and it can build. But
> when I run it, I get below error:
>
>
>
> Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC
> version 9 cannot communicate with client version 4
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1070)
>
> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
>
> at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
>
> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
>
> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
>
> at
> org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
>
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
>
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
>
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
>
> at
> org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:201)
>
>
>
> Does that mean I have to use HDFS 2.4 to save check-point? Thank you!
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

2014-12-19 Thread Haopu Wang
My application doesn’t depend on hadoop-client directly.

It only depends on spark-core_2.10, which depends on hadoop-client 1.0.4. This
can be checked in the Maven repository at
http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.1.0

 

That’s strange; how can I work around the issue? Thanks for any suggestions.

 



From: Raghavendra Pandey [mailto:raghavendra.pan...@gmail.com] 
Sent: Saturday, December 20, 2014 12:08 AM
To: Sean Owen; Haopu Wang
Cc: user@spark.apache.org
Subject: Re: Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

 

It seems there is hadoop 1 somewhere in the path. 

On Fri, Dec 19, 2014, 21:24 Sean Owen  wrote:

Yes, but your error indicates that your application is actually using
Hadoop 1.x of some kind. Check your dependencies, especially
hadoop-client.

On Fri, Dec 19, 2014 at 2:11 PM, Haopu Wang  wrote:
> I’m using Spark 1.1.0 built for HDFS 2.4.
>
> My application enables check-point (to HDFS 2.5.1) and it can build. But
> when I run it, I get below error:
>
>
>
> Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC
> version 9 cannot communicate with client version 4
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1070)
>
> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
>
> at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)
>
> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
>
> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
>
> at
> org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
>
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
>
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
>
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
>
> at
> org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:201)
>
>
>
> Does that mean I have to use HDFS 2.4 to save check-point? Thank you!
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can Spark 1.1.0 save checkpoint to HDFS 2.5.1?

2014-12-19 Thread Haopu Wang
I’m using Spark 1.1.0 built for HDFS 2.4.

My application enables check-point (to HDFS 2.5.1) and it can build. But when I 
run it, I get below error:

 

Exception in thread "main" org.apache.hadoop.ipc.RemoteException: Server IPC 
version 9 cannot communicate with client version 4

at org.apache.hadoop.ipc.Client.call(Client.java:1070)

at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)

at com.sun.proxy.$Proxy6.getProtocolVersion(Unknown Source)

at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)

at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)

at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)

at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)

at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)

at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)

at 
org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:201)

 

Does that mean I have to use HDFS 2.4 to save check-point? Thank you!

 



RE: About "Memory usage" in the Spark UI

2014-10-23 Thread Haopu Wang
TD, thanks for the clarification.

 

From the UI, it looks like the driver also allocates memory to store blocks.
What's the purpose of that, given that the driver doesn't need to run tasks?

 



From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: October 24, 2014 8:07
To: Haopu Wang
Cc: Patrick Wendell; user
Subject: Re: About "Memory usage" in the Spark UI

 

The memory usage of blocks of data received through Spark Streaming is not 
reflected in the Spark UI. It only shows the memory usage due to cached RDDs.

I didn't find a JIRA for this, so I opened a new one. 

 

https://issues.apache.org/jira/browse/SPARK-4072

 

 

TD

 

On Thu, Oct 23, 2014 at 12:47 AM, Haopu Wang  wrote:

Patrick, thanks for the response. May I ask more questions?

 

I'm running a Spark Streaming application which receives data from a socket and
does some transformations.
 
 

The event injection rate is too high, so the processing duration is larger than
the batch interval.

 

So I see "Could not compute split, block input-0-1414049609200 not found" issue 
as discussed by others in this post: 
"http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html#a11237";

 

If my understanding is correct, Spark lacks storage in this case because
of event pile-up, so it needs to delete some splits in order to free memory.
 
 

However, even in this case, I still see a very small number (like 3MB) in the
"Memory Used" column while the total memory seems to be quite big (like 6GB).
So I think the number shown in this column may be wrong.

 

How does Spark calculate the total memory based on the allocated JVM heap size? I
guess it's related to the "spark.storage.memoryFraction" configuration, but I
want to know the details.

And why does the driver also use memory to store RDD blocks?

 

Thanks again for the answer!

 



From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: October 23, 2014 14:00
To: Haopu Wang
Cc: user
Subject: Re: About "Memory usage" in the Spark UI

 

It shows the amount of memory used to store RDD blocks, which are created when 
you run .cache()/.persist() on an RDD.

 

On Wed, Oct 22, 2014 at 10:07 PM, Haopu Wang  wrote:

Hi, please take a look at the attached screen-shot. I wonder what the
"Memory Used" column means.

 

I give 2GB memory to the driver process and 12GB memory to the executor process.

 

Thank you!

 

 

 

 



RE: About "Memory usage" in the Spark UI

2014-10-23 Thread Haopu Wang
Patrick, thanks for the response. May I ask more questions?

 

I'm running a Spark Streaming application which receives data from a socket and
does some transformations.
 
 

The event injection rate is too high, so the processing duration is larger than
the batch interval.

 

So I see "Could not compute split, block input-0-1414049609200 not found" issue 
as discussed by others in this post: 
"http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html#a11237";

 

If my understanding is correct, Spark lacks storage in this case because
of event pile-up, so it needs to delete some splits in order to free memory.
 
 

However, even in this case, I still see a very small number (like 3MB) in the
"Memory Used" column while the total memory seems to be quite big (like 6GB).
So I think the number shown in this column may be wrong.

 

How does Spark calculate the total memory based on the allocated JVM heap size? I
guess it's related to the "spark.storage.memoryFraction" configuration, but I
want to know the details.

And why does the driver also use memory to store RDD blocks?

 

Thanks again for the answer!
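
For reference, my rough guess at the arithmetic, assuming the 1.x defaults (spark.storage.memoryFraction = 0.6 and an internal safety fraction of 0.9) — please correct me if this is wrong:

  // Rough size of the storage pool an executor reports to the UI.
  val heapBytes      = Runtime.getRuntime.maxMemory   // the executor's JVM heap
  val memoryFraction = 0.6                            // spark.storage.memoryFraction
  val safetyFraction = 0.9                            // spark.storage.safetyFraction
  val storageBytes   = (heapBytes * memoryFraction * safetyFraction).toLong
  println(s"Approximate storage memory: ${storageBytes / (1024 * 1024)} MB")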

 



From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: October 23, 2014 14:00
To: Haopu Wang
Cc: user
Subject: Re: About "Memory usage" in the Spark UI

 

It shows the amount of memory used to store RDD blocks, which are created when 
you run .cache()/.persist() on an RDD.

 

On Wed, Oct 22, 2014 at 10:07 PM, Haopu Wang  wrote:

Hi, please take a look at the attached screen-shot. I wonder what the
"Memory Used" column means.

 

I give 2GB memory to the driver process and 12GB memory to the executor process.

 

Thank you!

 

 

 



About "Memory usage" in the Spark UI

2014-10-22 Thread Haopu Wang
Hi, please take a look at the attached screen-shot. I wonder what the
"Memory Used" column means.

 

I give 2GB memory to the driver process and 12GB memory to the executor
process.

 

Thank you!

 

 



Spark's shuffle file size keep increasing

2014-10-15 Thread Haopu Wang
I have a Spark application which is running Spark Streaming and Spark
SQL.
I observed that the size of the shuffle files under the "spark.local.dir" folder
keeps increasing and never decreases. Eventually the job runs into an
out-of-disk-space error.

The question is: when will Spark delete these shuffle files?

In the application, I'm using some operations like "updateStateByKey" and
have enabled checkpointing already.

Thank you!
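
One knob I am considering, as a hedged sketch rather than a confirmed fix: spark.cleaner.ttl, which asks Spark to forget (and clean) metadata and shuffle data older than the given number of seconds; the value has to comfortably exceed any window/state lookback the job uses.

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("StreamingApp")         // placeholder name
    .set("spark.cleaner.ttl", "3600")   // example: clean data older than one hour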


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-10-07 Thread Haopu Wang
Liquan, yes, for full outer join, one hash table on both sides is more 
efficient.

 

For a left/right outer join, it looks like one hash table should be enough.
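
To illustrate what I mean with plain Scala collections (a toy sketch, not the Spark code path):

  val left  = Seq(("k1", 1), ("k2", 2), ("k3", 3))
  val right = Seq(("k1", "a"), ("k3", "c"))

  val rightTable = right.toMap                 // build side: hash only the right relation
  val joined = left.map { case (k, v) =>       // stream side: one pass over the left relation
    (k, v, rightTable.get(k))                  // None plays the role of the outer join's NULL
  }
  // joined: Seq[(String, Int, Option[String])]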

 



From: Liquan Pei [mailto:liquan...@gmail.com] 
Sent: September 30, 2014 18:34
To: Haopu Wang
Cc: d...@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in 
HashOuterJoin?

 

Hi Haopu,

 

How about full outer join? One hash table may not be efficient for this case. 

 

Liquan

 

On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang  wrote:

Hi, Liquan, thanks for the response.

 

In your example, I think the hash table should be built on the "right" side, so 
Spark can iterate through the left side and find matches in the right side from 
the hash table efficiently. Please comment and suggest, thanks again!

 



From: Liquan Pei [mailto:liquan...@gmail.com] 
Sent: September 30, 2014 12:31
To: Haopu Wang
Cc: d...@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in 
HashOuterJoin?

 

Hi Haopu,

 

My understanding is that the hash tables on both the left and right sides are used
for including null values in the result in an efficient manner. If a hash table is
only built on one side, say the left side, and we perform a left outer join, then
for each row on the left side a scan over the right side is needed to make sure
there are no matching tuples on the right side for that row. 

 

Hope this helps!

Liquan

 

On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang  wrote:

I take a look at HashOuterJoin and it's building a Hashtable for both
sides.

This consumes quite a lot of memory when the partition is big. And it
doesn't reduce the iteration on streamed relation, right?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





 

-- 
Liquan Pei 
Department of Physics 
University of Massachusetts Amherst 





 

-- 
Liquan Pei 
Department of Physics 
University of Massachusetts Amherst 



RE: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-09-29 Thread Haopu Wang
Hi, Liquan, thanks for the response.

 

In your example, I think the hash table should be built on the "right" side, so 
Spark can iterate through the left side and find matches in the right side from 
the hash table efficiently. Please comment and suggest, thanks again!

 



From: Liquan Pei [mailto:liquan...@gmail.com] 
Sent: September 30, 2014 12:31
To: Haopu Wang
Cc: d...@spark.apache.org; user
Subject: Re: Spark SQL question: why build hashtable for both sides in 
HashOuterJoin?

 

Hi Haopu,

 

My understanding is that the hash tables on both the left and right sides are used
for including null values in the result in an efficient manner. If a hash table is
only built on one side, say the left side, and we perform a left outer join, then
for each row on the left side a scan over the right side is needed to make sure
there are no matching tuples on the right side for that row. 

 

Hope this helps!

Liquan

 

On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang  wrote:

I take a look at HashOuterJoin and it's building a Hashtable for both
sides.

This consumes quite a lot of memory when the partition is big. And it
doesn't reduce the iteration on streamed relation, right?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





 

-- 
Liquan Pei 
Department of Physics 
University of Massachusetts Amherst 



Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-09-29 Thread Haopu Wang
I took a look at HashOuterJoin and it builds a hash table for both
sides.

This consumes quite a lot of memory when the partition is big, and it
doesn't reduce the iteration over the streamed relation, right?

Thanks!




Spark SQL question: how to control the storage level of cached SchemaRDD?

2014-09-28 Thread Haopu Wang
Thanks for the response. From the Spark web UI's Storage tab, I do see the cached 
RDD there.

 

But the storage level is "Memory Deserialized 1x Replicated". How can I change 
the storage level? I have a big table there.
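
One possible workaround, just a sketch assuming the Spark 1.1-era API where a SchemaRDD is an ordinary RDD (and an existing SparkContext sc and SQLContext sqlContext), is to persist the SchemaRDD itself with an explicit serialized storage level instead of calling cacheTable(). Note that this gives up the columnar in-memory format that cacheTable() uses:

import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel

// Tiny illustrative table; replace the rows and schema with your real data.
val rows = sc.parallelize(Seq(Row("id1", "v1"), Row("id2", "v2")))
val schema = StructType(Seq(
  StructField("id", StringType, false),
  StructField("data", StringType, true)))

val bigTable = sqlContext.applySchema(rows, schema)
bigTable.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized, spills to disk when memory is short
bigTable.registerTempTable("bigtable")               // hypothetical table name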

 

Thanks!

 



From: Cheng Lian [mailto:lian.cs@gmail.com] 
Sent: 2014年9月26日 21:24
To: Haopu Wang; user@spark.apache.org
Subject: Re: Spark SQL question: is cached SchemaRDD storage controlled by 
"spark.storage.memoryFraction"?

 

Yes it is. The in-memory storage used with SchemaRDD also uses RDD.cache() 
under the hood.
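
For reference, a minimal sketch (the value is only illustrative) of giving that storage pool more room when the context is created, since cached tables count against it:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("CachedTableApp")                 // hypothetical app name
  .set("spark.storage.memoryFraction", "0.7")   // default is 0.6
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)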

On 9/26/14 4:04 PM, Haopu Wang wrote:

Hi, I'm querying a big table using Spark SQL. I see very long GC time in
some stages. I wonder if I can improve it by tuning the storage
parameter.
 
The question is: the schemaRDD has been cached with "cacheTable()"
function. So is the cached schemaRDD part of memory storage controlled
by the "spark.storage.memoryFraction" parameter?
 
Thanks!
 
 




Spark SQL question: is cached SchemaRDD storage controlled by "spark.storage.memoryFraction"?

2014-09-26 Thread Haopu Wang
Hi, I'm querying a big table using Spark SQL. I see very long GC time in
some stages. I wonder if I can improve it by tuning the storage
parameter.

The question is: the schemaRDD has been cached with "cacheTable()"
function. So is the cached schemaRDD part of memory storage controlled
by the "spark.storage.memoryFraction" parameter?

Thanks!




Spark SQL 1.1.0: NPE when join two cached table

2014-09-22 Thread Haopu Wang
I have two data sets and want to join them on the first field of each. Sample
data are below:

 

data set 1:

  id2,name1,2,300.0

 

data set 2:

  id1,

 

The code is something like below:

 

val sparkConf = new SparkConf().setAppName("JoinInScala")

val sc = new SparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed",
"true")

import org.apache.spark.sql._   



val testdata = sc.textFile(args(0) + "data.txt").map(_.split(","))

  .map(p => Row(p(0), p(1).trim, p(2).trim.toLong,
p(3).trim.toDouble))

  

val fields = new Array[StructField](4)

fields(0) = StructField("id", StringType, false);

fields(1) = StructField("name", StringType, false);

fields(2) = StructField("agg1", LongType, false);

fields(3) = StructField("agg2", DoubleType, false);

val schema = StructType(fields);

 

val data = sqlContext.applySchema(testdata, schema)



data.registerTempTable("datatable")

sqlContext.cacheTable("datatable")

 

val refdata = sc.textFile(args(0) + "ref.txt").map(_.split(","))

  .map(p => Row(p(0), p(1).trim))

  

val reffields = new Array[StructField](2)

reffields(0) = StructField("id", StringType, false);

reffields(1) = StructField("data", StringType, true);

val refschema = StructType(reffields);

 

val refschemardd = sqlContext.applySchema(refdata, refschema)

refschemardd.registerTempTable("ref")

sqlContext.cacheTable("ref")



   val results = sqlContext.sql("SELECT
d.id,d.name,d.agg1,d.agg2,ref.data FROM datatable as d join ref on
d.id=ref.id")

results.foreach(T => Unit);

 

But I got the NullPointerException below. If I comment out the two
"cacheTable()" calls, the program runs well. Please shed some light,
thank you!

 

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.sql.columnar.InMemoryRelation.statistics$lzycompute(InMemoryColumnarTableScan.scala:43)
    at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:42)
    at org.apache.spark.sql.execution.SparkStrategies$HashJoin$.apply(SparkStrategies.scala:83)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:268)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
    at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.dependencies(RDD.scala:189)
    at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1233)
    at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:117)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:759)
    at Join$$anonfun$main$1.apply$mcVI$sp(Join.scala:44)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at Join$.main(Join.scala:42)
    at Join.main(Join.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Meth

RE: Announcing Spark 1.1.0!

2014-09-11 Thread Haopu Wang
Got it, thank you, Denny!

 



From: Denny Lee [mailto:denny.g@gmail.com] 
Sent: Friday, September 12, 2014 11:04 AM
To: user@spark.apache.org; Haopu Wang; d...@spark.apache.org; Patrick Wendell
Subject: RE: Announcing Spark 1.1.0!

 

Yes, at least for my query scenarios, I have been able to use Spark 1.1 built with 
Hadoop 2.4 against Hadoop 2.5.  Note that Hadoop 2.5 is considered a relatively 
minor release 
(http://hadoop.apache.org/releases.html#11+August%2C+2014%3A+Release+2.5.0+available)
 whereas Hadoop 2.4 and 2.3 were considered more significant releases.

 

 

 

On September 11, 2014 at 19:22:05, Haopu Wang (hw...@qilinsoft.com) wrote:

From the web page 
(https://spark.apache.org/docs/latest/building-with-maven.html) that you pointed 
out, it says: “Because HDFS is not protocol-compatible across versions, if you 
want to read from HDFS, you’ll need to build Spark against the specific HDFS 
version in your environment.”

 

Did you try to read a hadoop 2.5.0 file using Spark 1.1 with hadoop 2.4?

 

Thanks!

 





From:Denny Lee [mailto:denny.g@gmail.com]
Sent: Friday, September 12, 2014 10:00 AM
To: Patrick Wendell; Haopu Wang; d...@spark.apache.org; 
user@spark.apache.org
Subject: RE: Announcing Spark 1.1.0!

 

Please correct me if I’m wrong, but I was under the impression, as per 
the Maven repositories, that it was just to stay more in sync with the various 
versions of Hadoop.  Looking at the latest documentation 
(https://spark.apache.org/docs/latest/building-with-maven.html), there are 
multiple Hadoop versions called out.

 

As for the potential differences in Spark, this is more about ensuring 
that the various jars and library dependencies of the correct version of Hadoop 
are included so there can be proper connectivity to Hadoop from Spark, rather 
than any differences in Spark itself.  Another good reference on this topic is 
the callout for Hadoop versions within GitHub: https://github.com/apache/spark

 

HTH!

 

 

On September 11, 2014 at 18:39:10, Haopu Wang (hw...@qilinsoft.com) 
wrote:

Danny, thanks for the response.

 

I raised the question because in Spark 1.0.2 I saw one binary 
package for Hadoop 2, but in Spark 1.1.0 there are separate packages for Hadoop 
2.3 and 2.4.

That implies some differences in Spark depending on the Hadoop 
version.

 





From:Denny Lee [mailto:denny.g@gmail.com]
Sent: Friday, September 12, 2014 9:35 AM
To: user@spark.apache.org; Haopu Wang; d...@spark.apache.org; 
Patrick Wendell
Subject: RE: Announcing Spark 1.1.0!

 

I’m not sure if I’m completely answering your question here but 
I’m currently working (on OSX) with Hadoop 2.5 and I used the Spark 1.1 with 
Hadoop 2.4 without any issues.

 

 

On September 11, 2014 at 18:11:46, Haopu Wang 
(hw...@qilinsoft.com) wrote:

I see the binary packages include Hadoop 1, 2.3 and 2.4.
Does Spark 1.1.0 support Hadoop 2.5.0, available at the address below?


http://hadoop.apache.org/releases.html#11+August%2C+2014%3A+Release+2.5.0+available

-----Original Message-----
From: Patrick Wendell [mailto:pwend...@gmail.com]
Sent: Friday, September 12, 2014 8:13 AM
To: d...@spark.apache.org; user@spark.apache.org
Subject: Announcing Spark 1.1.0!

I am happy to announce the availability of Spark 1.1.0! 
Spark 1.1.0 is
the second release on the API-compatible 1.X line. It 
is Spark's
largest release ever, with contributions from 171 
developers!

This release brings operational and performance 
improvements in Spark
core including a new implementation of the Spark 
shuffle designed for
very large scale workloads. Spark 1.1 adds significant 
extensions to
the newest Spark modules, MLlib and Spark SQL. Spark 
SQL introduces a
JDBC server, byte code generation for fast expression 
evaluation, a
public types API, JSON support, and other features and 
optimizations.
MLlib introduces a new statistics library along with 
several new
algorithms and optimizations. Spark 1.1 also builds out 
Spark

RE: Announcing Spark 1.1.0!

2014-09-11 Thread Haopu Wang
From the web page 
(https://spark.apache.org/docs/latest/building-with-maven.html) that you pointed 
out, it says: “Because HDFS is not protocol-compatible across versions, if you 
want to read from HDFS, you’ll need to build Spark against the specific HDFS 
version in your environment.”

 

Did you try to read a hadoop 2.5.0 file using Spark 1.1 with hadoop 2.4?

 

Thanks!

 



From: Denny Lee [mailto:denny.g@gmail.com] 
Sent: Friday, September 12, 2014 10:00 AM
To: Patrick Wendell; Haopu Wang; d...@spark.apache.org; user@spark.apache.org
Subject: RE: Announcing Spark 1.1.0!

 

Please correct me if I’m wrong, but I was under the impression, as per the Maven 
repositories, that it was just to stay more in sync with the various versions of 
Hadoop.  Looking at the latest documentation 
(https://spark.apache.org/docs/latest/building-with-maven.html), there are 
multiple Hadoop versions called out.

 

As for the potential differences in Spark, this is more about ensuring that the 
various jars and library dependencies of the correct version of Hadoop are 
included so there can be proper connectivity to Hadoop from Spark, rather than 
any differences in Spark itself.  Another good reference on this topic is the 
callout for Hadoop versions within GitHub: https://github.com/apache/spark

 

HTH!

 

 

On September 11, 2014 at 18:39:10, Haopu Wang (hw...@qilinsoft.com) wrote:

Danny, thanks for the response.

 

I raised the question because in Spark 1.0.2 I saw one binary package 
for Hadoop 2, but in Spark 1.1.0 there are separate packages for Hadoop 2.3 and 
2.4.

That implies some differences in Spark depending on the Hadoop version.

 





From:Denny Lee [mailto:denny.g@gmail.com]
Sent: Friday, September 12, 2014 9:35 AM
To: user@spark.apache.org; Haopu Wang; d...@spark.apache.org; Patrick 
Wendell
Subject: RE: Announcing Spark 1.1.0!

 

I’m not sure if I’m completely answering your question here but I’m 
currently working (on OSX) with Hadoop 2.5 and I used the Spark 1.1 with Hadoop 
2.4 without any issues.

 

 

On September 11, 2014 at 18:11:46, Haopu Wang (hw...@qilinsoft.com) 
wrote:

I see the binary packages include Hadoop 1, 2.3 and 2.4.
Does Spark 1.1.0 support Hadoop 2.5.0, available at the address below?


http://hadoop.apache.org/releases.html#11+August%2C+2014%3A+Release+2.5.0+available

-----Original Message-----
From: Patrick Wendell [mailto:pwend...@gmail.com]
Sent: Friday, September 12, 2014 8:13 AM
To: d...@spark.apache.org; user@spark.apache.org
Subject: Announcing Spark 1.1.0!

I am happy to announce the availability of Spark 1.1.0! Spark 
1.1.0 is
the second release on the API-compatible 1.X line. It is Spark's
largest release ever, with contributions from 171 developers!

This release brings operational and performance improvements in 
Spark
core including a new implementation of the Spark shuffle 
designed for
very large scale workloads. Spark 1.1 adds significant 
extensions to
the newest Spark modules, MLlib and Spark SQL. Spark SQL 
introduces a
JDBC server, byte code generation for fast expression 
evaluation, a
public types API, JSON support, and other features and 
optimizations.
MLlib introduces a new statistics library along with several new
algorithms and optimizations. Spark 1.1 also builds out Spark's 
Python
support and adds new components to the Spark Streaming module.

Visit the release notes [1] to read about the new features, or
download [2] the release today.

[1] http://spark.eu.apache.org/releases/spark-release-1-1-0.html
[2] http://spark.eu.apache.org/downloads.html

NOTE: SOME ASF DOWNLOAD MIRRORS WILL NOT CONTAIN THE RELEASE 
FOR SEVERAL HOURS.

Please e-mail me directly for any type-o's in the release notes 
or name listing.

Thanks, and congratulations!
- Patrick





RE: Announcing Spark 1.1.0!

2014-09-11 Thread Haopu Wang
Danny, thanks for the response.

 

I raised the question because in Spark 1.0.2 I saw one binary package for 
Hadoop 2, but in Spark 1.1.0 there are separate packages for Hadoop 2.3 and 2.4.

That implies some differences in Spark depending on the Hadoop version.

 



From: Denny Lee [mailto:denny.g@gmail.com] 
Sent: Friday, September 12, 2014 9:35 AM
To: user@spark.apache.org; Haopu Wang; d...@spark.apache.org; Patrick Wendell
Subject: RE: Announcing Spark 1.1.0!

 

I’m not sure if I’m completely answering your question here but I’m currently 
working (on OSX) with Hadoop 2.5 and I used the Spark 1.1 with Hadoop 2.4 
without any issues.

 

 

On September 11, 2014 at 18:11:46, Haopu Wang (hw...@qilinsoft.com) wrote:

I see the binary packages include Hadoop 1, 2.3 and 2.4. 
Does Spark 1.1.0 support Hadoop 2.5.0, available at the address below? 


http://hadoop.apache.org/releases.html#11+August%2C+2014%3A+Release+2.5.0+available
 

-----Original Message----- 
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Friday, September 12, 2014 8:13 AM 
To: d...@spark.apache.org; user@spark.apache.org 
Subject: Announcing Spark 1.1.0! 

I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is 
the second release on the API-compatible 1.X line. It is Spark's 
largest release ever, with contributions from 171 developers! 

This release brings operational and performance improvements in Spark 
core including a new implementation of the Spark shuffle designed for 
very large scale workloads. Spark 1.1 adds significant extensions to 
the newest Spark modules, MLlib and Spark SQL. Spark SQL introduces a 
JDBC server, byte code generation for fast expression evaluation, a 
public types API, JSON support, and other features and optimizations. 
MLlib introduces a new statistics library along with several new 
algorithms and optimizations. Spark 1.1 also builds out Spark's Python 
support and adds new components to the Spark Streaming module. 

Visit the release notes [1] to read about the new features, or 
download [2] the release today. 

[1] http://spark.eu.apache.org/releases/spark-release-1-1-0.html 
[2] http://spark.eu.apache.org/downloads.html 

NOTE: SOME ASF DOWNLOAD MIRRORS WILL NOT CONTAIN THE RELEASE FOR 
SEVERAL HOURS. 

Please e-mail me directly for any type-o's in the release notes or name 
listing. 

Thanks, and congratulations! 
- Patrick 




RE: Announcing Spark 1.1.0!

2014-09-11 Thread Haopu Wang
I see the binary packages include Hadoop 1, 2.3 and 2.4.
Does Spark 1.1.0 support Hadoop 2.5.0, available at the address below?

http://hadoop.apache.org/releases.html#11+August%2C+2014%3A+Release+2.5.0+available

-----Original Message-----
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Friday, September 12, 2014 8:13 AM
To: d...@spark.apache.org; user@spark.apache.org
Subject: Announcing Spark 1.1.0!

I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is
the second release on the API-compatible 1.X line. It is Spark's
largest release ever, with contributions from 171 developers!

This release brings operational and performance improvements in Spark
core including a new implementation of the Spark shuffle designed for
very large scale workloads. Spark 1.1 adds significant extensions to
the newest Spark modules, MLlib and Spark SQL. Spark SQL introduces a
JDBC server, byte code generation for fast expression evaluation, a
public types API, JSON support, and other features and optimizations.
MLlib introduces a new statistics library along with several new
algorithms and optimizations. Spark 1.1 also builds out Spark's Python
support and adds new components to the Spark Streaming module.

Visit the release notes [1] to read about the new features, or
download [2] the release today.

[1] http://spark.eu.apache.org/releases/spark-release-1-1-0.html
[2] http://spark.eu.apache.org/downloads.html

NOTE: SOME ASF DOWNLOAD MIRRORS WILL NOT CONTAIN THE RELEASE FOR SEVERAL HOURS.

Please e-mail me directly for any type-o's in the release notes or name listing.

Thanks, and congratulations!
- Patrick




RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

2014-07-23 Thread Haopu Wang
Jerry, thanks for the response.

For the default storage level of a DStream, it looks like Spark's documentation is 
wrong. In this link: 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning
It mentions:
"Default persistence level of DStreams: Unlike RDDs, the default persistence 
level of DStreams serializes the data in memory (that is, 
StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY 
for RDDs). Even though keeping the data serialized incurs higher 
serialization/deserialization overheads, it significantly reduces GC pauses."

I will take a look at DStream.scala although I have no Scala experience.

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: 2014年7月23日 15:13
To: user@spark.apache.org
Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

Hi Haopu, 

Please see the inline comments.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: "spark.streaming.unpersist" and "spark.cleaner.ttl"

I have a DStream receiving data from a socket. I'm using local mode.
I set "spark.streaming.unpersist" to "false" and leave
"spark.cleaner.ttl" at its default of infinite.
I can see files for input and shuffle blocks under the "spark.local.dir"
folder, and the size of the folder keeps increasing, although the JVM's memory usage 
seems to be stable.

[question] In this case, because the input RDDs are persisted but don't fit 
into memory, they are written to disk, right? And where can I see the details about 
these RDDs? I don't see them in the web UI.

[answer] Yes, if there is not enough memory to hold the input RDDs, the data will be 
flushed to disk, because the default storage level is "MEMORY_AND_DISK_SER_2", as 
you can see in StreamingContext.scala. Actually you cannot see the input 
RDDs in the web UI; you can only see cached RDDs there.
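
For completeness, a minimal sketch (hypothetical host and port, and an existing SparkConf) of overriding that default when the input DStream is created; socketTextStream accepts an explicit StorageLevel:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(5))   // sparkConf as in your application
// Keep a single serialized in-memory copy instead of the MEMORY_AND_DISK_SER_2 default.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY_SER)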

Then I set "spark.streaming.unpersist" to "true", and the size of the "spark.local.dir" 
folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change "spark.cleaner.ttl", which 
component is doing the cleanup? And what's the difference if I set 
"spark.cleaner.ttl" to some duration in this case?

[answer] If you set "spark.streaming.unpersist" to true, old unused RDDs will be 
deleted, as you can see in DStream.scala. "spark.cleaner.ttl", on the other hand, is 
a timer-based Spark cleaner that cleans not only streaming data but also broadcast, 
shuffle and other data.
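
As a sketch (the numeric value is only illustrative), both knobs are plain configuration entries set before the streaming context is created:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SocketStreamApp")              // hypothetical app name
  .set("spark.streaming.unpersist", "true")   // let streaming drop old batch RDDs itself
  .set("spark.cleaner.ttl", "3600")           // optional timer-based cleanup of old metadata and data, in seconds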

Thank you!



"spark.streaming.unpersist" and "spark.cleaner.ttl"

2014-07-23 Thread Haopu Wang
I have a DStream receiving data from a socket. I'm using local mode.
I set "spark.streaming.unpersist" to "false" and leave
"spark.cleaner.ttl" at its default of infinite.
I can see files for input and shuffle blocks under the "spark.local.dir"
folder, and the size of the folder keeps increasing, although the JVM's memory
usage seems to be stable.

[question] In this case, because the input RDDs are persisted but they don't
fit into memory, they are written to disk, right? And where can I see the
details about these RDDs? I don't see them in the web UI.

Then I set "spark.streaming.unpersist" to "true", and the size of the
"spark.local.dir" folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change "spark.cleaner.ttl",
which component is doing the cleanup? And what's the difference if I set
"spark.cleaner.ttl" to some duration in this case?

Thank you!



number of "Cached Partitions" v.s. "Total Partitions"

2014-07-22 Thread Haopu Wang
Hi, I'm using local mode and read a text file as an RDD using the
JavaSparkContext.textFile() API.

I then call the "cache()" method on the resulting RDD.

 

I look at the Storage information and find that the RDD has 3 partitions but
only 2 of them have been cached.

Is this normal behavior? I assumed either all of the partitions should be cached
or none of them.

If I'm wrong, in what cases is the number of "cached" partitions
less than the total number of partitions?
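
One common reason (a hedged guess, since the storage level isn't shown above) is plain MEMORY_ONLY caching: partitions that do not fit in memory are simply not stored and are recomputed when needed, so the cached count can be lower than the total. A minimal Scala sketch (hypothetical path) of asking for a level that spills to disk instead:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")             // hypothetical input path
lines.persist(StorageLevel.MEMORY_AND_DISK)     // partitions that don't fit in memory go to disk
lines.count()                                   // materialize the cache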

 

 



RE: data locality

2014-07-21 Thread Haopu Wang
Sandy,

 

I just tried the standalone cluster and didn't have a chance to try YARN yet.

So if I understand correctly, there is *no* special handling of task 
assignment according to the HDFS block locations when Spark is running as a 
*standalone* cluster.

Please correct me if I'm wrong. Thank you for your patience!

 



From: Sandy Ryza [mailto:sandy.r...@cloudera.com] 
Sent: 2014年7月22日 9:47
To: user@spark.apache.org
Subject: Re: data locality

 

This currently only works for YARN.  The standalone default is to place an 
executor on every node for every job.

 

The total number of executors is specified by the user.
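
In standalone mode the practical knobs are resource caps rather than an explicit executor count; a sketch with purely illustrative values (these are standard Spark settings, not something specific to this thread):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("LocalityTest")           // hypothetical app name
  .set("spark.cores.max", "8")          // cap on total cores this application may take
  .set("spark.executor.memory", "4g")   // memory per executor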

 

-Sandy

 

On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang  wrote:

Sandy,

 

Do you mean the “preferred location” mechanism works for a standalone cluster as well? 
Because I checked the code of SparkContext and saw the comments below:

 

  // This is used only by YARN for now, but should be relevant to other cluster 
types (Mesos,

  // etc) too. This is typically generated from 
InputFormatInfo.computePreferredLocations. It

  // contains a map from hostname to a list of input format splits on the host.

  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = 
Map()

 

BTW, even with the preferred hosts, how does Spark decide how many total 
executors to use for this application?

 

Thanks again!

 



From: Sandy Ryza [mailto:sandy.r...@cloudera.com] 
Sent: Friday, July 18, 2014 3:44 PM
To: user@spark.apache.org
Subject: Re: data locality

 

Hi Haopu,

 

Spark will ask HDFS for file block locations and try to assign tasks based on 
these.

 

There is a snag.  Spark schedules its tasks inside of "executor" processes that 
stick around for the lifetime of a Spark application.  Spark requests executors 
before it runs any jobs, i.e. before it has any information about where the 
input data for the jobs is located.  If the executors occupy significantly 
fewer nodes than exist in the cluster, it can be difficult for Spark to achieve 
data locality.  The workaround for this is an API that allows passing in a set 
of preferred locations when instantiating a Spark context.  This API is 
currently broken in Spark 1.0, and will likely be changed to something a little 
simpler in a future release.

 

val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

val sc = new SparkContext(conf, locData)

 

-Sandy

 

 

On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang  wrote:

I have a standalone spark cluster and a HDFS cluster which share some of nodes.

 

When reading HDFS file, how does spark assign tasks to nodes? Will it ask HDFS 
the location for each file block in order to get a right worker node?

 

How about a spark cluster on Yarn?

 

Thank you very much!

 

 

 



concurrent jobs

2014-07-18 Thread Haopu Wang
Looking at the code of JobScheduler, I found the parameter below:

 

  private val numConcurrentJobs = 
ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

 

Does that mean each App can have only one active stage?

 

In my pseudo-code below:

 

 S1 = viewDStream.forEach( collect() )..

 S2 = viewDStream.forEach( collect() )..

 

There should be two “collect()” jobs for each batch interval, right? Are they 
running in parallel?
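
With the default of 1 shown above, the two output jobs of a batch run one after the other on the single-threaded job executor. A sketch (treat it as an assumption based on the quoted code, since the property is undocumented) of letting them run in parallel:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("TwoOutputsPerBatch")               // hypothetical app name
  .set("spark.streaming.concurrentJobs", "2")     // size of the job executor's thread pool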

 

Thank you!



RE: data locality

2014-07-18 Thread Haopu Wang
Sandy,

 

Do you mean the “preferred location” mechanism works for a standalone cluster as well? 
Because I checked the code of SparkContext and saw the comments below:

 

  // This is used only by YARN for now, but should be relevant to other cluster 
types (Mesos,

  // etc) too. This is typically generated from 
InputFormatInfo.computePreferredLocations. It

  // contains a map from hostname to a list of input format splits on the host.

  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = 
Map()

 

BTW, even with the preferred hosts, how does Spark decide how many total 
executors to use for this application?

 

Thanks again!

 



From: Sandy Ryza [mailto:sandy.r...@cloudera.com] 
Sent: Friday, July 18, 2014 3:44 PM
To: user@spark.apache.org
Subject: Re: data locality

 

Hi Haopu,

 

Spark will ask HDFS for file block locations and try to assign tasks based on 
these.

 

There is a snag.  Spark schedules its tasks inside of "executor" processes that 
stick around for the lifetime of a Spark application.  Spark requests executors 
before it runs any jobs, i.e. before it has any information about where the 
input data for the jobs is located.  If the executors occupy significantly 
fewer nodes than exist in the cluster, it can be difficult for Spark to achieve 
data locality.  The workaround for this is an API that allows passing in a set 
of preferred locations when instantiating a Spark context.  This API is 
currently broken in Spark 1.0, and will likely be changed to something a little 
simpler in a future release.

 

val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

val sc = new SparkContext(conf, locData)

 

-Sandy

 

 

On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang  wrote:

I have a standalone spark cluster and a HDFS cluster which share some of nodes.

 

When reading HDFS file, how does spark assign tasks to nodes? Will it ask HDFS 
the location for each file block in order to get a right worker node?

 

How about a spark cluster on Yarn?

 

Thank you very much!

 

 



data locality

2014-07-18 Thread Haopu Wang
I have a standalone spark cluster and a HDFS cluster which share some of nodes.

 

When reading HDFS file, how does spark assign tasks to nodes? Will it ask HDFS 
the location for each file block in order to get a right worker node?

 

How about a spark cluster on Yarn?

 

Thank you very much!

 



RE: All of the tasks have been completed but the Stage is still shown as "Active"?

2014-07-11 Thread Haopu Wang
I saw some exceptions like this in the driver log. Can you shed some light? Is it 
related to the behavior?

 

14/07/11 20:40:09 ERROR LiveListenerBus: Listener JobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 64019
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
    at org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
    at org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48)
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)

 



From: Haopu Wang 
Sent: Thursday, July 10, 2014 7:38 PM
To: user@spark.apache.org
Subject: RE: All of the tasks have been completed but the Stage is still shown 
as "Active"?

 

I didn't keep the driver's log. It's a lesson.

I will try to run it again to see if it happens again.

 



From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: 2014年7月10日 17:29
To: user@spark.apache.org
Subject: Re: All of the tasks have been completed but the Stage is still shown 
as "Active"?

 

Do you see any errors in the logs of the driver?

 

On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang  wrote:

I'm running an app for hours in a standalone cluster. From the data
injector and the "Streaming" tab of the web UI, it's running well.

However, I see quite a lot of active stages in the web UI even though some of them
have all of their tasks completed.

I attach a screenshot for your reference.

Do you ever see this kind of behavior?

 



RE: All of the tasks have been completed but the Stage is still shown as "Active"?

2014-07-10 Thread Haopu Wang
I didn't keep the driver's log. It's a lesson.

I will try to run it again to see if it happens again.

 



From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: 2014年7月10日 17:29
To: user@spark.apache.org
Subject: Re: All of the tasks have been completed but the Stage is still shown 
as "Active"?

 

Do you see any errors in the logs of the driver?

 

On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang  wrote:

I'm running an app for hours in a standalone cluster. From the data
injector and the "Streaming" tab of the web UI, it's running well.

However, I see quite a lot of active stages in the web UI even though some of them
have all of their tasks completed.

I attach a screenshot for your reference.

Do you ever see this kind of behavior?

 



How to clear the list of Completed Applications in Spark web UI?

2014-07-09 Thread Haopu Wang
Besides restarting the Master, is there any other way to clear the
Completed Applications list in the Master web UI?