Sparse vs. Dense vector memory usage

2021-08-02 Thread Gerard Maas
Dear Spark folks,

Is there a guideline somewhere on the density tipping point at which it makes
more sense to use a Spark ML dense vector vs. a sparse vector, with regard
to memory usage on fairly large (image processing) vectors?
My google-fu didn't turn up anything useful.

Thanks in advance!

Gerard.


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Gerard Maas
Hi Srinivas,

Reading from different brokers is possible, but you need to connect to each
Kafka cluster separately.
Mixing connections to two different Kafka clusters in one subscriber is not
supported (I'm sure it would give all kinds of weird errors).
The "kafka.bootstrap.servers" option is there to list the (potentially many)
brokers of the *same* Kafka cluster.

The way to address this is to follow German's suggestion and create a
subscription for each Kafka cluster you are talking to:

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1, topic2")
 .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3, topicn, topicn+1,")
 .load()

After acquiring the DataFrames, you can union them and treat all the data
in a single process.

val unifiedData = df_cluster1.union(df_cluster2)
// apply further transformations on `unifiedData`
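
If you are consuming these topics as streams (as in the subject), the same
pattern applies with `readStream`. A minimal, untested sketch, keeping the
placeholder hosts and topics from above:

val stream_cluster1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1,topic2")
  .load()

val stream_cluster2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3,topicn")
  .load()

// the union of two streaming DataFrames is supported and can feed a single sink
val unifiedStream = stream_cluster1.union(stream_cluster2)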

kr, Gerard.



On Tue, Jun 9, 2020 at 6:30 PM Srinivas V  wrote:

> Thanks for the quick reply. This may work, but I have about 5 topics to
> listen to right now. I am trying to keep all topics in an array in a
> properties file and read them all at once. This way it is dynamic: you have
> one code block like below and you can add or delete topics in the config
> file without changing code. If someone confirms that it does not
> work, I would have to do something like you have provided.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> "cluster1_host:cluster1_port,cluster2_host:port")
>
> .option("subscribe", "topic1, topic2,topic3,topic4,topic5")
>
>


Re: Spark Streaming not working

2020-04-14 Thread Gerard Maas
Hi,

Could you share the code that you're using to configure the connection to
the Kafka broker?

This is a bread-and-butter feature. My first thought is that there's
something in your particular setup that prevents this from working.

kind regards, Gerard.

On Fri, Apr 10, 2020 at 7:34 PM Debabrata Ghosh 
wrote:

> Hi,
> I have a spark streaming application where Kafka is producing
> records but unfortunately spark streaming isn't able to consume those.
>
> I am hitting the following error:
>
> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>
>
> Would you please be able to help with a resolution?
>
> Thanks,
> Debu
>


Re: [StructuredStreaming] HDFSBackedStateStoreProvider is leaking .crc files.

2019-06-12 Thread Gerard Maas
Ooops - linked the wrong JIRA ticket:  (that other one is related)

https://issues.apache.org/jira/browse/SPARK-28025

On Wed, Jun 12, 2019 at 1:21 PM Gerard Maas  wrote:

> Hi!
> I would like to socialize this issue we are currently facing:
> The Structured Streaming default CheckpointFileManager leaks .crc files by
> leaving them behind after users of this class (like
> HDFSBackedStateStoreProvider) apply their cleanup methods.
>
> This results in an unbounded creation of tiny files that eat away storage
> by the block and, in our case, deteriorates the file system performance.
>
> We correlated the processedRowsPerSecond reported by the
> StreamingQueryProgress against a count of the .crc files in the storage
> directory (checkpoint + state store). The performance impact we observe is
> dramatic.
>
> We are running on Kubernetes, using GlusterFS as the shared storage
> provider.
> [image: out processedRowsPerSecond vs. files in storage_process.png]
> I have created a JIRA ticket with additional detail:
>
> https://issues.apache.org/jira/browse/SPARK-17475
>
> This is also related to an earlier discussion about the state store
> unbounded disk-size growth, which was left unresolved back then:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-State-Store-storage-behavior-for-the-Stream-Deduplication-function-td34883.html
>
> If there's any additional detail I should add/research, please let me know.
>
> kind regards, Gerard.
>
>
>


[StructuredStreaming] HDFSBackedStateStoreProvider is leaking .crc files.

2019-06-12 Thread Gerard Maas
Hi!
I would like to socialize this issue we are currently facing:
The Structured Streaming default CheckpointFileManager leaks .crc files by
leaving them behind after users of this class (like
HDFSBackedStateStoreProvider) apply their cleanup methods.

This results in an unbounded creation of tiny files that eat away storage
by the block and, in our case, deteriorates the file system performance.

We correlated the processedRowsPerSecond reported by the
StreamingQueryProgress against a count of the .crc files in the storage
directory (checkpoint + state store). The performance impact we observe is
dramatic.

We are running on Kubernetes, using GlusterFS as the shared storage
provider.
[image: out processedRowsPerSecond vs. files in storage_process.png]
I have created a JIRA ticket with additional detail:

https://issues.apache.org/jira/browse/SPARK-17475

This is also related to an earlier discussion about the state store
unbounded disk-size growth, which was left unresolved back then:
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-State-Store-storage-behavior-for-the-Stream-Deduplication-function-td34883.html

If there's any additional detail I should add/research, please let me know.

kind regards, Gerard.


Re: The following Java MR code works for small dataset but throws(arrayindexoutofBound) error for large dataset

2019-05-09 Thread Gerard Maas
Hi,

I'm afraid you sent this email to the wrong Mailing list.
This is the Spark users mailing list. We could probably tell you how to do
this with Spark, but I think that's not your intention :)

kr, Gerard.


On Thu, May 9, 2019 at 11:03 AM Balakumar iyer S 
wrote:

> Hi All,
>
> I am trying to read an ORC file and perform a groupBy operation on it, but
> when I run it on a large data set we are facing the following error
> message.
>
> Input format of INPUT DATA
>
> |178111256|  107125374|
> |178111256|  107148618|
> |178111256|  107175361|
> |178111256|  107189910|
>
> and we are try to group by the first column.
>
> As per the logic and syntax the code is appropriate, and it works
> well on a small data set. I have attached the code in the text file.
>
> Thank you for your time.
>
> ERROR MESSAGE:
> Error: java.lang.ArrayIndexOutOfBoundsException at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1453)
> at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write(MapTask.java:1349)
> at java.io.DataOutputStream.writeByte(DataOutputStream.java:153) at
> org.apache.hadoop.io.WritableUtils.writeVLong(WritableUtils.java:273) at
> org.apache.hadoop.io.WritableUtils.writeVInt(WritableUtils.java:253) at
> org.apache.hadoop.io.Text.write(Text.java:330) at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
> at
> org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
> at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1149)
> at
> org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:610)
> at orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:73) at
> orc_groupby.orc_groupby.Orc_groupBy$MyMapper.map(Orc_groupBy.java:39) at
> org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) at
> org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453) at
> org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) at
> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170) at
> java.security.AccessController.doPrivileged(Native Method) at
> javax.security.auth.Subject.doAs(Subject.java:422) at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
>
>
>
> --
> REGARDS
> BALAKUMAR SEETHARAMAN
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-03 Thread Gerard Maas
James,

How do you create an instance of `RDD[Iterable[MyCaseClass]]` ?
Is it in that first code snippet? > new SparkContext(sc).parallelize(seq)?

kr, Gerard




On Fri, Nov 30, 2018 at 3:02 PM James Starks 
wrote:

> When processing data, I create an instance of RDD[Iterable[MyCaseClass]]
> and I want to convert it to RDD[MyCaseClass] so that it can be further
> converted to dataset or dataframe with toDS() function. But I encounter a
> problem that SparkContext can not be instantiated within SparkSession.map
> function because it already exists, even with allowMultipleContexts set to
> true.
>
> val sc = new SparkConf()
> sc.set("spark.driver.allowMultipleContexts", "true")
> new SparkContext(sc).parallelize(seq)
>
> How can I fix this?
>
> Thanks.
>


Re: Re: About the question of Spark Structured Streaming window output

2018-08-27 Thread Gerard Maas
Hi,

> 1、why the start time is "2018-08-27 09:50:00" not "2018-08-27 09:53:00"?
When I define the window, the starttime is not set.
When no 'startTime' is defined, windows are aligned to the start of the
next larger time unit. So, if your window is defined in minutes, the windows
are aligned to the start of the hour, and the first window is the one that
contains the first event.
In the case of a 5-minute window, the windows are 9:00-9:05, 9:05-9:10,
9:10-9:15, ... 9:45-9:50, 9:50-9:55, ...
The first data point sets the internal 'event time clock', and the window
corresponding to 9:53 is 9:50-9:55.

Also, note that 'startTime' is a very misleading name for that window
parameter: it is actually a start-time *offset*. If you set 'startTime' to
`1 minute`, then the windows will be 9:01-9:06, 9:06-9:11, ...
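
In code, that offset is the fourth argument of the `window` function. A small,
untested sketch, assuming a DataFrame `df` with an event-time column
`timestamp` and a numeric `value` column:

import org.apache.spark.sql.functions.{window, sum}

// 5-minute windows shifted by 1 minute: 9:01-9:06, 9:06-9:11, ...
val shifted = df
  .groupBy(window($"timestamp", "5 minutes", "5 minutes", "1 minute"))
  .agg(sum($"value") as "sumvalue")
  .select("window.start", "window.end", "sumvalue")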

2、why the agg result of time "2018-08-27 09:53:00 " is not output when the
batch1 data is coming?
Yes, the resulting value is clearly there:
2018-08-27 09:50:00|2018-08-27 09:55:00|   2|
The two datapoints that fall in this window are summed and the result is 2.

What were you expecting?

kr, Gerard.


On Mon, Aug 27, 2018 at 5:37 AM z...@zjdex.com  wrote:

> Hi Jungtaek Lim & Gerard Mass:
> Thanks very much.
> When I put three batch data like following :
>
> batch 0:
> 2018-08-27 09:53:00,1
> 2018-08-27 09:53:01,1
>
> batch 1:
> 2018-08-27 11:04:00,1
> 2018-08-27 11:04:01,1
>
> batch 2:
> 2018-08-27 11:17:00,1
> 2018-08-27 11:17:01,1
>
> the agg result of time "2018-08-27 09:53:00" is output like following:
> Batch: 2
> ---
> +-------------------+-------------------+--------+
> |              start|                end|sumvalue|
> +-------------------+-------------------+--------+
> |2018-08-27 09:50:00|2018-08-27 09:55:00|       2|
> +-------------------+-------------------+--------+
>
> For the result, I wonder to know:
> 1、why the start time is "2018-08-27 09:50:00" not "2018-08-27 09:53:00"?
> When I define the window, the starttime is not set.
> 2、why the agg result of time "2018-08-27 09:53:00 " is not output when
> the batch1 data is coming?
>
> Thanks a lot!
>
>
>
> --
> z...@zjdex.com
>
>
> *From:* Jungtaek Lim 
> *Date:* 2018-08-27 11:01
> *To:* z...@zjdex.com
> *CC:* Gerard Maas ; user ;
> 王程浩 
> *Subject:* Re: Re: About the question of Spark Structured Streaming
> window output
> You may want to add streaming listener to your query and see when/how
> watermark is updated. In short, watermark is calculated from previous batch
> and calculated value is applied to current batch. So you may think that the
> result is provided later than expected, maybe a batch.
>
> On Mon, Aug 27, 2018 at 11:56 AM, z...@zjdex.com wrote:
>
>> Hi Gerard Mass:
>> Thanks a lot for your reply.
>> When i use "append" model,  I send the following data:
>> 2018-08-27 09:53:00,1
>> 2018-08-27 09:53:01,1
>> The result (which has only the schema, like the following) is received after
>> the batch ends. But when the time of window + watermark is up, there
>> is no result output. Is there something I miss? Thanks in advance.
>>
>>
>>
>> --
>> z...@zjdex.com
>>
>>
>> *From:* Gerard Maas 
>> *Date:* 2018-08-27 05:00
>> *To:* zrc 
>> *CC:* spark users ; wch 
>> *Subject:* Re: About the question of Spark Structured Streaming window
>> output
>>
>> Hi,
>>
>> When you use a window in Append mode, you need to wait for the end of the
>> window + watermark to see the final record from the "append" mode.
>> This is your query over time. Note the timestamp at the right side of the
>> cell and the data present in it.
>>
>> val windowedCounts = dataSrc
>>   .withWatermark("timestamp", "1 minutes")
>>   .groupBy(window($"timestamp", "5 minutes"))
>>   .agg(sum("value") as "sumvalue")
>>   .select("window.start", "window.end","sumvalue")
>>
>>
>> [image: image.png]
>>
>> Going back to your questions:
>> 1、when I set the append output mode, I send input data, but there is no
>> result to output. How to use append mode in the window aggregate case?
>> Wait for the window + watermark to expire and you'll see the append
>> record output.
>>
>> 2、when I set the update output mode, I send input data, the result is
>> output every batch. But I want to output the result only once when the window
>> ends. How can I do that?
>>

Re: How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

2018-08-14 Thread Gerard Maas
Hi Aakash,

In Spark Streaming, foreachRDD gives you access to the data in
each micro-batch.
You can transform that RDD into a DataFrame and implement the flow you
describe.

eg.:

var historyRDD: RDD[MyType] = sparkContext.emptyRDD

// create Kafka DStream ...

dstream.foreachRDD { rdd =>
  val allData = historyRDD union rdd
  val df = allData.toDF   // requires the RDD to be of some structured type, i.e. a case class
  // do something with the dataframe df
  historyRDD = allData    // this needs checkpointing
}
Depending on the volume of data you're dealing with, it might not be
possible to hold all data in memory.
Checkpoint of the historyRDD is mandatory to break up the growing lineage
(union will keep a reference to the previous RDDs and at some point, things
will blow up)
So, while this trick might keep data within the Spark boundaries, you still
need resilient storage to write the checkpoints in order to implement a
reliable streaming job.
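
A hedged sketch of what that checkpointing could look like, expanding the
snippet above (the checkpoint directory and the every-10-batches policy are
illustrative choices, not prescriptions):

sparkContext.setCheckpointDir("hdfs:///tmp/history-checkpoints")  // must be resilient storage

var historyRDD: RDD[MyType] = sparkContext.emptyRDD
var batchCount = 0L

dstream.foreachRDD { rdd =>
  val allData = historyRDD union rdd
  batchCount += 1
  if (batchCount % 10 == 0) {   // periodically truncate the growing union lineage
    allData.checkpoint()
    allData.count()             // force materialization so the checkpoint is actually written
  }
  // ... transform to a DataFrame and feed the model as described above ...
  historyRDD = allData
}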

As you are using Kafka, another alternative would be to write the
transformed data to Kafka and have the training job consume that topic,
replaying data from the start.
Confluent has some good resources on how to use "kafka as a storage"

I  hope this helps.

kr, Gerard.

PS: I'm also not sure why you are initially writing the files to Kafka. It
would be easier to read the files directly from Spark Streaming or
Structured Streaming.





On Tue, Aug 14, 2018 at 9:31 AM Aakash Basu 
wrote:

> Hi all,
>
> The requirement is, to process file using Spark Streaming fed from Kafka
> Topic and once all the transformations are done, make it a batch of static
> dataframe and pass it into a Spark ML Model tuning.
>
> As of now, I had been doing it in the below fashion -
>
> 1) Read the file using Kafka
> 2) Consume it in Spark using a streaming dataframe
> 3) Run spark transformation jobs on streaming data
> 4) Append and write on HDFS.
> 5) Read the transformed file as batch in Spark
> 6) Run Spark ML Model
>
> But, the requirement is to avoid use of HDFS as it may not be installed in
> certain clusters, so, we've to avoid the disk I/O and do it on the fly from
> Kafka to append in a spark static DF and hence pass that DF to the ML Model.
>
> How to go about it?
>
> Thanks,
> Aakash.
>


Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

2018-07-19 Thread Gerard Maas
Hi Chris,

Could you show the code you are using? When you mention "I like to use a
static datasource (JDBC) in the state function" do you refer to a DataFrame
from a JDBC source or an independent JDBC connection?

The key point to consider is that the flatMapGroupsWithState function must
be serializable. Its execution happens in the workers of a Spark job.

If you are using a JDBC connection, you need to make sure the connection is
made in the context of the function. JDBC connections are not serializable.
Likewise, Dataset/DataFrames only function in the driver where they are
defined. They are bound to the Spark Session in the driver and it does not
make sense to access them in a remote executor.
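
To make the first point concrete, here is an illustrative, untested sketch
(the types, JDBC URL and query are placeholders, not your code): the
connection is opened inside the state function, so it is created on the
executor rather than captured from the driver.

import java.sql.DriverManager
import org.apache.spark.sql.streaming.GroupState

case class Event(key: String, value: Long)
case class Enriched(key: String, total: Long, label: String)

def updateState(key: String, events: Iterator[Event],
                state: GroupState[Long]): Iterator[Enriched] = {
  // executor-local connection; in practice, prefer a per-JVM connection pool
  val conn = DriverManager.getConnection("jdbc:postgresql://db-host:5432/refdata", "user", "secret")
  try {
    val stmt = conn.prepareStatement("SELECT label FROM reference WHERE key = ?")
    stmt.setString(1, key)
    val rs = stmt.executeQuery()
    val label = if (rs.next()) rs.getString(1) else "unknown"
    val total = state.getOption.getOrElse(0L) + events.map(_.value).sum
    state.update(total)
    Iterator(Enriched(key, total, label))
  } finally conn.close()
}

// used as: stream.groupByKey(_.key)
//   .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout)(updateState)

The alternative you found (joining the static DataFrame with the streaming
Dataset before flatMapGroupsWithState) keeps all DataFrame access in the
driver-defined plan, which is consistent with the above.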

Make sure you check the executor logs as well. There might be a
NullPointerException lurking somewhere in your logs.

met vriendelijke groeten, Gerard.

PS: spark-dev (d...@spark.apache.org) is for discussions about open source
development of the Spark project.
For general questions like this, use the user's  mailing list (
user@spark.apache.org)  (note that I changed that address in the to: )

On Thu, Jul 19, 2018 at 12:51 PM Christiaan Ras <
christiaan@semmelwise.nl> wrote:

> I use the state function flatmapgroupswithstate to track state of a kafka
> stream. To further customize the state function I like to use a static
> datasource (JDBC) in the state function. This datasource contains data I
> like to join with the stream (as Iterator) within flatmapgroupswithstate.
>
>
>
> When I try to access the JDBC source within flatmapgroupswithstate Spark
> execution freezes without any Exceptions or logging.
>
> To verify the JDBC connection works, I also tried to access the source
> outside the state function and that works. So now I join the static source
> with streaming source before feeding it to flatmapgroupswithstate. It seems
> to work so far…
>
>
>
> Any ideas why accessing the JDBC source within flatmapgroupswithstate
> could fail (freezes Spark execution)? Is it wise to use external
> datasources within flatmapgroupswithstate?
>
>
>
> Thanks,
>
> Chris
>
>
>
>
>


Re: How to reduceByKeyAndWindow in Structured Streaming?

2018-06-28 Thread Gerard Maas
Hi,

In Structured Streaming lingo, "ReduceByKeyAndWindow" would be a window
aggregation with a composite key.
Something like:
stream.groupBy($"key", window($"timestamp", "5 minutes"))
   .agg(sum($"value") as "total")

The aggregate could be any supported SQL function.
Is this what you are looking for? Otherwise, share your specific use case
to see how it could be implemented in Structured Streaming.
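
For reference, an untested end-to-end sketch, assuming a source with an
event-time column `timestamp` and a numeric `value` column:

import org.apache.spark.sql.functions.{window, sum}

val windowedTotals = stream
  .withWatermark("timestamp", "10 minutes")              // bounds the state kept for late data
  .groupBy($"key", window($"timestamp", "5 minutes"))
  .agg(sum($"value") as "total")

val query = windowedTotals.writeStream
  .outputMode("update")        // or "append" to emit only once the window + watermark expire
  .format("console")
  .start()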

kr, Gerard.

On Thu, Jun 28, 2018 at 10:21 AM oripwk  wrote:

> In Structured Streaming, there's the notion of event-time windowing:
>
>
>
> However, this is not quite similar to DStream's windowing operations: in
> Structured Streaming, windowing groups the data by fixed time-windows, and
> every event in a time window is associated to its group:
>
>
> And in DStreams it just outputs all the data according to a limited window
> in time (last 10 minutes for example).
>
> The question was asked also  here
> <
> https://stackoverflow.com/questions/49821646/is-there-someway-to-do-the-eqivalent-of-reducebykeyandwindow-in-spark-structured>
>
> , if it makes it clearer.
>
> How the latter can be achieved in Structured Streaming?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Spark Streaming] Measure latency

2018-06-26 Thread Gerard Maas
Hi Daniele,

A pragmatic approach to do that would be to execute the computations in the
scope of a foreachRDD, surrounded by wall-clock timers.
For example:
dstream.foreachRDD { rdd =>
  val t0 = System.currentTimeMillis()
  // make sure you get a result here, not another RDD.
  // Otherwise you need to do something like rdd.count to materialize it
  val aggregates = rdd.map(...).reduce(...)
  val elapsedTime = System.currentTimeMillis() - t0
  println(s"operation took $elapsedTime ms")
}

In the end, this will result in the same performance as the batch spark
engine, so you might want to check the performance there first.
If you want to add the stream ingestion time to this, it gets a bit more
tricky.
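
An alternative sketch for per-batch timing (assuming `ssc` is your
StreamingContext): a StreamingListener reports scheduling delay, processing
time and total delay without instrumenting the job itself.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"scheduling=${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing=${info.processingDelay.getOrElse(-1L)} ms, " +
      s"total=${info.totalDelay.getOrElse(-1L)} ms")
  }
})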

kind regards, Gerard.



On Tue, Jun 26, 2018 at 11:49 AM Daniele Foroni 
wrote:

> Hi all,
>
> I am using spark streaming and I need to evaluate the latency of the
> standard aggregations (avg, min, max, …) provided by the spark APIs.
> Any way to do it in the code?
>
> Thanks in advance,
> ---
> Daniele
>
>


Re: Advice on multiple streaming job

2018-05-07 Thread Gerard Maas
Dhaval,

Which Streaming API are you using?
In Structured Streaming, you are able to start several streaming queries
within the same context.
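
A minimal, untested sketch of that (broker, topics and paths are placeholders):

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

// two independent queries started from the same SparkSession
val toConsole = input.writeStream
  .format("console")
  .start()

val toParquet = input.selectExpr("CAST(value AS STRING) AS value").writeStream
  .format("parquet")
  .option("path", "/data/out")
  .option("checkpointLocation", "/tmp/chk/parquet")
  .start()

spark.streams.awaitAnyTermination()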

kind regards, Gerard.

On Sun, May 6, 2018 at 7:59 PM, Dhaval Modi  wrote:

> Hi Susan,
>
> Thanks for your response.
>
> Will try configuration as suggested.
>
> But I am still looking for an answer: does Spark support running multiple jobs
> on the same port?
>
> On Sun, May 6, 2018, 20:27 Susan X. Huynh  wrote:
>
>> Hi Dhaval,
>>
>> Not sure if you have considered this: the port 4040 sounds like a driver
>> UI port. By default it will try up to 4056, but you can increase that
>> number with "spark.port.maxRetries". (https://spark.apache.org/
>> docs/latest/configuration.html) Try setting it to "32". This would help
>> if the only conflict is among the driver UI ports (like if you have > 16
>> drivers running on the same host).
>>
>> Susan
>>
>> On Sun, May 6, 2018 at 12:32 AM, vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> Use a scheduler that abstracts the network away with a CNI, for instance,
>>> or other mechanisms (Mesos, Kubernetes, YARN). The CNI will allow you to always
>>> bind to the same ports because each container will have its own IP. Some
>>> other solutions like Mesos and Marathon can work without a CNI, with host IP
>>> binding, but will manage the ports for you, ensuring there isn't any
>>> conflict.
>>>
>>> Le sam. 5 mai 2018 à 17:10, Dhaval Modi  a
>>> écrit :
>>>
 Hi All,

 Need advice on executing multiple streaming jobs.

 Problem: We have hundreds of streaming jobs. Every streaming job uses a new
 port. Also, Spark automatically checks ports from 4040 to 4056, after which it
 fails. One of the workarounds is to provide the port explicitly.

 Is there a way to tackle this situation? or Am I missing any thing?

 Thanking you in advance.

 Regards,
 Dhaval Modi
 dhavalmod...@gmail.com

>>>
>>
>>
>> --
>> Susan X. Huynh
>> Software engineer, Data Agility
>> xhu...@mesosphere.com
>>
>


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Gerard Maas
Aakash,

There are two issues here.
The issue with the code on the first question is that the first query
blocks and the code for the second does not get executed. Panagiotis
pointed this out correctly.
In the updated code, the issue is related to netcat (nc) and the way
structured streaming works. As far as I remember, netcat only delivers data
to the first network connection.
On the structured streaming side, each query will issue its own connection.
This results in only the first query getting the data.
If you were talking to a TCP server supporting multiple connected clients,
you would see data in both queries.

If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination`  should solve the problem.

-kr, Gerard.



On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu 
wrote:

> Hey Jayesh and Others,
>
> Is there then, any other way to come to a solution for this use-case?
>
> Thanks,
> Aakash.
>
> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
>> Note that what you are trying to do here is join a streaming data frame
>> with an aggregated streaming data frame. As per the documentation, joining
>> an aggregated streaming data frame with another streaming data frame is not
>> supported
>>
>>
>>
>>
>>
>> *From: *spark receiver 
>> *Date: *Friday, April 13, 2018 at 11:49 PM
>> *To: *Aakash Basu 
>> *Cc: *Panagiotis Garefalakis , user <
>> user@spark.apache.org>
>> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>>
>>
>>
>> Hi Panagiotis ,
>>
>>
>>
>> Wondering whether you solved the problem or not? I met the same issue today.
>> I'd appreciate it very much if you could paste the code snippet if it's
>> working.
>>
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
>> On Apr 6, 2018, at 7:40 AM, Aakash Basu  wrote:
>>
>>
>>
>> Hi Panagiotis,
>>
>> I did that, but it still prints the result of the first query and awaits
>> for new data, doesn't even goes to the next one.
>>
>> *Data -*
>>
>> $ nc -lk 9998
>>
>> 1,2
>> 3,4
>> 5,6
>> 7,8
>>
>> *Result -*
>>
>> ---
>> Batch: 0
>> ---
>> +----+
>> |aver|
>> +----+
>> | 3.0|
>> +----+
>>
>> ---
>> Batch: 1
>> ---
>> +----+
>> |aver|
>> +----+
>> | 4.0|
>> +----+
>>
>>
>>
>> *Updated Code -*
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import split
>>
>> spark = SparkSession \
>> .builder \
>> .appName("StructuredNetworkWordCount") \
>> .getOrCreate()
>>
>> data = spark \
>> .readStream \
>> .format("socket") \
>> .option("header","true") \
>> .option("host", "localhost") \
>> .option("port", 9998) \
>> .load("csv")
>>
>>
>> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
>> split(data.value, ",").getItem(1).alias("col2"))
>>
>> id_DF.createOrReplaceTempView("ds")
>>
>> df = spark.sql("select avg(col1) as aver from ds")
>>
>> df.createOrReplaceTempView("abcd")
>>
>> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 
>> from ds")  # (select aver from abcd)
>>
>> query2 = df \
>> .writeStream \
>> .format("console") \
>> .outputMode("complete") \
>> .trigger(processingTime='5 seconds') \
>> .start()
>>
>> query = wordCounts \
>> .writeStream \
>> .format("console") \
>> .trigger(processingTime='5 seconds') \
>> .start()
>>
>> spark.streams.awaitAnyTermination()
>>
>>
>>
>> Thanks,
>>
>> Aakash.
>>
>>
>>
>> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <
>> panga...@gmail.com> wrote:
>>
>> Hello Aakash,
>>
>>
>>
>> When you use query.awaitTermination you are pretty much blocking there
>> waiting for the current query to stop or throw an exception. In your case
>> the second query will not even start.
>>
>> What you could do instead is remove all the blocking calls and use
>> spark.streams.awaitAnyTermination instead (waiting for either query1 or
>> query2 to terminate). Make sure you do that after the query2.start call.
>>
>>
>>
>> I hope this helps.
>>
>>
>>
>> Cheers,
>>
>> Panagiotis
>>
>>
>>
>> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu 
>> wrote:
>>
>> Any help?
>>
>> Need urgent help. Someone please clarify the doubt?
>>
>>
>>
>> -- Forwarded message --
>> From: *Aakash Basu* 
>> Date: Thu, Apr 5, 2018 at 3:18 PM
>> Subject: [Structured Streaming] More than 1 streaming in a code
>> To: user 
>>
>> Hi,
>>
>> If I have more than one writeStream in a code, which operates on the same
>> readStream data, why does it produce only the first writeStream? I want the
>> second one to be also printed on the console.
>>
>> How to do that?
>>
>>
>>
>> from pyspark.sql import 

[Structured Streaming] File source, Parquet format: use of the mergeSchema option.

2018-04-12 Thread Gerard Maas
Hi,

I'm looking into the Parquet format support for the File source in
Structured Streaming.
The docs mention the use of the option 'mergeSchema' to merge the schemas
of the part files found.[1]

What would be the practical use of that in a streaming context?

In its batch counterpart, `mergeSchemas` would infer the schema superset of
the part-files found.


When using the File source + parquet format in streaming mode, we must
provide a schema to the readStream.schema(...) builder and that schema is
fixed for the duration of the stream.
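
For context, this is the kind of declaration I'm referring to (the schema and
path are placeholders):

val parquetStream = spark.readStream
  .schema(eventSchema)              // mandatory for file sources; fixed for the query's lifetime
  .option("mergeSchema", "true")    // the option under discussion
  .parquet("/data/landing")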

My current understanding is that:

- Files containing a subset of the fields declared in the schema will
render null values for the non-existing fields.
- For files containing a superset of the fields, the additional data fields
will be lost.
- Files not matching the schema set on the streaming source, will render
all fields null for each record in the file.

Is the 'mergeSchema' option playing another role? From the user
perspective, they may think that this option would help their job cope with
schema evolution at runtime, but that does not seem to be the case.

What is the use of this option?

-kr, Gerard.


[1] https://github.com/apache/spark/blob/master/sql/core/src
/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L376


Re: Scala - Spark for beginners

2018-03-18 Thread Gerard Maas
This is a good start:
https://github.com/deanwampler/JustEnoughScalaForSpark

And the corresponding talk:
https://www.youtube.com/watch?v=LBoSgiLV_NQ

There're many more resources if you search for it.

-kr, Gerard.

On Sun, Mar 18, 2018 at 11:15 AM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> Hi,
>
> Can any one share with me nice tutorials on Spark with Scala like
> videos, blogs for beginners. Mostly focusing on writing scala code.
>
> Thanks in advance.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark StreamingContext Question

2018-03-07 Thread Gerard Maas
Hi,

You can run as many jobs in your cluster as you want, provided you have
enough capacity.
The one streaming context constrain is per job.

You can submit several jobs for Flume and some other for Twitter, Kafka,
etc...

If you are getting started with Streaming with Spark, I'd recommend you to
look into Structured Streaming first.
In Structured Streaming, you can have many streaming queries running under
the same spark session.
Yet, that does not mean you need to put them all in the same job. You can
(and should) still submit different jobs for different application concerns.

kind regards, Gerard.



On Wed, Mar 7, 2018 at 4:56 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Hi all,
>
> Understand from documentation that, only one streaming context can be
> active in a JVM at the same time.
>
> Hence in an enterprise cluster, how can we manage/handle multiple users
> are having many different streaming applications, one may be ingesting data
> from Flume, another from Twitter etc? Is this not available now?
>
> Best,
> Passion
>


Re: can HDFS be a streaming source like Kafka in Spark 2.2.0?

2018-01-15 Thread Gerard Maas
Hi,

You can monitor a filesystem directory as streaming source as long as the
files placed there are atomically copied/moved into the directory.
Updating the files is not supported.
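
A minimal, untested sketch of such a file-based stream, using placeholder
schema, paths and broker (your stream2 scenario):

// treat an HDFS directory as the source ...
val fromHdfs = spark.readStream
  .schema(recordSchema)                    // file sources need an explicit schema
  .json("hdfs:///data/stream1-output")     // picks up files atomically moved into this dir

// ... and write it back to Kafka
val toKafka = fromHdfs
  .selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "topic-out")
  .option("checkpointLocation", "/tmp/chk/hdfs-to-kafka")
  .start()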

kr, Gerard.

On Mon, Jan 15, 2018 at 11:41 PM, kant kodali  wrote:

> Hi All,
>
> I am wondering if HDFS can be a streaming source like Kafka in Spark
> 2.2.0? For example can I have stream1 reading from Kafka and writing to
> HDFS and stream2 to read from HDFS and write it back to Kakfa ? such that
> stream2 will be pulling the latest updates written by stream1.
>
> Thanks!
>


Re: Spark Streaming with Confluent

2017-12-13 Thread Gerard Maas
Hi Arkadiusz,

Try 'rooting' your import. It looks like the import is being interpreted as
relative to the previous one.
'Rooting' is done by adding the '_root_' prefix to your import:

import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.io.confluent.kafka.serializers.KafkaAvroDecoder

kr, Gerard.

On Wed, Dec 13, 2017 at 6:05 PM, Arkadiusz Bicz 
wrote:

> Hi,
>
> I try to test spark streaming 2.2.0 version with confluent 3.3.0
>
> I have got lot of error during compilation this is my sbt:
>
> lazy val sparkstreaming = (project in file("."))
>   .settings(
>   name := "sparkstreaming",
>   organization := "org.arek",
>   version := "0.1-SNAPSHOT",
>   scalaVersion := "2.11.8",
> libraryDependencies ++=  Seq(
>   "org.apache.spark" %% "spark-streaming" % "2.2.0",
>   "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
>   "io.confluent" % "kafka-avro-serializer" % "3.3.0"
> )
>   )
>
>
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> import io.confluent.kafka.serializers.KafkaAvroDecoder
>
> object Transformation extends Serializable {
>
>   def main(args: Array[String]) = {
> val conf = new SparkConf().setAppName("StreamingTranformation").
> setMaster("local[*]")
> val streamingContext = new StreamingContext(conf, Seconds(1))
>
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> "local:2181",
>   "schema.registry.url" -> "http://local:8081;,
>   "auto.offset.reset" -> "smallest")
>
> val topicSet = Set("GEXPPROD_ROUTE")
> val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
>
> val lines = messages.foreachRDD(rdd => {
>   rdd.foreach({ avroRecord =>
> println(avroRecord)
>   })
> })
>   }
>
>
> [warn] Found version conflict(s) in library dependencies; some are
> suspected to be binary incompatible:
> [warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
> 3.7.0.Final}
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.8
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.6
>  (depends on 3.6.2.Final)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 3.6.2.Final)
> [warn]  * commons-net:commons-net:2.2 is selected over 3.1
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.1)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 3.1)
> [warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
> [warn]  +- org.apache.hadoop:hadoop-yarn-client:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-api:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-common:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.curator:curator-framework:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-client:2.6.0
> (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-recipes:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.htrace:htrace-core:3.0.4   (depends
> on 12.0.1)
> [warn] Run 'evicted' to see detailed eviction warnings
> [info] Compiling 1 Scala source to /home/adminuser/data-
> streaming-platform/sparkstreaming/target/scala-2.11/classes ...
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11:
> object confluent is not a member of package org.apache.spark.io
> [error] import io.confluent.kafka.serializers.KafkaAvroDecoder
> [error]   ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]  ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]
>
>
> When changing to library  "org.apache.spark" %%
> "spark-streaming-kafka-0-10" % "2.2.0" :
>
>
> [warn] 

Re: Union of RDDs Hung

2017-12-12 Thread Gerard Maas
Can you show us the code?

On Tue, Dec 12, 2017 at 9:02 AM, Vikash Pareek 
wrote:

> Hi All,
>
> I am unioning 2 RDDs (each of them having 2 records) but this union is
> hanging.
> I found a workaround, which is caching both RDDs before performing the
> union, but I could not figure out the root cause of the job hanging.
>
> Does somebody know why this happens with union?
>
> Spark version I am using is 1.6.1
>
>
> Best Regards,
> Vikash Pareek
>
>
>
> -
>
> __Vikash Pareek
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-06 Thread Gerard Maas
Hi Kant,

>  but would your answer on .collect() change depending on running the
spark app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali <kanth...@gmail.com> wrote:

> @Richard I don't see any error in the executor log but let me run again to
> make sure.
>
> @Gerard Thanks much!  but would your answer on .collect() change depending
> on running the spark app in client vs cluster mode?
>
> Thanks!
>
> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> The general answer to your initial question is that "it depends". If the
>> operation in the rdd.foreach() closure can be parallelized, then you don't
>> need to collect first. If it needs some local context (e.g. a socket
>> connection), then you need to do rdd.collect first to bring the data
>> locally, which has a perf penalty and also is restricted to the memory size
>> to the driver process.
>>
>> Given the further clarification:
>> >Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>
>> If it's writing to Kafka, that operation can be done in a distributed
>> form.
>>
>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>
>> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
>> migrate to structured streaming by already adopting the 'structured' APIs
>> within Spark Streaming:
>>
>> case class KV(key: String, value: String)
>>
>> dstream.map(...).reduce(...).foreachRDD { rdd =>
>>   import spark.implicits._
>>   // needs to be in a (key, value) shape, i.e. a case class
>>   val kv = rdd.map{e => KV(extractKey(e), extractValue(e))}
>>   val dataFrame = kv.toDF()
>>   dataFrame.write
>>     .format("kafka")
>>     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>>     .option("topic", "topic1")
>>     .save()
>> }
>>
>> -kr, Gerard.
>>
>>
>>
>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>>
>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>>> richard.q...@capitalone.com> wrote:
>>>
>>>> Where do you check the output result for both case?
>>>>
>>>> Sent from my iPhone
>>>>
>>>>
>>>> > On Dec 5, 2017, at 15:36, kant kodali <kanth...@gmail.com> wrote:
>>>> >
>>>> > Hi All,
>>>> >
>>>> > I have a simple stateless transformation using Dstreams (stuck with
>>>> the old API for one of the Application). The pseudo code is rough like this
>>>> >
>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>> >  rdd.collect(),forEach(); // Is this necessary ? Does execute
>>>> fine but a bit slow
>>>> > })
>>>> >
>>>> > I understand collect collects the results back to the driver but is
>>>> that necessary? can I just do something like below? I believe I tried both
>>>> and somehow the below code didn't output any results (It can be issues with
>>>> my env. I am not entirely sure) but I just would like some clarification on
>>>> .collect() since it seems to slow things down for me.
>>>> >
>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>> >  rdd.forEach(() -> {} ); //
>>>> > })
>>>> >
>>>> > Thanks!
>>>> >
>>>> >
>>>> 
>>>>
>>>> The information contained in this e-mail is confidential and/or
>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>> solely in performance of work or services for Capital One. The information
>>>> transmitted herewith is intended only for use by the individual or entity
>>>> to which it is addressed. If the reader of this message is not the intended
>>>> recipient, you are hereby notified that any review, retransmission,
>>>> dissemination, distribution, copying or other use of, or taking of any
>>>> action in reliance upon this information is strictly prohibited. If you
>>>> have received this communication in error, please contact the sender and
>>>> delete the material from your computer.
>>>>
>>>>
>>>
>>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread Gerard Maas
The general answer to your initial question is that "it depends". If the
operation in the rdd.foreach() closure can be parallelized, then you don't
need to collect first. If it needs some local context (e.g. a socket
connection), then you need to do rdd.collect first to bring the data
locally, which has a perf penalty and is also restricted by the memory size
of the driver process.
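
As an illustration of the difference (the connection helpers below are
hypothetical placeholders, and the two variants are shown together only for
contrast):

dstream.map(...).reduce(...).foreachRDD { rdd =>
  // (a) driver-side: only for small results, pays the collect() penalty
  rdd.collect().foreach(record => driverLocalClient.send(record))   // hypothetical client on the driver

  // (b) executor-side: create the non-serializable resource per partition
  rdd.foreachPartition { records =>
    val conn = createConnection()   // hypothetical helper, opened on the executor
    records.foreach(conn.send)
    conn.close()
  }
}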

Given the further clarification:
>Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

If it's writing to Kafka, that operation can be done in a distributed form.

You could use this lib: https://github.com/BenFradet/spark-kafka-writer

Or, if you can upgrade to Spark 2.2 version, you can pave your way to
migrate to structured streaming by already adopting the 'structured' APIs
within Spark Streaming:

case class KV(key: String, value: String)

dstream.map(...).reduce(...).foreachRDD { rdd =>
  import spark.implicits._
  // needs to be in a (key, value) shape, i.e. a case class
  val kv = rdd.map{e => KV(extractKey(e), extractValue(e))}
  val dataFrame = kv.toDF()
  dataFrame.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic1")
    .save()
}

-kr, Gerard.



On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:

> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard  > wrote:
>
>> Where do you check the output result for both case?
>>
>> Sent from my iPhone
>>
>>
>> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
>> >
>> > Hi All,
>> >
>> > I have a simple stateless transformation using Dstreams (stuck with the
>> old API for one of the Application). The pseudo code is rough like this
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >  rdd.collect(),forEach(); // Is this necessary ? Does execute fine
>> but a bit slow
>> > })
>> >
>> > I understand collect collects the results back to the driver but is
>> that necessary? can I just do something like below? I believe I tried both
>> and somehow the below code didn't output any results (It can be issues with
>> my env. I am not entirely sure) but I just would like some clarification on
>> .collect() since it seems to slow things down for me.
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >  rdd.forEach(() -> {} ); //
>> > })
>> >
>> > Thanks!
>> >
>> >
>> 
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
>>
>


Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Gerard Maas
Hi Arpan,

The error suggests that the streaming context has been started with
streamingContext.start() and after that statement, some other
dstream operations have been attempted.
A suggested pattern to manage the offsets is the following:

var offsetRanges: Array[OffsetRange] = Array.empty

// create streaming context, streams, ...
// as the first operation after the stream has been created, do:

stream.foreachRDD { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

// Then do other desired operations on the streaming data
val resultStream = stream.map(...).filter(...).transform(...)

// materialize the resulting stream
resultStream.foreachRDD { rdd =>
  // do stuff... write to a db, to a kafka topic,... whatever,...

  // at the end of the process, commit the offsets
  // (note that I use the original stream instance, not `resultStream`)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

I hope this helps,

kr, Gerard.

On Fri, Oct 13, 2017 at 3:34 PM, Arpan Rajani  wrote:

> Hi all,
>
> In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to
> store the offsets in Kafka in order to achieve restartability of the
> streaming application. ( Using checkpoints, I already implemented, we will
> require to change code in production hence checkpoint won't work)
>
> Checking Spark Streaming documentation- Storing offsets on Kafka approach
> :
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-
> integration.html#kafka-itself, which describes :
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> Based on this, I modified the code like following:
>
> val kafkaMap:Map[String,Object] = KakfaConfigs
>
> val stream:InputDStream[ConsumerRecord[String,String]] = 
> KafkaUtil.createDirectStream(ssc, PreferConsistent, Subscribe[String,String] 
> (Array("topicName"),kafkaMap))
>
> stream.foreach { rdd =>
> val offsetRangers : Array[OffsetRanger] = 
> rdd.asInstanceOf[HasOffsetRangers].offsetRanges
>
> // Filter out the values which have empty values and get the tuple of type
> // ( topicname, stringValue_read_from_kafka_topic)
> stream.map(x => ("topicName",x.value)).filter(x=> 
> !x._2.trim.isEmpty).foreachRDD(processRDD _)
>
> // Sometime later, after outputs have completed.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> def processRDD(rdd:RDD[(String,String)]) {
>  // Process futher to hdfs
> }
>
> Now, When I try to start Streaming application, it does not start and
> looking at the logs, here is what we see :
>
> java.lang.IllegalStateException: Adding new inputs, transformations, and 
> output operations after starting a context is not supported
> at 
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
> at org.apache.spark.streaming.dstream.DStream.(DStream.scala:65)
>
>
> Can anyone suggest, or help to understand what are we missing here?
>
>
> Regards,
> Arpan
>


Re: Spark Streaming - Multiple Spark Contexts (SparkSQL) Performance

2017-10-01 Thread Gerard Maas
Hammad,

The recommended way to implement this logic would be to:

- Create a SparkSession.
- Create a StreamingContext using the SparkContext embedded in the SparkSession.
- Use the single SparkSession instance for the SQL operations within the foreachRDD.

It's important to note that Spark operations can process the complete
dataset. In this case, there's no need to do a per-partition or per-element
operation (that would be the case if we were directly using the driver's
API and DB connections).

Reorganizing the code in the question a bit, we should have:

SparkSession sparkSession = SparkSession
    .builder()
    .master("local[2]")
    .appName("TransformerStreamPOC")
    .config("spark.some.config.option", "some-value")
    .getOrCreate();

JavaStreamingContext jssc = new JavaStreamingContext(
    JavaSparkContext.fromSparkContext(sparkSession.sparkContext()),
    Durations.seconds(60));

// this dataset doesn't seem to depend on the received data, so we can load it once
Dataset<Row> baselineData =
    sparkSession.read().jdbc(MYSQL_CONNECTION_URL, "table_name", connectionProperties);

// create the dstream
JavaDStream dstream = ...

... operations on dstream ...

dstream.foreachRDD(rdd -> {

    Dataset incomingData = sparkSession.createDataset(rdd);

    // ... do something with the incoming dataset, e.g. join with the baseline ...
    Dataset<Row> joined = incomingData.join(baselineData, ...);

    // ... do something with joined ...

});


kr, Gerard.

On Sun, Oct 1, 2017 at 7:55 PM, Hammad  wrote:

> Hello,
>
> *Background:*
>
> I have Spark Streaming context;
>
> SparkConf conf = new 
> SparkConf().setMaster("local[2]").setAppName("TransformerStreamPOC");
> conf.set("spark.driver.allowMultipleContexts", "true");   *<== this*
> JavaStreamingContext jssc = new JavaStreamingContext(conf, 
> Durations.seconds(60));
>
>
> that subscribes to certain kafka *topics*;
>
> JavaInputDStream> stream =
> KafkaUtils.createDirectStream(
> jssc,
> LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(*topics*, 
> kafkaParams)
> );
>
> when messages arrive in queue, I recursively process them as follows (below 
> code section will repeat in Question statement)
>
> stream.foreachRDD(rdd -> {
> //process here - below two scenarions code is inserted here
>
> });
>
>
> *Question starts here:*
>
> Since I need to apply SparkSQL to received events in Queue - I create 
> SparkSession with two scenarios;
>
> *1) Per partition one sparkSession (after 
> "spark.driver.allowMultipleContexts" set to true); so all events under this 
> partition are handled by same sparkSession*
>
> rdd.foreachPartition(partition -> {
> SparkSession sparkSession = SparkSession
> .builder()
> .appName("Java Spark SQL basic example")
> .config("spark.some.config.option", "some-value")
> .getOrCreate();
>
> while (partition.hasNext()) {
>   Dataset df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, 
> "table_name", connectionProperties);
>
> }}
>
> *2) Per event under each session; so each event under each queue under each 
> stream has one sparkSession;*
>
> rdd.foreachPartition(partition -> {while (partition.hasNext()) {
> SparkSession sparkSession = SparkSession.builder().appName("Java Spark SQL 
> basic example").config("spark.some.config.option", 
> "some-value").getOrCreate();
>
> Dataset df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, 
> "table_name", connectionProperties);
>
> }}
>
>
> Is it good practice to create multiple contexts (lets say 10 or 100)?
> How does number of sparkContext to be allowed vs number of worker nodes
> relate?
> What are performance considerations with respect to scenario1 and
> scenario2?
>
> I am looking for these answers as I feel there is more to what I
> understand of performance w.r.t sparkContexts created by a streaming
> application.
> Really appreciate your support in anticipation.
>
> Hammad
>
>


Re: [StructuredStreaming] multiple queries of the socket source: only one query works.

2017-08-13 Thread Gerard Maas
Hi Shixiong,

Thanks for the explanation.

In my view, this is different from the intuitive understanding of the
Structured Streaming model [1], where incoming data is appended to an
'unbounded table' and queries are run on that. I had expected that all
queries would run on that 'unbounded table view', while I understand from
your explanation that every query maintains its own 'unbounded table' view
of the data stream. Is that correct?

How is that working in the case of Kafka? We have only one declared
consumer, so we should observe a similar behavior. Yet, the Kafka source is
able to deliver multiple output queries.
What is the difference?
Where could I learn more about the internal structured streaming model?

kind regards, Gerard.



[1]
https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#basic-concepts

On Sun, Aug 13, 2017 at 1:22 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Spark creates one connection for each query. The behavior you observed is
> because how "nc -lk" works. If you use `netstat` to check the tcp
> connections, you will see there are two connections when starting two
> queries. However, "nc" forwards the input to only one connection.
>
> On Fri, Aug 11, 2017 at 10:59 PM, Rick Moritz <rah...@gmail.com> wrote:
>
>> Hi Gerard, hi List,
>>
>> I think what this would entail is for Source.commit to change its
>> funcationality. You would need to track all streams' offsets there.
>> Especially in the socket source, you already have a cache (haven't looked
>> at Kafka's implementation to closely yet), so that shouldn't be the issue,
>> if at start-time all streams subscribed to a source are known.
>> What I worry about is, that this may need an API-change, to pass a
>> stream-ID into commit. Since different streams can use different Triggers,
>> you can have any number of unforeseeable results, when multiple threads
>> call commit.
>>
>> I'll look into that, since I am in the progress of building a
>> TwitterSource based on the socket source's general functionality, and due
>> to the API restrictions there, it's even more important for multiple
>> streams using one source.
>>
>> What I did observe was that every query did initialize a separate source.
>> This won't work so well with socket, since the socket is in use, once you
>> try to set up a second one. It also won't work so well with Twitter, since
>> usually an API key is limited in how often it can be used somultaneously
>> (likely at 2).
>>
>> An alternative to the socket source issue would be to open a new free
>> socket, but then the user has to figure out where the source is listening.
>>
>> I second Gerard's request for additional information, and confirmation of
>> my theories!
>>
>> Thanks,
>>
>> Rick
>>
>> On Fri, Aug 11, 2017 at 2:53 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I've been investigating this SO question:
>>> https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
>>>
>>> TL;DR: when using the Socket source, trying to create multiple queries
>>> does not work properly, only one the first query in the start order will
>>> receive data.
>>>
>>> This minimal example reproduces the issue:
>>>
>>> val lines = spark
>>> .readStream
>>> .format("socket")
>>> .option("host", "localhost")
>>> .option("port", "")
>>> .option("includeTimestamp", true)
>>> .load()
>>>
>>> val q1 = lines.writeStream
>>>   .outputMode("append")
>>>   .format("console")
>>>   .start()
>>>
>>> val q2 = lines.withColumn("foo", lit("foo")).writeStream
>>>   .outputMode("append")
>>>   .format("console")
>>>   .start()
>>>
>>> Sample output (spark shell):
>>>
>>> Batch: 0
>>> ---
>>> +-+---+
>>> |value|  timestamp|
>>> +-+---+
>>> |  aaa|2017-08-11 23:37:59|
>>> +-+---+
>>>
>>> ---
>>> Batch: 1
>>> ---
>>> +-+---+
>>> |value|  timestamp|
>>> +-+---+
>>&g

[StructuredStreaming] multiple queries of the socket source: only one query works.

2017-08-11 Thread Gerard Maas
Hi,

I've been investigating this SO question:
https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

TL;DR: when using the Socket source, trying to create multiple queries does
not work properly: only the first query in the start order will
receive data.

This minimal example reproduces the issue:

val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", "")
.option("includeTimestamp", true)
.load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .start()

Sample output (spark shell):

Batch: 0
---
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:37:59|
+-----+-------------------+

---
Batch: 1
---
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:38:00|
+-----+-------------------+

q1.stop

scala> ---
Batch: 0
---
+-----+-------------------+---+
|value|          timestamp|foo|
+-----+-------------------+---+
|    b|2017-08-11 23:38:19|foo|
+-----+-------------------+---+

---
Batch: 1
---
+-----+-------------------+---+
|value|          timestamp|foo|
+-----+-------------------+---+
|    b|2017-08-11 23:38:19|foo|
+-----+-------------------+---+

This is certainly unexpected behavior. Even though the socket source is
marked "not for production" I wouldn't expect to be so limited.

Am I right to think that the first running query consumes all the data in
the source, and therefore all the other queries do not work (until the
previous ones are stopped)?

Is this a generalized behavior? e.g. does each query started on a structured
streaming job fully consume the source? And can the Kafka source be used with
multiple queries because it can be replayed?

As a workaround, would there be a way to cache the incoming data to
multiplex it? We cannot call `cache` on a streaming dataset, but is there
maybe a way to do that?

Could I have more details on the execution model (I've consumed all I could
find) and what are the (near) future plans?

thanks!

-Gerard.


Re: Need Spark(Scala) Performance Tuning tips

2017-06-09 Thread Gerard Maas
Also, read Holden's newest book, High-Performance Spark:

http://shop.oreilly.com/product/0636920046967.do

On Fri, Jun 9, 2017 at 5:38 PM, Alonso Isidoro Roman 
wrote:

> a quick search on google:
>
> https://www.cloudera.com/documentation/enterprise/5-9-
> x/topics/admin_spark_tuning.html
>
> https://blog.cloudera.com/blog/2015/03/how-to-tune-your-
> apache-spark-jobs-part-1/
>
> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-
> apache-spark-jobs-part-2/
>
> and of course, Jacek`s
> 
>
>
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> 
>
> 2017-06-09 14:50 GMT+02:00 Debabrata Ghosh :
>
>> Hi,
>>  I need some help / guidance in performance tuning
>> Spark code written in Scala. Can you please help.
>>
>> Thanks
>>
>
>


Re: How to perform clean-up after stateful streaming processes an RDD?

2017-06-06 Thread Gerard Maas
It looks like the clean-up should go into the foreachRDD function:

stateUpdateStream.foreachRDD { rdd =>
  // do stuff with the rdd

  stateUpdater.cleanupExternalService  // should work in this position
}

Code within the foreachRDD(*) executes on the driver, so you can keep the
state of the object there.

What will not work is to update the stateUpdater state from a side effect
of the stateUpdateFunction used in the mapWithState transformation and
expect those changes to be visible at the call site sketched above.

kr, Gerard.

(*) a typical construct found in the wild is:
dstream.foreachRDD{rdd =>
   // do some preparation
   rdd.operation{elem => ... }
   ...
   // close/clean/report
}
So the code within the foreachRDD closure executes on the driver, *but* the
code within the rdd.operation{...} closure is a spark operation and
executes distributed on the executors.
One must be careful of not incorrectly mixing the scopes, in particular
when holding on to local state.



On Wed, Jun 7, 2017 at 1:08 AM, David Rosenstrauch 
wrote:

> We have some code we've written using stateful streaming (mapWithState)
> which works well for the most part.  The stateful streaming runs, processes
> the RDD of input data, calls the state spec function for each input record,
> and does all proper adding and removing from the state cache.
>
> However, I have a need to do some cleanup after stateful streaming
> processes the input data RDD, and I can't seem to find any place where we
> can put that code where it will run when it's supposed to.
>
> Essentially our state spec function needs to a) call out to an external
> service, b) hold some data from that service, and then c) inform the
> service to clean up the remaining data once the RDD is complete.
>
> I've gotten to the point where the code looks approximately like this:
>
>
> val eventStream = incomingStream.transform(...)
>
> val stateUpdater = new StateUpdater
> val stateUpdateStream = 
> eventStream.mapWithState(stateUpdater.stateUpdateFunction
> _)
>
> stateUpdateStream.foreachRdd(...) {
> ...
> }
> stateUpdater.cleanupExternalService// DOES NOT WORK!
>
>
> class StateUpdater extends Serializable {
>
> def stateUpdateFunction(key, value, state) {
> if (!state.initalized) {
> state.initialize(externalService)
> }
> ...
> }
>
> def cleanupExternalService {
> externalService.cleanup  // writes some data back to the external service
> }
>
> @transient lazy val externalService = new ExternalService
> }
>
>
> Note that the ExternalService object is holding onto a small bit of state
> that it needs to write back to the external service once we have completed
> running the stateUpdateFunction on every record in the RDD.  However this
> code doesn't actually work.  Because of the way Spark serializes objects on
> the driver and then deserializes them onto the executor, there's no way for
> me to get a hold of the ExternalService object that is being used on each
> RDD partition and clean up its leftover data.  Those objects seem to be
> held internally somewhere in the bowels of stateful streaming (where it
> processes an RDD of incoming data and applies it to the state).  And back
> in the main code where I'm trying to call the cleanup method, I'm actually
> calling it on a totally different object than the one that ran in the RDD
> partitions.  And stateful streaming doesn't provide me with any opportunity
> to perform any cleanup processing - say by calling some "rddDone" method to
> notify me that it just finished doing state processing on an RDD.  It just
> calls only the statespec function over and over, once for every record, and
> never notifying me that we've ended processing an RDD or started a new one.
>
>
> Is there any way out of this conundrum?  I've tried to avoid the problem
> by moving my interactions with the external service outside of the state
> spec function.  But that didn't work:  the interaction with the external
> service is really needed inside of the state spec function, and I caused a
> bug in our code when I tried to move it.
>
> Any suggestions that I might not have thought of on how to fix this issue?
>
> Thanks,
>
> DR
>


[StackOverflow] Size exceeds Integer.MAX_VALUE When Joining 2 Large DFs

2016-11-25 Thread Gerard Maas
This question seems to deserve an escalation from Stack Overflow:

http://stackoverflow.com/questions/40803969/spark-size-exceeds-integer-max-value-when-joining-2-large-dfs

Looks like an important limitation.

-kr, Gerard.

Meta/PS: What do you think would be the best way to escalate from SO? Should
I copy the question contents or just the link?


Re: HiveContext standalone => without a Hive metastore

2016-05-30 Thread Gerard Maas
Michael,  Mitch, Silvio,

Thanks!

The own directory is indeed the issue. We are running the Spark Notebook, which
uses the same dir per server (i.e. for all notebooks). So this issue
prevents us from running 2 notebooks using HiveContext.
I'll look into a proper Hive installation, and I'm glad to know that this
dependency is gone in 2.0.
Looking forward to 2.1 :-) ;-)

-kr, Gerard.


On Thu, May 26, 2016 at 10:55 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> You can also just make sure that each user is using their own directory.
> A rough example can be found in TestHive.
>
> Note: in Spark 2.0 there should be no need to use HiveContext unless you
> need to talk to a metastore.
>
> On Thu, May 26, 2016 at 1:36 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Well, make sure that you set up a reasonable RDBMS as metastore. Ours is
>> Oracle but you can get away with others. Check the supported list in
>>
>> hduser@rhes564:: :/usr/lib/hive/scripts/metastore/upgrade> ltr
>> total 40
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 postgres
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mysql
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 mssql
>> drwxr-xr-x 2 hduser hadoop 4096 Feb 21 23:48 derby
>> drwxr-xr-x 3 hduser hadoop 4096 May 20 18:44 oracle
>>
>> you have a few good ones in the list.  In general the base tables (without
>> transactional support) are around 55 (Hive 2) and don't take much space
>> (depending on the volume of tables). I attached an E-R diagram.
>>
>> HTH
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 26 May 2016 at 19:09, Gerard Maas <gerard.m...@gmail.com> wrote:
>>
>>> Thanks a lot for the advice!.
>>>
>>> I found out why the standalone hiveContext would not work:  it was
>>> trying to deploy a Derby db and the user had no rights to create the dir
>>> where the db is stored:
>>>
>>> Caused by: java.sql.SQLException: Failed to create database
>>> 'metastore_db', see the next exception for details.
>>>
>>>at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>>> Source)
>>>
>>>at
>>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>>> Source)
>>>
>>>... 129 more
>>>
>>> Caused by: java.sql.SQLException: Directory
>>> /usr/share/spark-notebook/metastore_db cannot be created.
>>>
>>>
>>> Now, the new issue is that we can't start more than 1 context at the
>>> same time. I think we will need to setup a proper metastore.
>>>
>>>
>>> -kind regards, Gerard.
>>>
>>>
>>>
>>>
>>> On Thu, May 26, 2016 at 3:06 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> To use HiveContext, which is basically an SQL API within Spark, without a
>>>> proper Hive setup does not make sense. It is a superset of Spark's
>>>> SQLContext.
>>>>
>>>> In addition simple things like registerTempTable may not work.
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 26 May 2016 at 13:01, Silvio Fiorito <silvio.fior...@granturing.com>
>>>> wrote:
>>>>
>>>>> Hi Gerard,
>>>>>
>>>>>
>>>>>
>>>>> I’ve never had an issue using the HiveContext without a hive-site.xml
>>>>> configured. However, one issue you may have is if multiple users are
>>>>> starting the HiveContext from the same path, they'll all be trying to
>>>>> store the default Derby metastore in the same location. Also, if you want
>>>>> them to be able to persist permanent table metadata for SparkSQL then
>>>>> you'll want to set up a true metastore.

Re: HiveContext standalone => without a Hive metastore

2016-05-26 Thread Gerard Maas
Thanks a lot for the advice!.

I found out why the standalone hiveContext would not work: it was trying
to deploy a Derby db and the user had no rights to create the dir where
the db is stored:

Caused by: java.sql.SQLException: Failed to create database 'metastore_db',
see the next exception for details.

   at
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
Source)

   at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
Source)

   ... 129 more

Caused by: java.sql.SQLException: Directory
/usr/share/spark-notebook/metastore_db cannot be created.


Now, the new issue is that we can't start more than 1 context at the same
time. I think we will need to setup a proper metastore.


-kind regards, Gerard.




On Thu, May 26, 2016 at 3:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> To use HiveContext, which is basically an SQL API within Spark, without a
> proper Hive setup does not make sense. It is a superset of Spark's
> SQLContext.
>
> In addition simple things like registerTempTable may not work.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 26 May 2016 at 13:01, Silvio Fiorito <silvio.fior...@granturing.com>
> wrote:
>
>> Hi Gerard,
>>
>>
>>
>> I’ve never had an issue using the HiveContext without a hive-site.xml
>> configured. However, one issue you may have is if multiple users are
>> starting the HiveContext from the same path, they’ll all be trying to store
>> the default Derby metastore in the same location. Also, if you want them to
>> be able to persist permanent table metadata for SparkSQL then you’ll want
>> to set up a true metastore.
>>
>>
>>
>> The other thing it could be is Hive dependency collisions from the
>> classpath, but that shouldn’t be an issue since you said it’s standalone
>> (not a Hadoop distro right?).
>>
>>
>>
>> Thanks,
>>
>> Silvio
>>
>>
>>
>> *From: *Gerard Maas <gerard.m...@gmail.com>
>> *Date: *Thursday, May 26, 2016 at 5:28 AM
>> *To: *spark users <user@spark.apache.org>
>> *Subject: *HiveContext standalone => without a Hive metastore
>>
>>
>>
>> Hi,
>>
>>
>>
>> I'm helping some folks setting up an analytics cluster with  Spark.
>>
>> They want to use the HiveContext to enable the Window functions on
>> DataFrames(*) but they don't have any Hive installation, nor they need one
>> at the moment (if not necessary for this feature)
>>
>>
>>
>> When we try to create a Hive context, we get the following error:
>>
>>
>>
>> > val sqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
>>
>> java.lang.RuntimeException: java.lang.RuntimeException: Unable to
>> instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>>
>>at
>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>>
>>
>>
>> Is my HiveContext failing b/c it wants to connect to an unconfigured
>>  Hive Metastore?
>>
>>
>>
>> Is there  a way to instantiate a HiveContext for the sake of Window
>> support without an underlying Hive deployment?
>>
>>
>>
>> The docs are explicit in saying that this should be the case: [1]
>>
>>
>>
>> "To use a HiveContext, you do not need to have an existing Hive setup,
>> and all of the data sources available to a SQLContext are still
>> available. HiveContext is only packaged separately to avoid including
>> all of Hive’s dependencies in the default Spark build."
>>
>>
>>
>> So what is the right way to address this issue? How to instantiate a
>> HiveContext with spark running on a HDFS cluster without Hive deployed?
>>
>>
>>
>>
>>
>> Thanks a lot!
>>
>>
>>
>> -Gerard.
>>
>>
>>
>> (*) The need for a HiveContext to use Window functions is pretty obscure.
>> The only documentation of this seems to be a runtime exception: 
>> "org.apache.spark.sql.AnalysisException:
>> Could not resolve window function 'max'. Note that, using window functions
>> currently requires a HiveContext;"
>>
>>
>>
>> [1]
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started
>>
>
>


HiveContext standalone => without a Hive metastore

2016-05-26 Thread Gerard Maas
Hi,

I'm helping some folks set up an analytics cluster with Spark.
They want to use the HiveContext to enable the Window functions on
DataFrames(*) but they don't have any Hive installation, nor do they need one
at the moment (if not necessary for this feature).

When we try to create a Hive context, we get the following error:

> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)

java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

   at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)

Is my HiveContext failing b/c it wants to connect to an unconfigured  Hive
Metastore?

Is there  a way to instantiate a HiveContext for the sake of Window support
without an underlying Hive deployment?

The docs are explicit in saying that this should be the case: [1]

"To use a HiveContext, you do not need to have an existing Hive setup, and
all of the data sources available to a SQLContext are still available.
HiveContext is only packaged separately to avoid including all of Hive’s
dependencies in the default Spark build."

So what is the right way to address this issue? How to instantiate a
HiveContext with spark running on a HDFS cluster without Hive deployed?


Thanks a lot!

-Gerard.

(*) The need for a HiveContext to use Window functions is pretty obscure.
The only documentation of this seems to be a runtime exception: "
org.apache.spark.sql.AnalysisException: Could not resolve window function
'max'. Note that, using window functions currently requires a HiveContext;"


[1]
http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started


Re: Create one DB connection per executor

2016-03-24 Thread Gerard Maas
Hi Manas,

The approach is correct, with one caveat: You may have several tasks
executing in parallel in one executor. Having one single connection per JVM
will either fail, if the connection is not thread-safe, or become a
bottleneck b/c all tasks will be competing for the same resource.
The best approach would be to extend your current idea with a pool of
connections, where you can 'borrow'  a connection and return it after use.
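
For illustration, a rough, untested sketch of such a pool using plain JDBC
(the URL, credentials and record handling are placeholders):

import java.sql.{Connection, DriverManager}
import scala.collection.mutable

// One instance per executor JVM: Scala objects are initialized lazily, once per JVM.
object ConnectionPool {
  private val available = mutable.Queue.empty[Connection]

  def borrow(url: String, user: String, password: String): Connection = synchronized {
    if (available.nonEmpty) available.dequeue()
    else DriverManager.getConnection(url, user, password)
  }

  def giveBack(conn: Connection): Unit = synchronized {
    available.enqueue(conn)
  }
}

// Each task borrows a connection for its partition and returns it when done,
// so concurrent tasks in the same executor never share a connection.
rdd.foreachPartition { partition =>
  val conn = ConnectionPool.borrow("jdbc:postgresql://db-host/mydb", "app_user", "secret")
  try partition.foreach { record => /* write `record` using `conn` */ }
  finally ConnectionPool.giveBack(conn)
}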

-kr, Gerard.


On Thu, Mar 24, 2016 at 2:00 PM, Manas  wrote:

> I understand that using foreachPartition I can create one DB connection per
> partition level. Is there a way to create a DB connection per executor
> level
> and share that for all partitions/tasks run within that executor? One
> approach I am thinking is to have a singleton with say a getConnection
> method. The connection object is not created in the driver rather it passes
> to the the singleton object the DB connection detail (host, port, user,
> password etc). In the foreachPartition this singleton object is passed too.
> The getConnection method of the singleton creates the actual connection
> object only the first time it's called and returns the same connection
> instance for all later invocations. I believe that way each executor JVM
> will have one instance of the singleton/connection and thus all
> partitions/tasks running within that executor would share the same
> connection. I'd like to validate this approach with the spark experts. Does
> it have any inherent flaw or is there a better way to create one instance
> of
> an object per executor?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Create-one-DB-connection-per-executor-tp26588.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Evaluating spark streaming use case

2016-02-21 Thread Gerard Maas
It sounds like another window operation on top of the 30-min window will
achieve the desired objective.
Just keep in mind that you'll need to set the cleaner TTL (spark.cleaner.ttl)
to a long enough value and you will require enough resources (mem & disk)
to keep the required data.
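
For illustration, a rough sketch of the idea (the key/value extractors, the
aggregation function and the window sizes are placeholders):

import org.apache.spark.streaming.Minutes

val halfHourly = filteredStream
  .map(msg => (keyOf(msg), valueOf(msg)))        // placeholder extractors
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(30), Minutes(30))

// A second, larger window on top of the 30-minute aggregates covers the last day.
val daily = halfHourly
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(24 * 60), Minutes(30))

daily.foreachRDD { rdd => /* merge with the persisted RDD / run the final computation */ }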

-kr, Gerard.

On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello Spark users,
>
> I have to aggregate messages from kafka and at some fixed interval (say
> every half hour) update a memory persisted RDD and run some computation.
> This computation uses last one day data. Steps are:
>
> - Read from realtime Kafka topic X in spark streaming batches of 5 seconds
> - Filter the above DStream messages and keep some of them
> - Create windows of 30 minutes on above DStream and aggregate by Key
> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
> - Maintain last N such RDDs in a deque persisting them on disk. While
> adding new RDD, subtract oldest RDD from the combinedRdd.
> - Final step consider last N such windows (of 30 minutes each) and do
> final aggregation
>
> Does the above way of using spark streaming looks reasonable? Is there a
> better way of doing the above?
>
> --
> Thanks
> Jatin
>
>


Hadoop credentials missing in some tasks?

2016-02-05 Thread Gerard Maas
Hi,

We're facing a situation where simple queries to parquet files stored in
Swift through a Hive Metastore sometimes fail with this exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
in stage 58.0 failed 4 times, most recent failure: Lost task 6.3 in stage
58.0 (TID 412, agent-1.mesos.private):
org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing
mandatory configuration option: fs.swift.service.##.auth.url
at 
org.apache.hadoop.fs.swift.http.RestClientBindings.copy(RestClientBindings.java:219)
(...)

Queries requiring a full table scan, like "select count(*)", would fail with
the mentioned exception, while smaller chunks of work like "select *
from ... LIMIT 5" would succeed.

The problem seems to relate to the number of tasks scheduled:

If we force a reduction of the number of tasks to 1, the job  succeeds:

dataframe.rdd.coalesce(1).count()

would return a correct result, while

dataframe.count() would fail with the exception mentioned above.

To me, it looks like credentials are lost somewhere in the serialization
path when the tasks are submitted to the cluster.  I have not found an
explanation yet for why a job that requires only one task succeeds.

We are running on Apache Zeppelin for Swift and Spark Notebook for S3.
Both show an equivalent exception within their specific hadoop filesystem
implementation when the task fails:

Zeppelin + Swift:

org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing
mandatory configuration option: fs.swift.service.##.auth.url

Spark Notebook + S3:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified as the username or password (respectively) of a s3n URL,
or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey
properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)

Valid credentials are being set programmatically through
sc.hadoopConfiguration
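
For reference, this is roughly what that looks like (the configuration keys come
from the error messages above; the service name and values are placeholders):

// Swift: the "##" in the error corresponds to the configured service name
sc.hadoopConfiguration.set("fs.swift.service.myservice.auth.url", "https://identity.example.com/v2.0/tokens")

// S3
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access-key-id>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret-access-key>")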

Our system: Zeppelin or Spark Notebook with Spark 1.5.1 running on Docker,
Docker running on Mesos, Hadoop 2.4.0. One environment running on Softlayer
(Swift) and the other on Amazon EC2 (S3), of similar sizes.

Any ideas on how to address this issue or figure out what's going on??

Thanks,  Gerard.


Re: Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Gerard Maas
What are you trying to achieve?

Looks like you want to provide offsets but you're not managing them and I'm
assuming you're using the direct stream approach.

In that case, use the simpler constructor that takes the kafka config and
the topics. Let it figure out the offsets (it will contact kafka and
request the partitions for the topics provided)

KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
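
For illustration, the same call with the type parameters spelled out (broker
addresses and topic names are made up):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaConfig = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConfig, topics)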

 -kr, Gerard

On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni  wrote:

> Hi All ,
>
> What is the best way to tell a spark streaming job the no of partitions
> for a given topic -
>
> Should that be provided as a parameter or command line argument
> or
> We should connect to kafka in the driver program and query it
>
> Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>
> Thanks,
> Ashish
>


Re: Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Gerard Maas
That's precisely what this constructor does:
KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)

Is there a reason to do that yourself?  In that case, look at how it's done
in Spark Streaming for inspiration:
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L204

-kr, Gerard.




On Mon, Jan 25, 2016 at 5:53 PM, Ashish Soni <asoni.le...@gmail.com> wrote:

> Correct. What I am trying to achieve is that, before the streaming job
> starts, I query the topic metadata from kafka, determine all the partitions
> and provide those to the direct API.
>
> So my question is: should I consider passing all the partitions from the
> command line, or query kafka to find and provide them? What is the correct approach?
>
> Ashish
>
> On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> What are you trying to achieve?
>>
>> Looks like you want to provide offsets but you're not managing them
>> and I'm assuming you're using the direct stream approach.
>>
>> In that case, use the simpler constructor that takes the kafka config and
>> the topics. Let it figure it out the offsets (it will contact kafka and
>> request the partitions for the topics provided)
>>
>> KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
>>
>>  -kr, Gerard
>>
>> On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni <asoni.le...@gmail.com>
>> wrote:
>>
>>> Hi All ,
>>>
>>> What is the best way to tell spark streaming job for the no of partition
>>> to to a given topic -
>>>
>>> Should that be provided as a parameter or command line argument
>>> or
>>> We should connect to kafka in the driver program and query it
>>>
>>> Map<TopicAndPartition, Long> fromOffsets = new
>>> HashMap<TopicAndPartition, Long>();
>>> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>>>
>>> Thanks,
>>> Ashish
>>>
>>
>>
>


Re: Inconsistent data in Cassandra

2015-12-13 Thread Gerard Maas
Hi Padma,

Have you considered reducing the dataset before writing it to Cassandra? Looks 
like this consistency problem could be avoided by cleaning the dataset of 
unnecessary records before persisting it:

val onlyMax = rddByPrimaryKey.reduceByKey{ case (x, y) => max(x, y) }
// your max function here will need to pick the right "max" value from the
// records attached to the same primary key

-kr, Gerard.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: flatMap function in Spark

2015-12-08 Thread Gerard Maas
http://stackoverflow.com/search?q=%5Bapache-spark%5D+flatmap
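
For a quick illustration, a minimal, made-up example of map vs flatMap:

val lines = sc.parallelize(Seq("a b", "c d e"))

lines.map(_.split(" ")).collect()
// Array(Array(a, b), Array(c, d, e))   -- one array per input element

lines.flatMap(_.split(" ")).collect()
// Array(a, b, c, d, e)                 -- the nested structure is flattened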

-kr, Gerard.

On Tue, Dec 8, 2015 at 12:04 PM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> Guys... I am new to Spark..
> Please anyone please explain me how flatMap function works with a little
> sample example...
> Thanks in advance...
>


Re: Spark DStream Data stored out of order in Cassandra

2015-11-30 Thread Gerard Maas
Spark Streaming will consume and process data in parallel. So the order of
the output will depend not only on the order of the input but also on the
time it takes for each task to process. Different operations, like
repartitions, sorts and shuffles at the Spark level, will also affect ordering,
so the best way would be to rely on the schema in Cassandra to ensure the
ordering expected by the application.

What is the schema you're using at the Cassandra side?  And how is the data
going to be queried?   That last question should drive the required
ordering.

-kr, Gerard.

On Mon, Nov 30, 2015 at 12:37 PM, Prateek .  wrote:

> Hi,
>
>
>
> I have an time critical spark application, which is taking sensor data
> from kafka stream, storing in case class, applying transformations and then
> storing in cassandra schema. The data needs to be stored in schema, in FIFO
> order.
>
>
>
> The order is maintained at kafka queue but I am observing, out of order
> data in Cassandra schema. Does Spark Streaming provide any functionality to
> retain order. Or do we need do implement some sorting based on timestamp of
> arrival.
>
>
>
>
>
> Regards,
>
> Prateek
> "DISCLAIMER: This message is proprietary to Aricent and is intended solely
> for the use of the individual to whom it is addressed. It may contain
> privileged or confidential information and should not be circulated or used
> for any purpose other than for what it is intended. If you have received
> this message in error, please notify the originator immediately. If you are
> not the intended recipient, you are notified that you are strictly
> prohibited from using, copying, altering, or disclosing the contents of
> this message. Aricent accepts no responsibility for loss or damage arising
> from the use of the information transmitted by this email including damage
> from virus."
>


Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-08 Thread Gerard Maas
Andy,

Using the rdd.saveAsTextFile(...)  will overwrite the data if your target
is the same file.

If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
suffix)  where a new file will be written at each streaming interval.
Note that this will result in a saved file for each streaming interval. If
you want to increase the file size (usually a good idea in HDFS), you can
use a window function over the dstream and save the 'windowed'  dstream
instead.
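
For illustration, a rough sketch of that windowed variant (a hypothetical
DStream `lines`; the path and window sizes are made up):

import org.apache.spark.streaming.Minutes

// Group 10 minutes of data into one (larger) set of files per window.
val windowed = lines.window(Minutes(10), Minutes(10))
windowed.saveAsTextFiles("hdfs:///rawStreamingData/batch", "txt")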

kind regards, Gerard.

On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi
>
> I just started a new spark streaming project. In this phase of the system
> all we want to do is save the data we received to hdfs. After running for
> a couple of days it looks like I am missing a lot of data. I wonder if
> saveAsTextFile("hdfs:///rawSteamingData”); is overwriting the data I
> capture in previous window? I noticed that after running for a couple of
> days  my hdfs file system has 25 file. The names are something like 
> “part-6”. I
> used 'hadoop fs –dus’ to check the total data captured. While the system
> was running I would periodically call ‘dus’ I was surprised sometimes the
> numbers of total bytes actually dropped.
>
>
> Is there a better way to save write my data to disk?
>
> Any suggestions would be appreciated
>
> Andy
>
>
>public static void main(String[] args) {
>
>SparkConf conf = new SparkConf().setAppName(appName);
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
>
> JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
> Duration(5 * 1000));
>
>
> [ deleted code …]
>
>
> data.foreachRDD(new Function(){
>
> private static final long serialVersionUID =
> -7957854392903581284L;
>
>
> @Override
>
> public Void call(JavaRDD jsonStr) throws Exception {
>
> jsonStr.saveAsTextFile("hdfs:///rawSteamingData”); // 
> /rawSteamingData
> is a directory
>
> return null;
>
> }
>
> });
>
>
>
> ssc.checkpoint(checkPointUri);
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> }
>


Re: How to check whether the RDD is empty or not

2015-10-21 Thread Gerard Maas
As TD mentions, there's no such thing as an 'empty DStream'. Some intervals
of a DStream could be empty, in which case the related RDD will be empty.
This means that you should express such a condition based on the RDDs of the
DStream. Translated into code:

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // ...do stuff ...
  }
}


On Wed, Oct 21, 2015 at 9:00 PM, Tathagata Das  wrote:

> What do you mean by checking when a "DStream is empty"? DStream represents
> an endless stream of data, and at point of time checking whether it is
> empty or not does not make sense.
>
> FYI, there is RDD.isEmpty()
>
>
>
> On Wed, Oct 21, 2015 at 10:03 AM, diplomatic Guru <
> diplomaticg...@gmail.com> wrote:
>
>> I tried below code but still carrying out the action even though there is no 
>> new data.
>>
>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>> LongWritable.class,Text.class, TextInputFormat.class);
>>
>>  if(input != null){
>> //do some action if it is not empty
>> }
>>
>>
>> On 21 October 2015 at 18:00, diplomatic Guru 
>> wrote:
>>
>>>
>>> Hello All,
>>>
>>> I have a Spark Streaming job that should  do some action only if the RDD
>>> is not empty. This can be done easily with the spark batch RDD as I could
>>> .take(1) and check whether it is empty or  not. But this cannot been done
>>> in Spark Streaming DStrem
>>>
>>>
>>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>>> LongWritable.class,Text.class, TextInputFormat.class);
>>>
>>>  if(inputLines!=null){
>>> //do some action if it is not empty
>>> }
>>>
>>> Any ideas please?
>>>
>>>
>>>
>>>
>>
>


Re: Is there a way to create multiple streams in spark streaming?

2015-10-20 Thread Gerard Maas
You can create as many functional derivatives of your original stream by
using transformations. That's exactly the model that Spark Streaming offers.

In your example, that would become something like:

val stream = ssc.socketTextStream("localhost", )
val stream1 = stream.map(fun1)
val stream2 = stream.map(fun2)
// you could also:
val stream3 = stream2.filter(predicate).flatMap(fun3)

// Then you need some action to materialize the streams:
stream2.print
stream2.saveAsTextFiles()

-kr, Gerard.


On Tue, Oct 20, 2015 at 12:20 PM, LinQili  wrote:

> Hi all,
> I wonder if there is a way to create some child streaming while using
> spark streaming?
> For example, I create a netcat main stream, read data from a socket, then
> create 3 different child streams on the main stream,
> in stream1, we do fun1 on the input data then print result to screen;
> in stream2, we do fun2 on the input data then print result to screen;
> in stream3, we do fun3 on the input data then print result to screen.
> Is any one some hints?
>


Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
In the receiver-based kafka streaming model, given that each receiver
starts as a long-running task, one can rely in a certain degree of data
locality based on the kafka partitioning:  Data published on a given
topic/partition will land on the same spark streaming receiving node until
the receiver dies and needs to be restarted somewhere else.

As I understand, the direct-kafka streaming model just computes offsets and
relays the work to a KafkaRDD. How is the execution locality compared to
the receiver-based approach?

thanks, Gerard.


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody,

I think that I misused the term 'data locality'. I should rather call it
"node affinity" instead, as this is what I would like to have:
For as long as an executor is available, I would like to have the same
kafka partition processed by the same node in order to take advantage of
local in-memory structures.

In the receiver-based mode this was a given. Any ideas how to achieve that
with the direct stream approach?

-greetz, Gerard.


On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Assumptions about locality in spark are not very reliable, regardless of
> what consumer you use.  Even if you have locality preferences, and locality
> wait turned up really high, you still have to account for losing executors.
>
> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Thanks Saisai, Mishra,
>>
>> Indeed, that hint will only work on a case where the Spark executor is
>> co-located with the Kafka broker.
>> I think the answer to my question as stated  is that there's no warranty
>> of where the task will execute as it will depend on the scheduler and
>> cluster resources available  (Mesos in our case).
>> Therefore, any assumptions made about data locality using the
>> consumer-based approach need to be reconsidered when migrating to the
>> direct stream.
>>
>> ((In our case, we were using local caches to decide when a given
>> secondary index for a record should be produced and written.))
>>
>> -kr, Gerard.
>>
>>
>>
>>
>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> This preferred locality is a hint to spark to schedule Kafka tasks on
>>> the preferred nodes, if Kafka and Spark are two separate cluster, obviously
>>> this locality hint takes no effect, and spark will schedule tasks following
>>> node-local -> rack-local -> any pattern, like any other spark tasks.
>>>
>>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra <rmis...@snappydata.io
>>> > wrote:
>>>
>>>> Hi Gerard,
>>>> I am also trying to understand the same issue. Whatever code I have
>>>> seen it looks like once Kafka RDD is constructed the execution of that RDD
>>>> is upto the task scheduler and it can schedule the partitions based on the
>>>> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it
>>>> maps to the Kafka partitions host . So if Kafka and Spark are co hosted
>>>> probably this will work. If not, I am not sure how to get data locality for
>>>> a partition.
>>>> Others,
>>>> correct me if there is a way.
>>>>
>>>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas <gerard.m...@gmail.com>
>>>> wrote:
>>>>
>>>>> In the receiver-based kafka streaming model, given that each receiver
>>>>> starts as a long-running task, one can rely in a certain degree of data
>>>>> locality based on the kafka partitioning:  Data published on a given
>>>>> topic/partition will land on the same spark streaming receiving node until
>>>>> the receiver dies and needs to be restarted somewhere else.
>>>>>
>>>>> As I understand, the direct-kafka streaming model just computes
>>>>> offsets and relays the work to a KafkaRDD. How is the execution locality
>>>>> compared to the receiver-based approach?
>>>>>
>>>>> thanks, Gerard.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Regards,
>>>> Rishitesh Mishra,
>>>> SnappyData . (http://www.snappydata.io/)
>>>>
>>>> https://in.linkedin.com/in/rishiteshmishra
>>>>
>>>
>>>
>>
>


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra,

Indeed, that hint will only work in a case where the Spark executor is
co-located with the Kafka broker.
I think the answer to my question as stated is that there's no guarantee of
where the task will execute, as it will depend on the scheduler and cluster
resources available (Mesos in our case).
Therefore, any assumptions made about data locality using the
consumer-based approach need to be reconsidered when migrating to the
direct stream.

((In our case, we were using local caches to decide when a given secondary
index for a record should be produced and written.))

-kr, Gerard.




On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> This preferred locality is a hint to spark to schedule Kafka tasks on the
> preferred nodes, if Kafka and Spark are two separate cluster, obviously
> this locality hint takes no effect, and spark will schedule tasks following
> node-local -> rack-local -> any pattern, like any other spark tasks.
>
> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra <rmis...@snappydata.io>
> wrote:
>
>> Hi Gerard,
>> I am also trying to understand the same issue. Whatever code I have seen
>> it looks like once Kafka RDD is constructed the execution of that RDD is
>> upto the task scheduler and it can schedule the partitions based on the
>> load on nodes. There is preferred node specified in Kafks RDD. But ASFIK it
>> maps to the Kafka partitions host . So if Kafka and Spark are co hosted
>> probably this will work. If not, I am not sure how to get data locality for
>> a partition.
>> Others,
>> correct me if there is a way.
>>
>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> In the receiver-based kafka streaming model, given that each receiver
>>> starts as a long-running task, one can rely in a certain degree of data
>>> locality based on the kafka partitioning:  Data published on a given
>>> topic/partition will land on the same spark streaming receiving node until
>>> the receiver dies and needs to be restarted somewhere else.
>>>
>>> As I understand, the direct-kafka streaming model just computes offsets
>>> and relays the work to a KafkaRDD. How is the execution locality compared
>>> to the receiver-based approach?
>>>
>>> thanks, Gerard.
>>>
>>
>>
>>
>> --
>>
>> Regards,
>> Rishitesh Mishra,
>> SnappyData . (http://www.snappydata.io/)
>>
>> https://in.linkedin.com/in/rishiteshmishra
>>
>
>


Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-07 Thread Gerard Maas
Thanks for the feedback.

Cassandra does not seem to be the issue. The time for writing to Cassandra
is in the same order of magnitude (see below)

The code structure is roughly as follows:

dstream.filter(pred).foreachRDD { rdd =>
  val sparkT0 = currentTimeMs
  val metrics = rdd.mapPartitions { partition =>
    val partitionT0 = currentTimeMs
    partition.foreach { transform andThen storeInCassandra _ }
    val partitionT1 = currentTimeMs
    Seq(Metric("local time", executor, partitionT1 - partitionT0, records)).iterator
  }
  // materialize the rdd
  val allMetrics = metrics.collect()
  val sparkT1 = currentTimeMs
  val totalizedMetrics = ... // group by and reduce with sum
  val sparkT2 = currentTimeMs
  totalizedMetrics.foreach { metric => gmetric.report(metric) }
}

Relating this code with the time table presented before (time in ms):

                         How measured?       Slow Task   Fast Task
executor local           totalizedMetrics    347.6       281.53
spark computation        sparkT1 - sparkT0   6930        263
metric collection        sparkT2 - sparkT1   70          138
wall clock process       sparkT2 - sparkT0   7000        401
total records processed  totalizedMetrics    4297        5002

What we observe is that the largest difference comes from the
materialization of the RDD. This pattern repeats cyclically one on, one off.

Any ideas where to further look?

kr, Gerard.


On Wed, Oct 7, 2015 at 1:33 AM, Tathagata Das <t...@databricks.com> wrote:

> Good point!
>
> On Tue, Oct 6, 2015 at 4:23 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I agree getting cassandra out of the picture is a good first step.
>>
>> But if you just do foreachRDD { _.count } recent versions of direct
>> stream shouldn't do any work at all on the executor (since the number of
>> messages in the rdd is known already)
>>
>> do a foreachPartition and println or count the iterator manually.
>>
>> On Tue, Oct 6, 2015 at 6:02 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Are sure that this is not related to Cassandra inserts? Could you just
>>> do foreachRDD { _.count } instead  to keep Cassandra out of the picture and
>>> then test this agian.
>>>
>>> On Tue, Oct 6, 2015 at 12:33 PM, Adrian Tanase <atan...@adobe.com>
>>> wrote:
>>>
>>>> Also check if the Kafka cluster is still balanced. Maybe one of the
>>>> brokers manages too many partitions, all the work will stay on that
>>>> executor unless you repartition right after kakfka (and I'm not saying you
>>>> should).
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On 06 Oct 2015, at 22:17, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> I'm not clear on what you're measuring.  Can you post relevant code
>>>> snippets including the measurement code?
>>>>
>>>> As far as kafka metrics, nothing currently.  There is an info-level log
>>>> message every time a kafka rdd iterator is instantiated,
>>>>
>>>> log.info(s"Computing topic ${part.topic}, partition
>>>> ${part.partition} " +
>>>>
>>>>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>>
>>>>
>>>> If you log once you're done with an iterator you should be able to see
>>>> the delta.
>>>>
>>>> The other thing to try is reduce the number of parts involved in the
>>>> job to isolate it ... first thing I'd do there is take cassandra out of the
>>>> equation.
>>>>
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 2:00 PM, Gerard Maas <gerard.m...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> The job is doing ETL from Kafka records to Cassandra. After a
>>>>> single filtering stage on Spark, the 'TL' part is done using the
>>>>> dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.
>>>>>
>>>>> We have metrics on the executor work which we collect and add
>>>>> together, indicated here by 'local computation'.  As you can see, we also
>>>>> measure how much it cost us to measure :-)
>>>>> See how 'local work'  times are comparable.  What's not visible is the
>>>>> task scheduling and consuming the data from Kafka which becomes part of 
>>>>> the
>>>>> 'spark computation' part.
>>>>>
>>>>> The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...
>>>>>
>>>>> Are there metrics available somehow on the Kafka reading time?
>>>>>
>>>>>

Re: Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Gerard Maas
Hi Cody,

The job is doing ETL from Kafka records to Cassandra. After a
single filtering stage on Spark, the 'TL' part is done using the
dstream.foreachRDD{rdd.foreachPartition{...TL ...}} pattern.

We have metrics on the executor work which we collect and add together,
indicated here by 'local computation'.  As you can see, we also measure how
much it cost us to measure :-)
See how 'local work'  times are comparable.  What's not visible is the task
scheduling and consuming the data from Kafka which becomes part of the
'spark computation' part.

The pattern we see is 1 fast, 1 slow, 1 fast,... zig...zag...

Are there metrics available somehow on the Kafka reading time?

                         Slow Task   Fast Task
local computation        347.6       281.53
spark computation        6930        263
metric collection        70          138
wall clock process       7000        401
total records processed  4297        5002

(time in ms)

kr, Gerard.


On Tue, Oct 6, 2015 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Can you say anything more about what the job is doing?
>
> First thing I'd do is try to get some metrics on the time taken by your
> code on the executors (e.g. when processing the iterator) to see if it's
> consistent between the two situations.
>
> On Tue, Oct 6, 2015 at 11:45 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We recently migrated our streaming jobs to the direct kafka receiver. Our
>> initial migration went quite fine but now we are seeing a weird zig-zag
>> performance pattern we cannot explain.
>> In alternating fashion, one task takes about 1 second to finish and the
>> next takes 7sec for a stable streaming rate.
>>
>> Here are comparable metrics for two successive tasks:
>> *Slow*:
>>
>>
>> ​
>>
>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  22 s       3            0             3
>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  40 s       11           0             11
>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  49 s       10           0             10
>> *Fast*:
>>
>> ​
>>
>> Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
>> 20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  0.6 s      4            0             4
>> 20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  1 s        9            0             9
>> 20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  1 s        11           0             11
>> We have some custom metrics that measure wall-clock time of execution of
>> certain blocks of the job, like the time it takes to do the local
>> computations (RDD.foreachPartition closure) vs total time.
>> The difference between the slow and fast executing task is on the 'spark
>> computation time' which is wall-clock for the task scheduling
>> (DStream.foreachRDD closure)
>>
>> e.g.
>> Slow task:
>>
>> local computation time: 347.6096849996, *spark computation time:
>> 6930*, metric collection: 70, total process: 7000, total_records: 4297
>>
>> Fast task:
>> local computation time: 281.539042,* spark computation time: 263*,
>> metric collection: 138, total process: 401, total_records: 5002
>>
>> We are currently running Spark 1.4.1. The load and the work to be done is
>> stable -this is on a dev env with that stuff under control.
>>
>> Any ideas what this behavior could be?
>>
>> thanks in advance,  Gerard.
>>
>>
>>
>>
>>
>>
>>
>


Weird performance pattern of Spark Streaming (1.4.1) + direct Kafka

2015-10-06 Thread Gerard Maas
Hi,

We recently migrated our streaming jobs to the direct kafka receiver. Our
initial migration went quite fine but now we are seeing a weird zig-zag
performance pattern we cannot explain.
In alternating fashion, one task takes about 1 second to finish and the
next takes 7sec for a stable streaming rate.

Here are comparable metrics for two successive tasks:
*Slow*:


​

Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  22 s       3            0             3
20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  40 s       11           0             11
20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  49 s       10           0             10
*Fast*:

​

Executor ID                               Address                     Task Time  Total Tasks  Failed Tasks  Succeeded Tasks
20151006-044141-2408867082-5050-21047-S0  dnode-3.hdfs.private:36863  0.6 s      4            0             4
20151006-044141-2408867082-5050-21047-S1  dnode-0.hdfs.private:43812  1 s        9            0             9
20151006-044141-2408867082-5050-21047-S4  dnode-5.hdfs.private:59945  1 s        11           0             11
We have some custom metrics that measure wall-clock time of execution of
certain blocks of the job, like the time it takes to do the local
computations (RDD.foreachPartition closure) vs total time.
The difference between the slow and fast executing task is on the 'spark
computation time' which is wall-clock for the task scheduling
(DStream.foreachRDD closure)

e.g.
Slow task:

local computation time: 347.6096849996, *spark computation time: 6930*,
metric collection: 70, total process: 7000, total_records: 4297

Fast task:
local computation time: 281.539042,* spark computation time: 263*, metric
collection: 138, total process: 401, total_records: 5002

We are currently running Spark 1.4.1. The load and the work to be done is
stable -this is on a dev env with that stuff under control.

Any ideas what this behavior could be?

thanks in advance,  Gerard.


Re: Kafka Direct Stream

2015-10-03 Thread Gerard Maas
Hi,

collect(partialFunction) is equivalent to filter(x=>
partialFunction.isDefinedAt(x)).map(partialFunction)  so it's functionally
equivalent to your expression. I favor collect for its more compact form
but that's a personal preference. Use what you feel reads best.
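
A tiny illustration of that equivalence, assuming `rdd` holds the (topic, data)
pairs from your stream:

val pf: PartialFunction[(String, String), String] = {
  case (t, data) if t == "topic1" => data
}

rdd.collect(pf)                        // matching records, already mapped
rdd.filter(pf.isDefinedAt).map(pf)     // same result, spelled out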

Regarding performance, there will be some overhead from submitting tasks
for every filtered RDD that gets materialized to Cassandra. That's the
reason I proposed the ticket linked above. Have a look whether that would
improve your particular usecase and vote for it if so :-)

-kr, Gerard.

On Sat, Oct 3, 2015 at 3:53 PM, varun sharma <varunsharman...@gmail.com>
wrote:

> Thanks Gerard, the code snippet you shared worked.. but can you please
> explain/point me to the usage of *collect* here, and how it is
> different (performance/readability) from *filter*.
>
>> *val filteredRdd = rdd.filter(x=> x._1 == topic).map(_._2))*
>
>
> I am doing something like this.Please tell if I can improve the *Processing
> time* of this particular code:
>
> kafkaStringStream.foreachRDD{rdd =>
>   val topics = rdd.map(_._1).distinct().collect()
>   if (topics.length > 0) {
> val rdd_value = rdd.take(10).mkString("\n.\n")
> Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all 
> feeds\n$rdd_value"))
>
> topics.foreach { topic =>
>   //rdd.filter(x=> x._1 == topic).map(_._2)
>   val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>   CassandraHelper.saveDataToCassandra(topic, filteredRdd)
> }
> updateOffsetsinZk(rdd)
>   }
>
> }
>
> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Something like this?
>>
>> I'm making the assumption that your topic name equals your keyspace for
>> this filtering example.
>>
>> dstream.foreachRDD{rdd =>
>>   val topics = rdd.map(_._1).distinct.collect
>>   topics.foreach{topic =>
>> val filteredRdd = rdd.collect{ case (t, data) if t == topic => data }
>> filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
>> collect with rdd.collect() that brings data to the driver
>>   }
>> }
>>
>>
>> I'm wondering: would something like this (
>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>> purposes?
>>
>> -kr, Gerard.
>>
>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma <varunsharman...@gmail.com>
>> wrote:
>>
>>> Hi Adrian,
>>>
>>> Can you please give an example of how to achieve this:
>>>
>>>> *I would also look at filtering by topic and saving as different
>>>> Dstreams in your code*
>>>
>>> I have managed to get DStream[(String, String)] which is (
>>> *topic,my_data)* tuple. Lets call it kafkaStringStream.
>>> Now if I do kafkaStringStream.groupByKey() then I would get a
>>> DStream[(String,Iterable[String])].
>>> But I want a DStream instead of Iterable in order to apply
>>> saveToCassandra for storing it.
>>>
>>> Please help in how to transform iterable to DStream or any other
>>> workaround for achieving same.
>>>
>>>
>>> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase <atan...@adobe.com> wrote:
>>>
>>>> On top of that you could make the topic part of the key (e.g. keyBy in
>>>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>>>> operators for the processing.
>>>>
>>>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>>>> topics) and the processing is *really* different, I would also look at
>>>> filtering by topic and saving as different Dstreams in your code.
>>>>
>>>> Either way you need to start with Cody’s tip in order to extract the
>>>> topic name.
>>>>
>>>> -adrian
>>>>
>>>> From: Cody Koeninger
>>>> Date: Thursday, October 1, 2015 at 5:06 PM
>>>> To: Udit Mehta
>>>> Cc: user
>>>> Subject: Re: Kafka Direct Stream
>>>>
>>>> You can get the topic for a given partition from the offset range.  You
>>>> can either filter using that; or just have a single rdd and match on topic
>>>> when doing mapPartitions or foreachPartition (which I think is a better
>>>> idea)
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>
>>>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta <ume...@groupon.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using spark direct stream to consume from multiple topics in
>>>>> Kafka. I am able to consume fine but I am stuck at how to separate the 
>>>>> data
>>>>> for each topic since I need to process data differently depending on the
>>>>> topic.
>>>>> I basically want to split the RDD consisting on N topics into N RDD's
>>>>> each having 1 topic.
>>>>>
>>>>> Any help would be appreciated.
>>>>>
>>>>> Thanks in advance,
>>>>> Udit
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: Kafka Direct Stream

2015-10-02 Thread Gerard Maas
Something like this?

I'm making the assumption that your topic name equals your keyspace for
this filtering example.

dstream.foreachRDD { rdd =>
  val topics = rdd.map(_._1).distinct.collect
  topics.foreach { topic =>
    val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
    // do not confuse this collect (taking a PartialFunction) with rdd.collect(),
    // which brings data to the driver
    filteredRdd.saveToCassandra(topic, "table")
  }
}


I'm wondering: would something like this (
https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
purposes?

-kr, Gerard.

On Fri, Oct 2, 2015 at 8:12 PM, varun sharma 
wrote:

> Hi Adrian,
>
> Can you please give an example of how to achieve this:
>
>> *I would also look at filtering by topic and saving as different Dstreams
>> in your code*
>
> I have managed to get DStream[(String, String)] which is (*topic,my_data)*
> tuple. Lets call it kafkaStringStream.
> Now if I do kafkaStringStream.groupByKey() then I would get a
> DStream[(String,Iterable[String])].
> But I want a DStream instead of Iterable in order to apply saveToCassandra
> for storing it.
>
> Please help in how to transform iterable to DStream or any other
> workaround for achieving same.
>
>
> On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase  wrote:
>
>> On top of that you could make the topic part of the key (e.g. keyBy in
>> .transform or manually emitting a tuple) and use one of the .xxxByKey
>> operators for the processing.
>>
>> If you have a stable, domain specific list of topics (e.g. 3-5 named
>> topics) and the processing is *really* different, I would also look at
>> filtering by topic and saving as different Dstreams in your code.
>>
>> Either way you need to start with Cody’s tip in order to extract the
>> topic name.
>>
>> -adrian
>>
>> From: Cody Koeninger
>> Date: Thursday, October 1, 2015 at 5:06 PM
>> To: Udit Mehta
>> Cc: user
>> Subject: Re: Kafka Direct Stream
>>
>> You can get the topic for a given partition from the offset range.  You
>> can either filter using that; or just have a single rdd and match on topic
>> when doing mapPartitions or foreachPartition (which I think is a better
>> idea)
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta  wrote:
>>
>>> Hi,
>>>
>>> I am using spark direct stream to consume from multiple topics in Kafka.
>>> I am able to consume fine but I am stuck at how to separate the data for
>>> each topic since I need to process data differently depending on the topic.
>>> I basically want to split the RDD consisting on N topics into N RDD's
>>> each having 1 topic.
>>>
>>> Any help would be appreciated.
>>>
>>> Thanks in advance,
>>> Udit
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: unoin streams not working for streams > 3

2015-09-14 Thread Gerard Maas
How many cores are you assigning to your spark streaming job?

On Mon, Sep 14, 2015 at 10:33 PM, Василец Дмитрий 
wrote:

> hello
> I have 4 streams from kafka and streaming is not working,
> without any errors or logs,
> but with 3 streams everything is perfect.
> Only the amount of streams seems to matter; different triple combinations
> always work.
> any ideas how to debug or fix it ?
>
>
>


Re: [streaming] Using org.apache.spark.Logging will silently break task execution

2015-09-06 Thread Gerard Maas
You need to take into consideration 'where' things are executing. The
closure of the 'forEachRDD'  executes in the driver. Therefore, the log
statements printed during the execution of that part will be found in the
driver logs.
In contrast, the foreachPartition closure executes on the worker nodes. You
will find the '+++ForEachPartition+++' messages printed in the executor log.

So both statements execute, but in different locations of the distributed
computing environment (aka cluster)

-kr, Gerard.

On Sun, Sep 6, 2015 at 10:53 PM, Alexey Ponkin  wrote:

> Hi,
>
> I have the following code
>
> object MyJob extends org.apache.spark.Logging{
> ...
>  val source: DStream[SomeType] ...
>
>  source.foreachRDD { rdd =>
>   logInfo(s"""+++ForEachRDD+++""")
>   rdd.foreachPartition { partitionOfRecords =>
> logInfo(s"""+++ForEachPartition+++""")
>   }
>   }
>
> I was expecting to see both log messages in job log.
> But unfortunately you will never see string '+++ForEachPartition+++' in
> logs, cause block foreachPartition will never execute.
> And also there is no error message or something in logs.
> I wonder is this a bug or known behavior?
> I know that org.apache.spark.Logging is DeveloperAPI, but why it is
> silently fails with no messages?
> What to use instead of org.apache.spark.Logging? in spark-streaming jobs?
>
> P.S. running spark 1.4.1 (on yarn)
>
> Thanks in advance
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Writing streaming data to cassandra creates duplicates

2015-08-04 Thread Gerard Maas
(removing dev from the to: as not relevant)

it would be good to see some sample data and the cassandra schema to have a
more concrete idea of the problem space.

Some thoughts: reduceByKey could still be used to 'pick' one element. An
example of arbitrarily choosing the first one: reduceByKey{ case (e1, e2) => e1 }
(a fuller sketch follows below).
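For illustration (the key extraction, keyspace and table names here are
placeholders, not from the original code):

val deduped = dstream
  .map(msg => (msg.primaryKey, msg))       // key each message by its primary key
  .reduceByKey { case (e1, e2) => e1 }     // arbitrarily keep one element per key
deduped.map(_._2).foreachRDD(rdd => rdd.saveToCassandra("keyspace", "table"))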

The question to be answered is: what should happen to the multiple values
that arrive for 1 key?

And why are they creating duplicates in cassandra? if they have the same
key, they will result in an overwrite (that's not desirable due to
tombstones anyway)

-kr, Gerard.



On Tue, Aug 4, 2015 at 1:03 PM, Priya Ch learnings.chitt...@gmail.com
wrote:




 Yes...union would be one solution. I am not doing any aggregation hence
 reduceByKey would not be useful. If I use groupByKey, messages with same
 key would be obtained in a partition. But groupByKey is very expensive
 operation as it involves shuffle operation. My ultimate goal is to write
 the messages to cassandra. if the messages with same key are handled by
 different streams...there would be concurrency issues. To resolve this i
 can union dstreams and apply hash parttioner so that it would bring all the
 same keys to a single partition or do a groupByKey which does the same.

 As groupByKey is expensive, is there any work around for this ?

 On Thu, Jul 30, 2015 at 2:33 PM, Juan Rodríguez Hortalá 
 juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 Just my two cents. I understand your problem is that your problem is that
 you have messages with the same key in two different dstreams. What I would
 do would be making a union of all the dstreams with StreamingContext.union
 or several calls to DStream.union, and then I would create a pair dstream
 with the primary key as key, and then I'd use groupByKey or reduceByKey (or
 combineByKey etc) to combine the messages with the same primary key.

 Hope that helps.

 Greetings,

 Juan


 2015-07-30 10:50 GMT+02:00 Priya Ch learnings.chitt...@gmail.com:

 Hi All,

  Can someone throw insights on this ?

 On Wed, Jul 29, 2015 at 8:29 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:



 Hi TD,

  Thanks for the info. I have the scenario like this.

  I am reading the data from kafka topic. Let's say kafka has 3
 partitions for the topic. In my streaming application, I would configure 3
 receivers with 1 thread each such that they would receive 3 dstreams (from
 3 partitions of kafka topic) and also I implement partitioner. Now there is
 a possibility of receiving messages with same primary key twice or more,
 one is at the time message is created and other times if there is an update
 to any fields for same message.

 If two messages M1 and M2 with same primary key are read by 2 receivers
 then even the partitioner in spark would still end up in parallel
 processing as there are altogether in different dstreams. How do we address
 in this situation ?

 Thanks,
 Padma Ch

 On Tue, Jul 28, 2015 at 12:12 PM, Tathagata Das t...@databricks.com
 wrote:

 You have to partition that data on the Spark Streaming by the primary
 key, and then make sure insert data into Cassandra atomically per key, or
 per set of keys in the partition. You can use the combination of the 
 (batch
 time, and partition Id) of the RDD inside foreachRDD as the unique id for
 the data you are inserting. This will guard against multiple attempts to
 run the task that inserts into Cassandra.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations

 TD

 On Sun, Jul 26, 2015 at 11:19 AM, Priya Ch 
 learnings.chitt...@gmail.com wrote:

 Hi All,

  I have a problem when writing streaming data to cassandra. Or
 existing product is on Oracle DB in which while wrtiting data, locks are
 maintained such that duplicates in the DB are avoided.

 But as spark has parallel processing architecture, if more than 1
 thread is trying to write same data i.e with same primary key, is there 
 as
 any scope to created duplicates? If yes, how to address this problem 
 either
 from spark or from cassandra side ?

 Thanks,
 Padma Ch











Re: Spark Streaming

2015-07-29 Thread Gerard Maas
A side question: Any reason why you're using window(Seconds(10), Seconds(10))
instead of new StreamingContext(conf, Seconds(10)) ?
Making the micro-batch interval 10 seconds instead of 1 will provide you
the same 10-second window with less complexity.  Of course, this might just
be a test for the window functionality.

-kr, Gerard.

On Wed, Jul 29, 2015 at 10:54 AM, Sadaf sa...@platalytics.com wrote:

 Hi,

 I am new to Spark Streaming and writing a code for twitter connector.
 I am facing the following exception.

 ERROR StreamingContext: Error starting the context, marking it as stopped
 org.apache.spark.SparkException:
 org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
 initialized
 at
 org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
 at

 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
 at

 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
 at scala.Option.orElse(Option.scala:257)
 at
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
 at

 org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
 at

 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
 at

 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
 at

 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at

 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 at
 scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at

 org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
 at

 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
 at

 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
 at

 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at

 org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
 at

 org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
 at

 org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
 at

 org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
 at

 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
 at twitter.streamingSpark$.twitterConnector(App.scala:38)
 at twitter.streamingSpark$.main(App.scala:26)
 at twitter.streamingSpark.main(App.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 the relavent code is

  def twitterConnector() :Unit =
   {
  val atwitter=managingCredentials()

    val ssc = StreamingContext.getOrCreate(
      "hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir",
      () => { managingContext() })
fetchTweets(ssc, atwitter )

ssc.start() // Start the computation
ssc.awaitTermination()

   }

 def managingContext():StreamingContext =
   {
//making spark context
    val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector")
val ssc = new StreamingContext(conf, Seconds(1))
val sqlContext = new
 org.apache.spark.sql.SQLContext(ssc.sparkContext)
import sqlContext.implicits._

//checkpointing

    ssc.checkpoint("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir")
ssc
   }
def fetchTweets (ssc : StreamingContext , atwitter :
 Option[twitter4j.auth.Authorization]) : Unit = {


val tweets
 =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2)
val twt = tweets.window(Seconds(10),Seconds(10))
   

Re:

2015-07-07 Thread Gerard Maas
Anand,

AFAIK, you will need to change two settings:

spark.streaming.unpersist = false // in order for SStreaming to not drop
the raw RDD data
spark.cleaner.ttl = some reasonable value in seconds

Also be aware that the lineage of your union RDD will grow with each batch
interval. You will need to break lineage often with cache(), and rely on
the ttl for clean up.
You will probably be in some tricky ground with this approach.

A more reliable way would be to do dstream.window(...) for the length of
time you want to keep the data and then union that data with your RDD for
further processing using transform.
Something like:
dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union
otherRdd)...
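A slightly fuller sketch of the same idea (the identifier names are
illustrative, not from this thread):

val keepFor = Seconds(900)                                     // how long to keep the data
val recent = dstream.window(keepFor, keepFor)                  // retain the last 15 minutes of batches
val combined = recent.transform { rdd => rdd union otherRdd }  // mix in the static RDD
// apply further transformations/actions on `combined`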

If you need an unbounded number of dstream batch intervals, consider
writing the data to secondary storage instead.

-kr, Gerard.



On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote:

 Hi,

 Suppose I have an RDD that is loaded from some file and then I also have a
 DStream that has data coming from some stream. I want to keep union some of
 the tuples from the DStream into my RDD. For this I can use something like
 this:

   var myRDD: RDD[(String, Long)] = sc.fromText...
   dstream.foreachRDD{ rdd =>
 myRDD = myRDD.union(rdd.filter(myfilter))
   }

 My questions is that for how long spark will keep RDDs underlying the
 dstream around? Is there some configuratoin knob that can control that?

 Regards,
 Anand



Re:

2015-07-07 Thread Gerard Maas
Evo,

I'd let the OP clarify the question. I'm not in position of clarifying his
requirements beyond what's written on the question.

Regarding window vs mutable union: window is a well-supported feature that
accumulates messages over time. The mutable unioning of RDDs is bound to
operational trouble as there're no warranties tied to data preservation and
it's unclear how one can produce 'cuts' of that union ready to be served
for some process/computation.  Intuitively, it will 'explode' at some point.

-kr, Gerard.



On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 spark.streaming.unpersist = false // in order for SStreaming to not drop
 the raw RDD data

 spark.cleaner.ttl = some reasonable value in seconds



 why is the above suggested, provided the persist/cache operation on the
 constantly unionized Batch RDD will have to be invoked anyway (after every
 union with a DStream RDD)? Besides, it will result in DStream RDDs
 accumulating in RAM unnecessarily for the duration of the TTL



 re



 “A more reliable way would be to do dstream.window(...) for the length of
 time you want to keep the data and then union that data with your RDD for
 further processing using transform.”



 I think the actual requirement here is picking up and adding Specific
 Messages from EVERY DStream RDD  to the Batch RDD rather than “preserving”
 messages from specific  sliding window and adding them to the Batch RDD



 This should be defined as the Frequency of Updates to the Batch RDD and
 then using dstream.window() equal to that frequency



 Can you also elaborate why you consider the dstream.window  approach more
 “reliable”



 *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
 *Sent:* Tuesday, July 7, 2015 12:56 PM
 *To:* Anand Nalya
 *Cc:* spark users
 *Subject:* Re:



 Anand,



 AFAIK, you will need to change two settings:



 spark.streaming.unpersist = false // in order for SStreaming to not drop
 the raw RDD data

 spark.cleaner.ttl = some reasonable value in seconds



 Also be aware that the lineage of your union RDD will grow with each batch
 interval. You will need to break lineage often with cache(), and rely on
 the ttl for clean up.

 You will probably be in some tricky ground with this approach.



 A more reliable way would be to do dstream.window(...) for the length of
 time you want to keep the data and then union that data with your RDD for
 further processing using transform.

 Something like:

 dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union
 otherRdd)...



 If you need an unbounded number of dstream batch intervals, consider
 writing the data to secondary storage instead.



 -kr, Gerard.







 On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote:

 Hi,



 Suppose I have an RDD that is loaded from some file and then I also have a
 DStream that has data coming from some stream. I want to keep union some of
 the tuples from the DStream into my RDD. For this I can use something like
 this:



   var myRDD: RDD[(String, Long)] = sc.fromText...

   dstream.foreachRDD{ rdd =>

 myRDD = myRDD.union(rdd.filter(myfilter))

   }



 My questions is that for how long spark will keep RDDs underlying the
 dstream around? Is there some configuratoin knob that can control that?



 Regards,

 Anand





Re: Time is ugly in Spark Streaming....

2015-06-26 Thread Gerard Maas
Are you sharing the SimpleDateFormat instance? This looks a lot more like
the non-thread-safe behaviour of SimpleDateFormat (that has claimed many
unsuspecting victims over the years), than any 'ugly' Spark Streaming. Try
writing the timestamps in millis to Kafka and compare.
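A minimal sketch of the workaround (this is not the original code; it reuses the
rdd/time names from the snippet quoted below purely for illustration): build the
formatter inside the partition closure so each task gets its own instance.

rdd.foreachPartition { records =>
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")  // local to this task, never shared
  val ts = fmt.format(new java.util.Date(time.milliseconds))
  // ... write `ts` to Kafka together with the records ...
}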

-kr, Gerard.

On Fri, Jun 26, 2015 at 11:06 AM, Sea 261810...@qq.com wrote:

 Hi, all

 I find a problem in spark streaming, when I use the time in function 
 foreachRDD...
 I find the time is very interesting.

 val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
 StringDecoder](ssc, kafkaParams, topicsSet)

 dataStream.map(x => createGroup(x._2, dimensions)).groupByKey().foreachRDD((rdd, time) => {
   try {
     if (!rdd.partitions.isEmpty) {
       rdd.foreachPartition(partition => {
         handlePartition(partition, timeType, time, dimensions, outputTopic, brokerList)
       })
     }
   } catch {
     case e: Exception => e.printStackTrace()
   }
 })


 val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")

 var date = dateFormat.format(new Date(time.milliseconds))


 Then I insert the 'date' into Kafka, but I found this:


 {timestamp:2015-06-00T16:50:02,status:3,type:1,waittime:0,count:17}

 {timestamp:2015-06-26T16:51:13,status:1,type:1,waittime:0,count:34}

 {timestamp:2015-06-00T16:50:02,status:4,type:0,waittime:0,count:279}

 {timestamp:2015-06-26T16:52:00,status:11,type:1,waittime:0,count:9}
 {timestamp:0020-06-26T16:50:36
 ,status:7,type:0,waittime:0,count:1722}

 {timestamp:2015-06-10T16:51:17,status:0,type:0,waittime:0,count:2958}

 {timestamp:2015-06-26T16:52:00,status:0,type:1,waittime:0,count:114}

 {timestamp:2015-06-10T16:51:17,status:11,type:0,waittime:0,count:2066}

 {timestamp:2015-06-26T16:52:00,status:1,type:0,waittime:0,count:1539}




Re: Spark Streaming reads from stdin or output from command line utility

2015-06-12 Thread Gerard Maas
Would using the socketTextStream and `yourApp | nc -lk port` work?? Not
sure how resilient the socket receiver is though. I've been playing with it
for a little demo and I don't understand yet its reconnection behavior.

Although I would think that putting some elastic buffer in between would be
a good idea to decouple producer from consumer. Kafka would be my first
choice.
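A rough sketch of the socket idea (host and port are placeholders):

// on the Spark side, read lines pushed to a TCP port
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
// and on the producing machine:  your_utility | nc -lk 9999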

-kr, Gerard.

On Fri, Jun 12, 2015 at 8:46 AM, Heath Guo heath...@fb.com wrote:

  Yes, it is lots of data, and the utility I'm working with prints out
 infinite real time data stream. Thanks.


   From: Tathagata Das t...@databricks.com
 Date: Thursday, June 11, 2015 at 11:43 PM

 To: Heath Guo heath...@fb.com
 Cc: user user@spark.apache.org
 Subject: Re: Spark Streaming reads from stdin or output from command line
 utility

   Is it a lot of data that is expected to come through stdin? I mean is
 it even worth parallelizing the computation using something like Spark
 Streaming?

 On Thu, Jun 11, 2015 at 9:56 PM, Heath Guo heath...@fb.com wrote:

   Thanks for your reply! In my use case, it would be stream from only
 one stdin. Also I'm working with Scala.
 It would be great if you could talk about multi stdin case as well!
 Thanks.

   From: Tathagata Das t...@databricks.com
 Date: Thursday, June 11, 2015 at 8:11 PM
 To: Heath Guo heath...@fb.com
 Cc: user user@spark.apache.org
 Subject: Re: Spark Streaming reads from stdin or output from command
 line utility

Are you going to receive data from one stdin from one machine, or
 many stdins on many machines?


 On Thu, Jun 11, 2015 at 7:25 PM, foobar heath...@fb.com wrote:

 Hi, I'm new to Spark Streaming, and I want to create a application where
 Spark Streaming could create DStream from stdin. Basically I have a
 command
 line utility that generates stream data, and I'd like to pipe data into
 DStream. What's the best way to do that? I thought rdd.pipe() could help,
 but it seems that requires an rdd in the first place, which does not
 apply.
 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reads-from-stdin-or-output-from-command-line-utility-tp23289.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Cassandra Submit

2015-06-08 Thread Gerard Maas
? = the IP address of one of your Cassandra nodes, reachable from the Spark driver and executors.
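For illustration (the address below is a placeholder, not a value from this thread):

val sparkConf = new SparkConf()
  .setAppName("JavaApiDemo")
  .set("spark.cassandra.connection.host", "192.168.1.10")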

On Mon, Jun 8, 2015 at 10:12 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi ,

 How can I find spark.cassandra.connection.host? And what should I change ?
 Should I change cassandra.yaml ?

 Error says me *Exception in thread main java.io.IOException: Failed to
 open native connection to Cassandra at {127.0.1.1}:9042*

 What should I add? *SparkConf sparkConf = new
 SparkConf().setAppName("JavaApiDemo").set("spark.driver.allowMultipleContexts",
 "true").set("spark.cassandra.connection.host", "?");*

 Best
 yasemin

 2015-06-06 3:04 GMT+03:00 Mohammed Guller moham...@glassbeam.com:

  Check your spark.cassandra.connection.host setting. It should be
 pointing to one of your Cassandra nodes.



 Mohammed



 *From:* Yasemin Kaya [mailto:godo...@gmail.com]
 *Sent:* Friday, June 5, 2015 7:31 AM
 *To:* user@spark.apache.org
 *Subject:* Cassandra Submit



 Hi,



 I am using cassandraDB in my project. I had that error *Exception in
 thread main java.io.IOException: Failed to open native connection to
 Cassandra at {127.0.1.1}:9042*



 I think I have to modify the submit line. What should I add or remove
 when I submit my project?



 Best,

 yasemin





 --

 hiç ender hiç




 --
 hiç ender hiç



Re: [Streaming] Configure executor logging on Mesos

2015-05-29 Thread Gerard Maas
Hi Tim,

Thanks for the info.   We (Andy Petrella and myself) have been diving a bit
deeper into this log config:

The log line I was referring to is this one (sorry, I provided the others
just for context)

*Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties*

That line comes from Logging.scala [1] where a default config is loaded if
none is found in the classpath upon the startup of the Spark Mesos executor
in the Mesos sandbox. At that point in time, none of the
application-specific resources have been shipped yet as the executor JVM is
just starting up.   To load a custom configuration file we should have it
already on the sandbox before the executor JVM starts and add it to the
classpath on the startup command. Is that correct?

For the classpath customization, It looks like it should be possible to
pass a -Dlog4j.configuration  property by using the
'spark.executor.extraClassPath' that will be picked up at [2] and that
should be added to the command that starts the executor JVM, but the
resource must be already on the host before we can do that. Therefore we
also need some means of 'shipping' the log4j.configuration file to the
allocated executor.

This all boils down to your statement on the need of shipping extra files
to the sandbox. Bottom line: It's currently not possible to specify a
config file for your mesos executor. (ours grows several GB/day).

The only workaround I found so far is to open up the Spark assembly,
replace the log4j-default.properties and pack it up again.  That would
work, although kind of rudimentary as we use the same assembly for many
jobs.  Probably, accessing the log4j API programmatically should also work
(I didn't try that yet)

Should we open a JIRA for this functionality?

-kr, Gerard.




[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Logging.scala#L128
[2]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala#L77

On Thu, May 28, 2015 at 7:50 PM, Tim Chen t...@mesosphere.io wrote:


 -- Forwarded message --
 From: Tim Chen t...@mesosphere.io
 Date: Thu, May 28, 2015 at 10:49 AM
 Subject: Re: [Streaming] Configure executor logging on Mesos
 To: Gerard Maas gerard.m...@gmail.com


 Hi Gerard,

 The log line you referred to is not Spark logging but Mesos own logging,
 which is using glog.

 Our own executor logs should only contain very few lines though.

 Most of the log lines you'll see is from Spark, and it can be controled by
 specifiying a log4j.properties to be downloaded with your Mesos task.
 Alternatively if you are downloading Spark executor via spark.executor.uri,
 you can include log4j.properties in that tar ball.

 I think we probably need some more configurations for Spark scheduler to
 pick up extra files to be downloaded into the sandbox.

 Tim





 On Thu, May 28, 2015 at 6:46 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 I'm trying to control the verbosity of the logs on the Mesos executors
 with no luck so far. The default behaviour is INFO on stderr dump with an
 unbounded growth that gets too big at some point.

 I noticed that when the executor is instantiated, it locates a default
 log configuration in the spark assembly:

 I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
 20150528-063307-780930314-5050-8152-S5
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties

 So, no matter what I provide in my job jar files (I also tried
 spark.executor.extraClassPath=log4j.properties), nothing takes effect in the
 executor's configuration.

 How should I configure the log on the executors?

 thanks, Gerard.






Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Gerard Maas
Hi,

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
streaming processes is not supported.


*Longer version.*

I assume that you are talking about Spark Streaming as the discussion is
about handing Kafka streaming data.

Then you have two things to consider: the Streaming receivers and the Spark
processing cluster.

Currently, the receiving topology is static. One receiver is allocated with
each DStream instantiated and it will use 1 core in the cluster. Once the
StreamingContext is started, this topology cannot be changed, therefore the
number of Kafka receivers is fixed for the lifetime of your DStream.
What we do is to calculate the cluster capacity and use that as a fixed
upper bound (with a margin) for the receiver throughput.
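An illustrative way to impose such a cap (the value and the approach are
assumptions, not something stated in this thread):

val conf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "10000")  // max records/sec per receiver; placeholder value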

There's work in progress to add a reactive model to the receiver, where
backpressure can be applied to handle overload conditions. See
https://issues.apache.org/jira/browse/SPARK-7398

Once the data is received, it will be processed in a 'classical' Spark
pipeline, so previous posts on spark resource scheduling might apply.

Regarding metrics, the standard metrics subsystem of spark will report
streaming job performance. Check the driver's metrics endpoint to peruse
the available metrics:

driver:ui-port/metrics/json

-kr, Gerard.


(*) Spark is a project that moves so fast that statements might be
invalidated by new work every minute.

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 Hi,

 I'm trying to understand if there are design patterns for autoscaling Spark
 (add/remove slave machines to the cluster) based on the throughput.

 Assuming we can throttle Spark consumers, the respective Kafka topics we
 stream data from would start growing.  What are some of the ways to
 generate
 the metrics on the number of new messages and the rate they are piling up?
 This perhaps is more of a Kafka question; I see a pretty sparse javadoc
 with
 the Metric interface and not much else...

 What are some of the ways to expand/contract the Spark cluster? Someone has
 mentioned Mesos...

 I see some info on Spark metrics in  the Spark monitoring guide
 https://spark.apache.org/docs/latest/monitoring.html  .  Do we want to
 perhaps implement a custom sink that would help us autoscale up or down
 based on the throughput?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




[Streaming] Configure executor logging on Mesos

2015-05-28 Thread Gerard Maas
Hi,

I'm trying to control the verbosity of the logs on the Mesos executors with
no luck so far. The default behaviour is INFO on stderr dump with an
unbounded growth that gets too big at some point.

I noticed that when the executor is instantiated, it locates a default log
configuration in the spark assembly:

I0528 13:36:22.958067 26890 exec.cpp:206] Executor registered on slave
20150528-063307-780930314-5050-8152-S5
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

So, no matter what I provide in my job jar files (I also tried
spark.executor.extraClassPath=log4j.properties), nothing takes effect in the
executor's configuration.

How should I configure the log on the executors?

thanks, Gerard.


Re: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Gerard Maas
I'm familiar with the TableWriter code and that log only appears if the
write actually succeeded. (See
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
)

Thinking about infrastructure, we see that it's always trying to reach
'localhost'. Are you running a 1-node test in local mode?  Otherwise, there's
something wrong with the way you're configuring Cassandra or the connection
to it  (always tempted to say her :-)  ).

-kr, Gerard.

On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 I think the message that it has written 2 rows is misleading



 If you look further down you will see that it could not initialize a
 connection pool for Casandra (presumably while trying to write the
 previously mentioned 2 rows)



 Another confirmation of this hypothesis is the phrase “error during
 Transport Initialization” – so all these stuff points out in the direction
 of Infrastructure or Configuration issues – check you Casandra service and
 how you connect to it etc mate



 *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
 *Sent:* Sunday, May 10, 2015 11:33 AM
 *To:* Sergio Jiménez Barrio; spark users
 *Subject:* Re: Spark streaming closes with Cassandra Conector



 It successfully writes some data and fails afterwards, like the host or
 connection goes down. Weird.



 Maybe you should post this question on the Spark-Cassandra connector group:


 https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user





 -kr, Gerard.





 On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio 
 drarse.a...@gmail.com wrote:

 This is:


 15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in
 0,016 s.
 15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host
 127.0.0.1 (datacenter1)
 15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
 com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042]
 Unexpected error during transport initialization
 (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error
 writing: Closed channel)
 at
 com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
 at com.datastax.driver.core.Connection.init(Connection.java:116)
 at
 com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32)
 at
 com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
 at
 com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74)
 at
 com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
 at
 com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
 at
 com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
 at
 com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
 at
 com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
 at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
 at
 com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
 at
 com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
 at
 com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042]
 Error writing: Closed channel
 at
 com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432)
 at
 org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
 at
 org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
 at
 org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:248)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296

Re: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Gerard Maas
It successfully writes some data and fails afterwards, like the host or
connection goes down. Weird.

Maybe you should post this question on the Spark-Cassandra connector group:
https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user


-kr, Gerard.


On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio 
drarse.a...@gmail.com wrote:

 This is:

 15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in
 0,016 s.
 15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host
 127.0.0.1 (datacenter1)
 15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
 com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042]
 Unexpected error during transport initialization
 (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error
 writing: Closed channel)
 at
 com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
 at com.datastax.driver.core.Connection.init(Connection.java:116)
 at
 com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32)
 at
 com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
 at
 com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74)
 at
 com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
 at
 com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
 at
 com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
 at
 com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
 at
 com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
 at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
 at
 com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
 at
 com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
 at
 com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042]
 Error writing: Closed channel
 at
 com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432)
 at
 org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
 at
 org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
 at
 org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:248)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
 at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
 at
 org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
 at
 org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
 ... 3 more
 15/05/10 12:20:08 ERROR ControlConnection: [Control connection] Cannot
 connect to any host, scheduling retry in 1000 milliseconds

 Thanks!

 2015-05-10 0:58 GMT+02:00 Gerard Maas gerard.m...@gmail.com:

 Hola Sergio,

 It would help if you added the error message + stack trace.

 -kr, Gerard.

 On Sat, May 9, 2015 at 11:32 PM, Sergio Jiménez Barrio 
 drarse.a...@gmail.com wrote:

 I am trying save some data in Cassandra in app with spark Streaming:

 Messages.foreachRDD {
  . . .
 CassandraRDD.saveToCassandra("test", "test")
 }

 When I run, the app is closes when I recibe data or can't connect with
 Cassandra.

 Some idea? Thanks


 --
 Atte. Sergio Jiménez






Re: Map one RDD into two RDD

2015-05-07 Thread Gerard Maas
Hi Bill,

I just found it weird that one would use parallel threads to 'filter', as
filter is lazy in Spark, and multithreading wouldn't have any effect unless
the action triggering the execution of the lineage containing such filter
is executed on a separate thread. One must have very specific
reasons/requirements to do that, beyond 'not traversing the data twice'.
The request for the code was only to help checking that.
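A minimal sketch of that point (predicates and output paths are placeholders):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

rdd.cache()                                         // avoid recomputing the source for both branches
val typeA = rdd.filter(isTypeA)                     // lazy: nothing runs yet
val typeB = rdd.filter(isTypeB)                     // lazy: nothing runs yet
val fa = Future { typeA.saveAsTextFile("out/a") }   // the actions trigger the work;
val fb = Future { typeB.saveAsTextFile("out/b") }   // separate threads let the two jobs overlap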

-kr, Gerard.

On Thu, May 7, 2015 at 7:26 PM, Bill Q bill.q@gmail.com wrote:

 The multi-threading code in Scala is quite simple and you can google it
 pretty easily. We used the Future framework. You can use Akka also.

 @Evo My concerns for filtering solution are: 1. Will rdd2.filter run
 before rdd1.filter finish? 2. We have to traverse rdd twice. Any comments?



 On Thursday, May 7, 2015, Evo Eftimov evo.efti...@isecc.com wrote:

 Scala is a language, Spark is an OO/Functional, Distributed Framework
 facilitating Parallel Programming in a distributed environment



 Any “Scala parallelism” occurs within the Parallel Model imposed by the
 Spark OO Framework – ie it is limited in terms of what it can achieve in
 terms of influencing the Spark Framework behavior – that is the nature of
 programming with/for frameworks



 When RDD1 and RDD2 are partitioned and different Actions applied to them
 this will result in Parallel Pipelines / DAGs within the Spark Framework

 RDD1 = RDD.filter()

 RDD2 = RDD.filter()





 *From:* Bill Q [mailto:bill.q@gmail.com]
 *Sent:* Thursday, May 7, 2015 4:55 PM
 *To:* Evo Eftimov
 *Cc:* user@spark.apache.org
 *Subject:* Re: Map one RDD into two RDD



 Thanks for the replies. We decided to use concurrency in Scala to do the
 two mappings using the same source RDD in parallel. So far, it seems to be
 working. Any comments?

 On Wednesday, May 6, 2015, Evo Eftimov evo.efti...@isecc.com wrote:

 RDD1 = RDD.filter()

 RDD2 = RDD.filter()



 *From:* Bill Q [mailto:bill.q@gmail.com]
 *Sent:* Tuesday, May 5, 2015 10:42 PM
 *To:* user@spark.apache.org
 *Subject:* Map one RDD into two RDD



 Hi all,

 I have a large RDD that I map a function to it. Based on the nature of
 each record in the input RDD, I will generate two types of data. I would
 like to save each type into its own RDD. But I can't seem to find an
 efficient way to do it. Any suggestions?



 Many thanks.





 Bill



 --

 Many thanks.

 Bill





 --

 Many thanks.

 Bill





 --
 Many thanks.


 Bill




Re: Map one RDD into two RDD

2015-05-07 Thread Gerard Maas
Hi Bill,

Could you show a snippet of code to illustrate your choice?

-Gerard.

On Thu, May 7, 2015 at 5:55 PM, Bill Q bill.q@gmail.com wrote:

 Thanks for the replies. We decided to use concurrency in Scala to do the
 two mappings using the same source RDD in parallel. So far, it seems to be
 working. Any comments?


 On Wednesday, May 6, 2015, Evo Eftimov evo.efti...@isecc.com wrote:

 RDD1 = RDD.filter()

 RDD2 = RDD.filter()



 *From:* Bill Q [mailto:bill.q@gmail.com]
 *Sent:* Tuesday, May 5, 2015 10:42 PM
 *To:* user@spark.apache.org
 *Subject:* Map one RDD into two RDD



 Hi all,

 I have a large RDD that I map a function to it. Based on the nature of
 each record in the input RDD, I will generate two types of data. I would
 like to save each type into its own RDD. But I can't seem to find an
 efficient way to do it. Any suggestions?



 Many thanks.





 Bill



 --

 Many thanks.

 Bill





 --
 Many thanks.


 Bill




Re: How to deal with code that runs before foreach block in Apache Spark?

2015-05-04 Thread Gerard Maas
I'm not familiar with the Solr API but provided that 'SolrIndexerDriver'
is a singleton, I guess that what's going on when running on a cluster is
that the call to:

 SolrIndexerDriver.solrInputDocumentList.add(elem)

is happening on different singleton instances of the  SolrIndexerDriver on
different JVMs while

SolrIndexerDriver.solrServer.commit

is happening on the driver.

In practical terms, the lists on the executors are being filled-in but they
are never committed and on the driver the opposite is happening.
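A Scala sketch of how to keep the buffering and the commit in the same JVM (the
original code is Java; this is illustrative only and assumes a Solr client that
can be used from the executors, plus the SolrJ add/commit calls shown in the post):

import scala.collection.JavaConverters._

solrInputDocumentRDD.foreachPartition { docs =>
  val batch = docs.toList                           // buffered on the executor that owns this partition
  if (batch.nonEmpty) {
    val solrServer = SolrIndexerDriver.solrServer   // assumes the client is reachable from the executors
    solrServer.add(batch.asJava)                    // add and commit happen in the same JVM as the buffer
    solrServer.commit(true, true)
  }
}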

-kr, Gerard

On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I'm trying to deal with some code that runs differently on Spark
 stand-alone mode and Spark running on a cluster. Basically, for each item
 in an RDD, I'm trying to add it to a list, and once this is done, I want to
 send this list to Solr.

 This works perfectly fine when I run the following code in stand-alone
 mode of Spark, but does not work when the same code is run on a cluster.
 When I run the same code on a cluster, it is like send to Solr part of
 the code is executed before the list to be sent to Solr is filled with
 items. I try to force the execution by solrInputDocumentJavaRDD.collect();
 after foreach, but it seems like it does not have any effect.

 // For each RDD
 solrInputDocumentJavaDStream.foreachRDD(
  new Function<JavaRDD<SolrInputDocument>, Void>() {
   @Override
    public Void call(JavaRDD<SolrInputDocument>
  solrInputDocumentJavaRDD) throws Exception {

 // For each item in a single RDD
 solrInputDocumentJavaRDD.foreach(
  new VoidFunction<SolrInputDocument>() {
   @Override
   public void call(SolrInputDocument
 solrInputDocument) {

 // Add the solrInputDocument to the list of
 SolrInputDocuments

 SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
   }
 });

 // Try to force execution
 solrInputDocumentJavaRDD.collect();


 // After having finished adding every SolrInputDocument to the
 list
 // add it to the solrServer, and commit, waiting for the
 commit to be flushed
 try {

   // Seems like when run in cluster mode, the list size is
 zero,
  // therefore the following part is never executed

   if (SolrIndexerDriver.solrInputDocumentList != null
       && SolrIndexerDriver.solrInputDocumentList.size() > 0) {

 SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
 SolrIndexerDriver.solrServer.commit(true, true);
 SolrIndexerDriver.solrInputDocumentList.clear();
   }
 } catch (SolrServerException | IOException e) {
   e.printStackTrace();
 }


 return null;
   }
 }
 );


 What should I do, so that sending-to-Solr part executes after the list of
 SolrDocuments are added to solrInputDocumentList (and works also in cluster
 mode)?


 --
 Emre Sevinç



Re: How to do dispatching in Streaming?

2015-04-16 Thread Gerard Maas
From experience, I'd recommend using the  dstream.foreachRDD method and
doing the filtering within that context. Extending the example of TD,
something like this:

dstream.foreachRDD { rdd =>
  rdd.cache()
  messageTypes.foreach { msgTyp =>
    val selection = rdd.filter(msgTyp.matches(_))   // pseudocode: select the records of this type
    selection.foreach { ... }
  }
  rdd.unpersist()
}

I would discourage the use of:

MessageType1DStream = MainDStream.filter(message type1)

MessageType2DStream = MainDStream.filter(message type2)

MessageType3DStream = MainDStream.filter(message type3)

Because it will be a lot more work to process on the spark side.
Each DSteam will schedule tasks for each partition, resulting in #dstream x
#partitions x #stages tasks instead of the #partitions x #stages with the
approach presented above.


-kr, Gerard.

On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 And yet another way is to demultiplex at one point which will yield
 separate DStreams for each message type which you can then process in
 independent DAG pipelines in the following way:



 MessageType1DStream = MainDStream.filter(message type1)

 MessageType2DStream = MainDStream.filter(message type2)

 MessageType3DStream = MainDStream.filter(message type3)



 Then proceed your processing independently with MessageType1DStream,
 MessageType2DStream and MessageType3DStream ie each of them is a starting
 point of a new DAG pipeline running in parallel



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Thursday, April 16, 2015 12:52 AM
 *To:* Jianshi Huang
 *Cc:* user; Shao, Saisai; Huang Jie
 *Subject:* Re: How to do dispatching in Streaming?



 It may be worthwhile to do architect the computation in a different way.



 dstream.foreachRDD { rdd =>

    rdd.foreach { record =>

   // do different things for each record based on filters

}

 }



 TD



 On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,



 I have a Kafka topic that contains dozens of different types of messages.
 And for each one I'll need to create a DStream for it.



 Currently I have to filter the Kafka stream over and over, which is very
 inefficient.



 So what's the best way to do dispatching in Spark Streaming? (one DStream
  -> multiple DStreams)




 Thanks,

 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





Re: Writing Spark Streaming Programs

2015-03-19 Thread Gerard Maas
Try writing this Spark Streaming idiom in Java and you'll choose Scala soon
enough:

dstream.foreachRDD{ rdd =>
  rdd.foreachPartition( partition => ... )
}

When deciding between Java and Scala for Spark, IMHO Scala has the
upperhand. If you're concerned with readability, have a look at the Scala
coding style recently open sourced by DataBricks:
https://github.com/databricks/scala-style-guide  (btw, I don't agree with a good
part of it, but recognize that it can keep the most complex Scala
constructions out of your code)



On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com wrote:

 Hello All,

 I'm using Spark for streaming but I'm unclear one which implementation
 language to use Java, Scala or Python.

 I don't know anything about Python, familiar with Scala and have been
 doing Java for a long time.

 I think the above shouldn't influence my decision on which language to use
 because I believe the tool should, fit the problem.

 In terms of performance Java and Scala are comparable. However Java is OO
 and Scala is FP, no idea what Python is.

 If using Scala and not applying a consistent style of programming Scala
 code can become unreadable, but I do like the fact it seems to be possible
 to do so much work with so much less code, that's a strong selling point
 for me. Also it could be that the type of programming done in Spark is best
 implemented in Scala as FP language, not sure though.

 The question I would like your good help with is are there any other
 considerations I need to think about when deciding this? are there any
 recommendations you can make in regards to this?

 Regards
 jk









Re: Partitioning

2015-03-13 Thread Gerard Maas
In spark-streaming, the consumers will fetch data and put it into 'blocks'.
Each block becomes a partition of the rdd generated during that batch
interval.
The size of each block is controlled by the conf
'spark.streaming.blockInterval'. That is, the amount of data the consumer
can collect in that time.

The number of RDD partitions in a streaming interval will then be: (batch
interval / spark.streaming.blockInterval) * # of consumers.
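For example (illustrative numbers): with a 2-second batch interval, the default
spark.streaming.blockInterval of 200 ms and a single consumer, each batch RDD
would have 2000 / 200 * 1 = 10 partitions.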

-kr, Gerard
On Mar 13, 2015 11:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote:

 I still don't follow how spark is partitioning data in multi node
 environment. Is there a document on how spark does partitioning of data? For
 eg: in word count eg how is spark distributing words to multiple nodes?

 On Fri, Mar 13, 2015 at 3:01 PM, Tathagata Das t...@databricks.com
 wrote:

 If you want to access the keys in an RDD that is partition by key, then
 you can use RDD.mapPartition(), which gives you access to the whole
 partition as an iteratorkey, value. You have the option of maintaing the
 partitioning information or not by setting the preservePartitioning flag in
 mapPartition (see docs). But use it at your own risk. If you modify the
 keys, and yet preserve partitioning, the partitioning would not make sense
 any more as the hash of the keys have changed.

 TD



 On Fri, Mar 13, 2015 at 2:26 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 I am trying to look for a documentation on partitioning, which I can't
 seem to find. I am looking at spark streaming and was wondering how does it
 partition RDD in a multi node environment. Where are the keys defined that
 is used for partitioning? For instance in below example keys seem to be
 implicit:

 Which one is key and which one is value? Or is it called a flatMap
 because there are no keys?

 // Split each line into words
 JavaDStreamString words = lines.flatMap(
   new FlatMapFunctionString, String() {
 @Override public IterableString call(String x) {
   return Arrays.asList(x.split( ));
 }
   });


 And are Keys available inside of Function2 in case it's required for a
 given use case ?


 JavaPairDStreamString, Integer wordCounts = pairs.reduceByKey(
   new Function2Integer, Integer, Integer() {
 @Override public Integer call(Integer i1, Integer i2) throws
 Exception {
   return i1 + i2;
 }
   });









Re: Unable to saveToCassandra while cassandraTable works fine

2015-03-12 Thread Gerard Maas
This: java.lang.NoSuchMethodError  almost always indicates a version
conflict somewhere.

It looks like you are using Spark 1.1.1 with the cassandra-spark connector
1.2.0. Try aligning those. Those metrics were introduced recently in the
1.2.0 branch of the cassandra connector.
Either upgrade your spark to 1.2.0 or downgrade the connector to something
compatible with Spark 1.1.1
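An illustrative sbt alignment (the exact patch versions below are assumptions; the
point is that both artifacts should come from matching release lines):

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.1.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.1"
)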

-kr, Gerard

On Wed, Mar 11, 2015 at 1:42 PM, Tiwari, Tarun tarun.tiw...@kronos.com
wrote:

  Hi,



 I am stuck at this for 3 days now. I am using the
 spark-cassandra-connector with spark and I am able to make RDDs with
 sc.cassandraTable function that means spark is able to communicate with
 Cassandra properly.



 But somehow the saveToCassandra is not working. Below are the steps I am
 doing.

 Does it have something to do with my spark-env or spark-defaults? Am I
 missing something critical ?



 scala> import com.datastax.spark.connector._

 scala> sc.addJar("/home/analytics/Installers/spark-1.1.1/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar")

 scala> val myTable = sc.cassandraTable("test2", "words")

 scala> myTable.collect()

 *--- this works perfectly fine.*



 scala> val data = sc.parallelize(Seq((81, "XXX"), (82, )))

 scala> data.saveToCassandra("test2", "words", SomeColumns("word", "count"))

 *--- this fails*



 15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.192:9042
 added

 15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host
 10.131.141.192 (datacenter1)

 15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.193:9042
 added

 15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host
 10.131.141.193 (datacenter1)

 15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.200:9042
 added

 15/03/11 15:16:45 INFO CassandraConnector: Connected to Cassandra cluster:
 wfan_cluster_DB

 15/03/11 15:16:45 INFO SparkContext: Starting job: runJob at
 RDDFunctions.scala:29

 15/03/11 15:16:45 INFO DAGScheduler: Got job 1 (runJob at
 RDDFunctions.scala:29) with 2 output partitions (allowLocal=false)

 15/03/11 15:16:45 INFO DAGScheduler: Final stage: Stage 1(runJob at
 RDDFunctions.scala:29)

 15/03/11 15:16:45 INFO DAGScheduler: Parents of final stage: List()

 15/03/11 15:16:45 INFO DAGScheduler: Missing parents: List()

 15/03/11 15:16:45 INFO DAGScheduler: Submitting Stage 1
 (ParallelCollectionRDD[1] at parallelize at console:20), which has no
 missing parents

 15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(7400) called with
 curMem=1792, maxMem=2778778828

 15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1 stored as values in
 memory (estimated size 7.2 KB, free 2.6 GB)

 15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(3602) called with
 curMem=9192, maxMem=2778778828

 15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1_piece0 stored as
 bytes in memory (estimated size 3.5 KB, free 2.6 GB)

 15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
 memory on 10.131.141.200:56502 (size: 3.5 KB, free: 2.6 GB)

 15/03/11 15:16:45 INFO BlockManagerMaster: Updated info of block
 broadcast_1_piece0

 15/03/11 15:16:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage
 1 (ParallelCollectionRDD[1] at parallelize at console:20)

 15/03/11 15:16:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks

 15/03/11 15:16:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID
 2, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)

 15/03/11 15:16:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID
 3, 10.131.141.193, PROCESS_LOCAL, 1217 bytes)

 15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
 memory on 10.131.141.193:51660 (size: 3.5 KB, free: 267.3 MB)

 15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in
 memory on 10.131.141.192:32875 (size: 3.5 KB, free: 267.3 MB)

 15/03/11 15:16:45 INFO CassandraConnector: Disconnected from Cassandra
 cluster: wfan_cluster_DB

 15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2,
 10.131.141.192): java.lang.NoSuchMethodError:
 org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;


 com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)


 com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)


 com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)


 com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

 org.apache.spark.scheduler.Task.run(Task.scala:54)


 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)


 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)


 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 

Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-17 Thread Gerard Maas
+1 for Typesafe Config.
Our practice is to include all spark properties under a 'spark' entry in
the config file, alongside job-specific configuration:

A config file would look like:
spark {
 master = 
 cleaner.ttl = 123456
 ...
}
job {
context {
src = foo
action = barAction
}
prop1 = val1
}

Then, to create our Spark context, we transparently pass the spark section
to a SparkConf instance.
This idiom will instantiate the context with the spark specific
configuration:

sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

And we can make use of the config object everywhere else.

We use the override model of the typesafe config: reasonable defaults go in
the reference.conf (within the jar). Environment-specific overrides go in
the application.conf (alongside the job jar) and hacks are passed with
-Dprop=value :-)
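
A minimal sketch of that wiring (the configToStringSeq helper below is an
illustrative reimplementation, not the original code; it only assumes the
Typesafe Config and SparkConf APIs):

import scala.collection.JavaConverters._
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf

// Flatten a Config subtree into (key, value) pairs for SparkConf.setAll.
def configToStringSeq(config: Config): Seq[(String, String)] =
  config.entrySet().asScala.toSeq.map { entry =>
    entry.getKey -> entry.getValue.unwrapped().toString
  }

// reference.conf, application.conf and -D overrides are merged by load().
val config = ConfigFactory.load()
val sparkConf = new SparkConf()
sparkConf.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))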


-kr, Gerard.


On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've decided to try

   spark-submit ... --conf
 spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

 But when I try to retrieve the value of propertiesFile via

System.err.println("propertiesFile : " + System.getProperty("propertiesFile"));

 I get NULL:

propertiesFile : null

 Interestingly, when I run spark-submit with --verbose, I see that it
 prints:

   spark.driver.extraJavaOptions -
 -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

 I couldn't understand why I couldn't get to the value of propertiesFile
 by using the standard System.getProperty method. (I can use new
 SparkConf().get("spark.driver.extraJavaOptions") and manually parse it,
 and retrieve the value, but I'd like to know why I cannot retrieve that
 value using the System.getProperty method).

 Any ideas?

 If I can achieve what I've described above properly, I plan to pass a
 properties file that resides on HDFS, so that it will be available to my
 driver program wherever that program runs.

 --
 Emre




 On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com
 wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

 Since the properties file is packaged up with the JAR I don't have to
 worry about sending the file separately to all of the slave nodes. Typesafe
 Config is written in Java so it will work if you're not using Scala. (The
 Typesafe Config also has the advantage of being extremely easy to integrate
 with code that is using Java Properties today.)

 If you instead want to send the file separately from the JAR and you use
 the Typesafe Config library, you can specify config.file instead of
 .resource; though I'd point you to [3] below if you want to make your
 development life easier.

 1. https://github.com/typesafehub/config
 2. https://github.com/ceedubs/ficus
 3.
 http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



 On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I
 have non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I thought I could read the value of my non-Spark property, namely,
 job.output.dir by using:

 SparkConf sparkConf = new SparkConf();
 final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

 But it gives me an exception:

 Exception in thread "main" java.util.NoSuchElementException: job.output.dir

 Is it not possible to mix Spark and non-Spark properties in a single
 .properties file, then pass it via --properties-file and then get the
 values of those non-Spark properties via SparkConf?

 Or is there another object / method to retrieve the values for those
 non-Spark properties?


 --
 Emre Sevinç




 --
 Emre Sevinc



Re: Writing RDD to a csv file

2015-02-03 Thread Gerard Maas
this is more of a scala question, so probably next time you'd like to
address a Scala forum eg. http://stackoverflow.com/questions/tagged/scala

val optArrStr: Option[Array[String]] = ???
optArrStr.map(arr => arr.mkString(",")).getOrElse("")  // empty string, or
whatever default value you have for this.
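
Putting it together for the RDD in question, a minimal sketch (column order
and the output path are illustrative):

import org.apache.spark.rdd.RDD

// RDD[(String, (Array[String], Option[Array[String]]))] -> one CSV line per record.
def saveAsCsv(rdd: RDD[(String, (Array[String], Option[Array[String]]))],
              path: String): Unit =
  rdd.map { case (key, (arr, optArr)) =>
    val optPart = optArr.map(_.mkString(",")).getOrElse("")  // empty when absent
    key + "," + arr.mkString(",") + "," + optPart
  }.saveAsTextFile(path)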

kr, Gerard.

On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com wrote:

 I have a RDD which is of type

 org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))]

 I want to write it as a csv file.

 Please suggest how this can be done.

 myrdd.map(line => (line._1 + "," + line._2._1.mkString(",") + "," +
 line._2._2.mkString(","))).saveAsTextFile("hdfs://...")

 Doing mkString on line._2._1 works but does not work for the Option type.

 Please suggest how this can be done.


 Thanks
 Kundan





Re: Spark (Streaming?) holding on to Mesos resources

2015-01-29 Thread Gerard Maas
Thanks a lot.

After reading Mesos-1688, I still don't understand how/why a job will hoard
and hold on to so many resources even in the presence of that bug.
Looking at the release notes, I think this ticket could be relevant to
preventing the behavior we're seeing:
[MESOS-186] - Resource offers should be rescinded after some configurable
timeout

Bottom line, we're following your advice and we're testing Mesos 0.21 on
dev to roll out to our prod platforms later on.

Thanks!!

-kr, Gerard.


On Tue, Jan 27, 2015 at 9:15 PM, Tim Chen t...@mesosphere.io wrote:

 Hi Gerard,

 As others has mentioned I believe you're hitting Mesos-1688, can you
 upgrade to the latest Mesos release (0.21.1) and let us know if it resolves
 your problem?

 Thanks,

 Tim

 On Tue, Jan 27, 2015 at 10:39 AM, Sam Bessalah samkiller@gmail.com
 wrote:

 Hi Geraard,
 isn't this the same issueas this?
 https://issues.apache.org/jira/browse/MESOS-1688

 On Mon, Jan 26, 2015 at 9:17 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 We are observing with certain regularity that our Spark  jobs, as Mesos
 framework, are hoarding resources and not releasing them, resulting in
 resource starvation to all jobs running on the Mesos cluster.

 For example:
 This is a job that has spark.cores.max = 4 and spark.executor.memory=3g

 | ID                  | Framework    | Host                 | CPUs | Mem
 | …5050-16506-1146497 | FooStreaming | dnode-4.hdfs.private | 7    | 13.4 GB
 | …5050-16506-1146495 | FooStreaming | dnode-0.hdfs.private | 1    | 6.4 GB
 | …5050-16506-1146491 | FooStreaming | dnode-5.hdfs.private | 7    | 11.9 GB
 | …5050-16506-1146449 | FooStreaming | dnode-3.hdfs.private | 7    | 4.9 GB
 | …5050-16506-1146247 | FooStreaming | dnode-1.hdfs.private | 0.5  | 5.9 GB
 | …5050-16506-1146226 | FooStreaming | dnode-2.hdfs.private | 3    | 7.9 GB
 | …5050-16506-1144069 | FooStreaming | dnode-3.hdfs.private | 1    | 8.7 GB
 | …5050-16506-1133091 | FooStreaming | dnode-5.hdfs.private | 1    | 1.7 GB
 | …5050-16506-1133090 | FooStreaming | dnode-2.hdfs.private | 5    | 5.2 GB
 | …5050-16506-1133089 | FooStreaming | dnode-1.hdfs.private | 6.5  | 6.3 GB
 | …5050-16506-1133088 | FooStreaming | dnode-4.hdfs.private | 1    | 251 MB
 | …5050-16506-1133087 | FooStreaming | dnode-0.hdfs.private | 6.4  | 6.8 GB
 The only way to release the resources is by manually finding the process
 in the cluster and killing it. The jobs are often streaming but also batch
 jobs show this behavior. We have more streaming jobs than batch, so stats
 are biased.
 Any ideas of what's up here? Hopefully some very bad ugly bug that has
 been fixed already and that will urge us to upgrade our infra?

 Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0

 -kr, Gerard.






Re: how to split key from RDD for compute UV

2015-01-27 Thread Gerard Maas
Hi,

Did you try asking this on StackOverflow?
http://stackoverflow.com/questions/tagged/apache-spark

I'd also suggest adding some sample data to help others understanding your
logic.

-kr, Gerard.



On Tue, Jan 27, 2015 at 1:14 PM, 老赵 laozh...@sina.cn wrote:

 Hello All,

 I am writing a simple Spark application to count UV (unique views) from a
 log file.

 Below is my code; it is not right on the red line.

 My idea here is that the same cookie on a host should only be counted once,
 so I want to split the host out of the key of the previous RDD. But I don't
 know how to finish it.

 Any suggestion will be appreciated!



 val url_index = args(1).toInt
 val cookie_index = args(2).toInt

 val textRDD = sc.textFile(args(0))
   .map(_.split("\t"))
   .map(line => ((new java.net.URL(line(url_index)).getHost) + "\t" + line(cookie_index), 1))
   .reduceByKey(_ + _)
   .map(line => (line.split("\t")(0), 1))
   .reduceByKey(_ + _)
   .map(item => item.swap)
   .sortByKey(false)
   .map(item => item.swap)

 




Spark (Streaming?) holding on to Mesos resources

2015-01-26 Thread Gerard Maas
Hi,

We are observing with certain regularity that our Spark  jobs, as Mesos
framework, are hoarding resources and not releasing them, resulting in
resource starvation to all jobs running on the Mesos cluster.

For example:
This is a job that has spark.cores.max = 4 and spark.executor.memory=3g

| ID                  | Framework    | Host                 | CPUs | Mem
| …5050-16506-1146497 | FooStreaming | dnode-4.hdfs.private | 7    | 13.4 GB
| …5050-16506-1146495 | FooStreaming | dnode-0.hdfs.private | 1    | 6.4 GB
| …5050-16506-1146491 | FooStreaming | dnode-5.hdfs.private | 7    | 11.9 GB
| …5050-16506-1146449 | FooStreaming | dnode-3.hdfs.private | 7    | 4.9 GB
| …5050-16506-1146247 | FooStreaming | dnode-1.hdfs.private | 0.5  | 5.9 GB
| …5050-16506-1146226 | FooStreaming | dnode-2.hdfs.private | 3    | 7.9 GB
| …5050-16506-1144069 | FooStreaming | dnode-3.hdfs.private | 1    | 8.7 GB
| …5050-16506-1133091 | FooStreaming | dnode-5.hdfs.private | 1    | 1.7 GB
| …5050-16506-1133090 | FooStreaming | dnode-2.hdfs.private | 5    | 5.2 GB
| …5050-16506-1133089 | FooStreaming | dnode-1.hdfs.private | 6.5  | 6.3 GB
| …5050-16506-1133088 | FooStreaming | dnode-4.hdfs.private | 1    | 251 MB
| …5050-16506-1133087 | FooStreaming | dnode-0.hdfs.private | 6.4  | 6.8 GB
The only way to release the resources is by manually finding the process in
the cluster and killing it. The jobs are often streaming but also batch
jobs show this behavior. We have more streaming jobs than batch, so stats
are biased.
Any ideas of what's up here? Hopefully some very bad ugly bug that has been
fixed already and that will urge us to upgrade our infra?

Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0

-kr, Gerard.


Spark (Streaming?) holding on to Mesos Resources

2015-01-26 Thread Gerard Maas
(looks like the list didn't like a HTML table on the previous email. My
excuses for any duplicates)

Hi,

We are observing with certain regularity that our Spark  jobs, as Mesos
framework, are hoarding resources and not releasing them, resulting in
resource starvation to all jobs running on the Mesos cluster.

For example:
This is a job that has spark.cores.max = 4 and spark.executor.memory=3g

| ID   |Framework  |Host|CPUs  |Mem
…5050-16506-1146497 FooStreaming dnode-4.hdfs.private 7 13.4 GB
…5050-16506-1146495 FooStreamingdnode-0.hdfs.private 1 6.4 GB
…5050-16506-1146491 FooStreamingdnode-5.hdfs.private 7 11.9 GB
…5050-16506-1146449 FooStreamingdnode-3.hdfs.private 7 4.9 GB
…5050-16506-1146247 FooStreamingdnode-1.hdfs.private 0.5 5.9 GB
…5050-16506-1146226 FooStreamingdnode-2.hdfs.private 3 7.9 GB
…5050-16506-1144069 FooStreamingdnode-3.hdfs.private 1 8.7 GB
…5050-16506-1133091 FooStreamingdnode-5.hdfs.private 1 1.7 GB
…5050-16506-1133090 FooStreamingdnode-2.hdfs.private 5 5.2 GB
…5050-16506-1133089 FooStreamingdnode-1.hdfs.private 6.5 6.3 GB
…5050-16506-1133088 FooStreamingdnode-4.hdfs.private 1 251 MB
…5050-16506-1133087 FooStreamingdnode-0.hdfs.private 6.4 6.8 GB

The only way to release the resources is by manually finding the process in
the cluster and killing it. The jobs are often streaming but also batch
jobs show this behavior. We have more streaming jobs than batch, so stats
are biased.
Any ideas of what's up here? Hopefully some very bad ugly bug that has been
fixed already and that will urge us to upgrade our infra?

Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0

-kr, Gerard.


Re: Spark (Streaming?) holding on to Mesos Resources

2015-01-26 Thread Gerard Maas
Hi Jörn,

A memory leak on the job would be contained within the resources reserved
for it, wouldn't it?
And the job holding resources is not always the same. Sometimes it's one of
the Streaming jobs, sometimes it's a heavy batch job that runs every hour.
Looks to me that whatever is causing the issue, it's participating in the
resource offer protocol of Mesos and my first suspect would be the Mesos
scheduler in Spark. (The table above is the tab Offers from the Mesos UI.

Are there any other factors involved in the offer acceptance/rejection
between Mesos and a scheduler?

What do you think?

-kr, Gerard.

On Mon, Jan 26, 2015 at 11:23 PM, Jörn Franke jornfra...@gmail.com wrote:

 Hi,

 What do your jobs do?  Ideally post source code, but some description
 would already helpful to support you.

 Memory leaks can have several reasons - it may not be Spark at all.

 Thank you.

 Le 26 janv. 2015 22:28, Gerard Maas gerard.m...@gmail.com a écrit :

 
  (looks like the list didn't like a HTML table on the previous email. My
 excuses for any duplicates)
 
  Hi,
 
  We are observing with certain regularity that our Spark  jobs, as Mesos
 framework, are hoarding resources and not releasing them, resulting in
 resource starvation to all jobs running on the Mesos cluster.
 
  For example:
  This is a job that has spark.cores.max = 4 and spark.executor.memory=3g
 
  | ID   |Framework  |Host|CPUs  |Mem
  …5050-16506-1146497 FooStreaming dnode-4.hdfs.private 7 13.4 GB
  …5050-16506-1146495 FooStreamingdnode-0.hdfs.private 1 6.4 GB
  …5050-16506-1146491 FooStreamingdnode-5.hdfs.private 7 11.9 GB
  …5050-16506-1146449 FooStreamingdnode-3.hdfs.private 7 4.9 GB
  …5050-16506-1146247 FooStreamingdnode-1.hdfs.private 0.5 5.9 GB
  …5050-16506-1146226 FooStreamingdnode-2.hdfs.private 3 7.9 GB
  …5050-16506-1144069 FooStreamingdnode-3.hdfs.private 1 8.7 GB
  …5050-16506-1133091 FooStreamingdnode-5.hdfs.private 1 1.7 GB
  …5050-16506-1133090 FooStreamingdnode-2.hdfs.private 5 5.2 GB
  …5050-16506-1133089 FooStreamingdnode-1.hdfs.private 6.5 6.3 GB
  …5050-16506-1133088 FooStreamingdnode-4.hdfs.private 1 251 MB
  …5050-16506-1133087 FooStreamingdnode-0.hdfs.private 6.4 6.8 GB
 
  The only way to release the resources is by manually finding the process
 in the cluster and killing it. The jobs are often streaming but also batch
 jobs show this behavior. We have more streaming jobs than batch, so stats
 are biased.
  Any ideas of what's up here? Hopefully some very bad ugly bug that has
 been fixed already and that will urge us to upgrade our infra?
 
  Mesos 0.20 +  Marathon 0.7.4 + Spark 1.1.0
 
  -kr, Gerard.




Re: Discourse: A proposed alternative to the Spark User list

2015-01-23 Thread Gerard Maas
+1

On Fri, Jan 23, 2015 at 5:58 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 That sounds good to me. Shall I open a JIRA / PR about updating the site
 community page?
 On 2015년 1월 23일 (금) at 오전 4:37 Patrick Wendell patr...@databricks.com
 wrote:

 Hey Nick,

 So I think what we can do is encourage people to participate on the
 Stack Overflow topic, and this I think we can do on the Spark website
 as a first-class community resource for Spark. We should probably be
 spending more time on that site given its popularity.

 In terms of encouraging this explicitly *to replace* the ASF mailing
 list, that I think is harder to do. The ASF makes a lot of effort to
 host its own infrastructure that is neutral and not associated with
 any corporation. And by and large the ASF policy is to consider that
 as the de-facto forum of communication for any project.

 Personally, I wish the ASF would update this policy - for instance, by
 allowing the use of third party lists or communication fora - provided
 that they allow exporting the conversation if those sites were to
 change course. However, the state of the art stands as such.

 - Patrick


 On Wed, Jan 21, 2015 at 8:43 AM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Josh / Patrick,
 
  What do y’all think of the idea of promoting Stack Overflow as a place
 to
  ask questions over this list, as long as the questions fit SO’s
 guidelines
  (how-to-ask, dont-ask)?
 
  The apache-spark tag is very active on there.
 
  Discussions of all types are still on-topic here, but when possible we
 want
  to encourage people to use SO.
 
  Nick
 
  On Wed Jan 21 2015 at 8:37:05 AM Jay Vyas jayunit100.apa...@gmail.com
 wrote:
 
  Its a very valid  idea indeed, but... It's a tricky  subject since the
  entire ASF is run on mailing lists , hence there are so many different
 but
  equally sound ways of looking at this idea, which conflict with one
 another.
 
   On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote:
  
   I think this is a really great idea for really opening up the
   discussions
   that happen here. Also, it would be nice to know why there doesn't
 seem
   to
   be much interest. Maybe I'm misunderstanding some nuance of Apache
   projects.
  
   Cheers
  
  
  
   --
   View this message in context:
   http://apache-spark-user-list.1001560.n3.nabble.com/
 Discourse-A-proposed-alternative-to-the-Spark-User-
 list-tp20851p21288.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   
 -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 




Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Gerard Maas
and post the code (if possible).
In a nutshell, your processing time > batch interval, resulting in an
ever-increasing delay that will end up in a crash.
3 secs to process 14 messages looks like a lot. Curious what the job logic
is.

-kr, Gerard.

On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 This is not normal. Its a huge scheduling delay!! Can you tell me more
 about the application?
 - cluster setup, number of receivers, what's the computation, etc.

 On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab as...@live.com wrote:

 Hate to do this...but...erm...bump? Would really appreciate input from
 others using Streaming. Or at least some docs that would tell me if these
 are expected or not.

 --
 From: as...@live.com
 To: user@spark.apache.org
 Subject: Are these numbers abnormal for spark streaming?
 Date: Wed, 21 Jan 2015 11:26:31 +


 Hi Guys,
 I've got Spark Streaming set up for a low data rate system (using spark's
 features for analysis, rather than high throughput). Messages are coming in
 throughout the day, at around 1-20 per second (finger in the air
 estimate...not analysed yet).  In the spark streaming UI for the
 application, I'm getting the following after 17 hours.

 Streaming

- *Started at: *Tue Jan 20 16:58:43 GMT 2015
- *Time since start: *18 hours 24 minutes 34 seconds
- *Network receivers: *2
- *Batch interval: *2 seconds
- *Processed batches: *16482
- *Waiting batches: *1



 Statistics over last 100 processed batches

 Receiver Statistics

- Receiver


- Status


- Location


- Records in last batch
- [2015/01/21 11:23:18]


- Minimum rate
- [records/sec]


- Median rate
- [records/sec]


- Maximum rate
- [records/sec]


- Last Error

 RmqReceiver-0ACTIVEF
 144727-RmqReceiver-1ACTIVEBR
 124726-
 Batch Processing Statistics
 (columns: Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum)

 Processing Time:  3 seconds 994 ms | 157 ms | 4 seconds 16 ms | 4 seconds 961 ms | 5 seconds 3 ms | 5 seconds 171 ms
 Scheduling Delay: 9 hours 15 minutes 4 seconds | 9 hours 10 minutes 54 seconds | 9 hours 11 minutes 56 seconds | 9 hours 12 minutes 57 seconds | 9 hours 14 minutes 5 seconds | 9 hours 15 minutes 4 seconds
 Total Delay:      9 hours 15 minutes 8 seconds | 9 hours 10 minutes 58 seconds | 9 hours 12 minutes | 9 hours 13 minutes 2 seconds | 9 hours 14 minutes 10 seconds | 9 hours 15 minutes 8 seconds


 Are these normal. I was wondering what the scheduling delay and total
 delay terms are, and if it's normal for them to be 9 hours.

 I've got a standalone spark master and 4 spark nodes. The streaming app
 has been given 4 cores, and it's using 1 core per worker node. The
 streaming app is submitted from a 5th machine, and that machine has nothing
 but the driver running. The worker nodes are running alongside Cassandra
 (and reading and writing to it).

 Any insights would be appreciated.

 Regards,
 Ashic.





Re: Discourse: A proposed alternative to the Spark User list

2015-01-22 Thread Gerard Maas
I've have been contributing to SO for a while now.  Here're few
observations I'd like to contribute to the discussion:

The level of questions on SO is often more entry-level. Harder
questions (that require expertise in a certain area) remain unanswered for
a while. Same questions here on the list (as they are often cross-posted)
receive faster turnaround.
Roughly speaking, there're two groups of questions: Implementing things on
Spark and Running Spark.  The second one is borderline on SO guidelines as
they often involve cluster setups, long logs and little idea of what's
going on (mind you, often those questions come from people starting with
Spark)

In my opinion, Stack Overflow offers a better Q/A experience, in
particular, they have tooling in place to reduce duplicates, something that
often overloads this list (same getting started issues or how to map,
filter, flatmap over and over again).  That said, this list offers a
richer forum, where the expertise pool is a lot deeper.
Also, while SO is fairly strict in requiring posters from showing a minimal
amount of effort in the question being asked, this list is quite friendly
to the same behavior. This could be probably an element that makes the list
'lower impedance'.
One additional thing on SO is that the [apache-spark] tag is a 'low rep'
tag. Neither questions nor answers get significant voting, reducing the
'rep gaming' factor  (discouraging participation?)

Thinking about how to improve both platforms: SO[apache-spark] and this ML,
and get back the list to not overwhelming message volumes, we could
implement some 'load balancing' policies:
- encourage new users to use Stack Overflow, in particular, redirect newbie
questions to SO the friendly way: did you search SO already? or link to
an existing question.
  - most how to map, flatmap, filter, aggregate, reduce, ... would fall
under  this category
- encourage domain experts to hang on SO more often  (my impression is that
MLLib, GraphX are fairly underserved)
- have an 'escalation process' in place, where we could post
'interesting/hard/bug' questions from SO back to the list (or encourage the
poster to do so)
- update our community guidelines on [
http://spark.apache.org/community.html] to implement such policies.

Those are just some ideas on how to improve the community and better serve
the newcomers while avoiding overload of our existing expertise pool.

kr, Gerard.


On Thu, Jan 22, 2015 at 10:42 AM, Sean Owen so...@cloudera.com wrote:

 Yes, there is some project business like votes of record on releases that
 needs to be carried on in standard, simple accessible place and SO is not
 at all suitable.

 Nobody is stuck with Nabble. The suggestion is to enable a different
 overlay on the existing list. SO remains a place you can ask questions too.
 So I agree with Nick's take.

 BTW are there perhaps plans to split this mailing list into
 subproject-specific lists? That might also help tune in/out the subset of
 conversations of interest.
 On Jan 22, 2015 10:30 AM, Petar Zecevic petar.zece...@gmail.com wrote:


 Ok, thanks for the clarifications. I didn't know this list has to remain
 as the only official list.

 Nabble is really not the best solution in the world, but we're stuck with
 it, I guess.

 That's it from me on this subject.

 Petar


 On 22.1.2015. 3:55, Nicholas Chammas wrote:

  I think a few things need to be laid out clearly:

1. This mailing list is the “official” user discussion platform. That
is, it is sponsored and managed by the ASF.
2. Users are free to organize independent discussion platforms
focusing on Spark, and there is already one such platform in Stack 
 Overflow
under the apache-spark and related tags. Stack Overflow works quite
well.
3. The ASF will not agree to deprecating or migrating this user list
to a platform that they do not control.
4. This mailing list has grown to an unwieldy size and discussions
are hard to find or follow; discussion tooling is also lacking. We want to
improve the utility and user experience of this mailing list.
5. We don’t want to fragment this “official” discussion community.
6. Nabble is an independent product not affiliated with the ASF. It
offers a slightly better interface to the Apache mailing list archives.

 So to respond to some of your points, pzecevic:

 Apache user group could be frozen (not accepting new questions, if that’s
 possible) and redirect users to Stack Overflow (automatic reply?).

 From what I understand of the ASF’s policies, this is not possible. :(
 This mailing list must remain the official Spark user discussion platform.

 Other thing, about new Stack Exchange site I proposed earlier. If a new
 site is created, there is no problem with guidelines, I think, because
 Spark community can apply different guidelines for the new site.

 I think Stack Overflow and the various Spark tags are working fine. I
 don’t see a compelling need for a Stack 

Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Gerard Maas
Given that the process, and in particular the setup of connections, is
bound to the number of partitions (in x.foreachPartition{ x => ??? }), I
think it would be worth trying to reduce them.
Increasing 'spark.streaming.blockInterval' will do the trick (you can
read the tuning details here:
http://www.virdata.com/tuning-spark/#Partitions)
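
For reference, a sketch of both sides of that trade-off (the Session class and
the 1000 ms value are illustrative, not from the thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream

// Placeholder for the expensive per-partition setup being discussed
// (e.g. a Cassandra session plus prepared statements).
class Session { def write(record: String): Unit = (); def close(): Unit = () }

// Fewer, larger blocks per batch => fewer partitions => fewer Session setups.
// (The default spark.streaming.blockInterval is 200 ms in Spark 1.x.)
val conf = new SparkConf().set("spark.streaming.blockInterval", "1000")

def writeOut(stream: DStream[String]): Unit =
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { iter =>
      val session = new Session()   // paid once per partition, per batch
      iter.foreach(session.write)
      session.close()
    }
  }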

-kr, Gerard.

On Thu, Jan 22, 2015 at 4:28 PM, Gerard Maas gerard.m...@gmail.com wrote:

 So the system has gone from 7 msgs in 4.961 secs (median) to 106 msgs in
 4.761 secs.
 I think there's evidence that setup costs are quite high in this case and
 increasing the batch interval is helping.

 On Thu, Jan 22, 2015 at 4:12 PM, Sudipta Banerjee 
 asudipta.baner...@gmail.com wrote:

 Hi Ashic Mahtab,

 The Cassandra and the Zookeeper are they installed as a part of Yarn
 architecture or are they installed in a separate layer with Apache Spark .

 Thanks and Regards,
 Sudipta

 On Thu, Jan 22, 2015 at 8:13 PM, Ashic Mahtab as...@live.com wrote:

 Hi Guys,
 So I changed the interval to 15 seconds. There's obviously a lot more
 messages per batch, but (I think) it looks a lot healthier. Can you see any
 major warning signs? I think that with 2 second intervals, the setup /
 teardown per partition was what was causing the delays.

 Streaming

- *Started at: *Thu Jan 22 13:23:12 GMT 2015
- *Time since start: *1 hour 17 minutes 16 seconds
- *Network receivers: *2
- *Batch interval: *15 seconds
- *Processed batches: *309
- *Waiting batches: *0



 Statistics over last 100 processed batches

 Receiver Statistics

- Receiver


- Status


- Location


- Records in last batch
- [2015/01/22 14:40:29]


- Minimum rate
- [records/sec]


- Median rate
- [records/sec]


- Maximum rate
- [records/sec]


- Last Error

 RmqReceiver-0ACTIVEVDCAPP53.foo.local2.6 K29106295-RmqReceiver-1ACTIVE
 VDCAPP50.bar.local2.6 K29107291-
 Batch Processing Statistics
 (columns: Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum)

 Processing Time:  4 seconds 812 ms | 4 seconds 698 ms | 4 seconds 738 ms | 4 seconds 761 ms | 4 seconds 788 ms | 5 seconds 802 ms
 Scheduling Delay: 2 ms | 0 ms | 3 ms | 3 ms | 4 ms | 9 ms
 Total Delay:      4 seconds 814 ms | 4 seconds 701 ms | 4 seconds 739 ms | 4 seconds 764 ms | 4 seconds 792 ms | 5 seconds 809 ms


 Regards,
 Ashic.
 --
 From: as...@live.com
 To: gerard.m...@gmail.com
 CC: user@spark.apache.org
 Subject: RE: Are these numbers abnormal for spark streaming?
 Date: Thu, 22 Jan 2015 12:32:05 +


 Hi Gerard,
 Thanks for the response.

 The messages get deserialised from msgpack format, and one of the strings
 is deserialised to JSON. Certain fields are checked to decide if further
 processing is required. If so, it goes through a series of in mem filters
 to check if more processing is required. If so, only then does the heavy
 work start. That consists of a few db queries, and potential updates to the
 db + message on message queue. The majority of messages don't need
 processing. The messages needing processing at peak are about three every
 other second.

 One possible thing that might be happening is the session
 initialisation and prepared statement initialisation for each partition. I
 can resort to some tricks, but I think I'll try increasing batch interval
 to 15 seconds. I'll report back with findings.

 Thanks,
 Ashic.

 --
 From: gerard.m...@gmail.com
 Date: Thu, 22 Jan 2015 12:30:08 +0100
 Subject: Re: Are these numbers abnormal for spark streaming?
 To: tathagata.das1...@gmail.com
 CC: as...@live.com; t...@databricks.com; user@spark.apache.org

 and post the code (if possible).
 In a nutshell, your processing time > batch interval, resulting in an
 ever-increasing delay that will end up in a crash.
 3 secs to process 14 messages looks like a lot. Curious what the job
 logic is.

 -kr, Gerard.

 On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 This is not normal. Its a huge scheduling delay!! Can you tell me more
 about the application?
  - cluster setup, number of receivers, what's the computation, etc.

 On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab as...@live.com wrote:

 Hate to do this...but...erm...bump? Would really appreciate input from
 others using Streaming. Or at least some docs that would tell me if these
 are expected or not.

 --
 From: as...@live.com
 To: user@spark.apache.org
 Subject: Are these numbers abnormal for spark streaming?
 Date: Wed, 21 Jan 2015 11:26:31 +


 Hi Guys,
 I've got Spark Streaming set up for a low data rate system (using
 spark's features for analysis, rather than high throughput). Messages are
 coming in throughout the day, at around 1-20 per second (finger in the air
 estimate...not analysed yet).  In the spark streaming UI for the
 application, I'm getting the following after 17 hours.

 Streaming

- *Started at: *Tue Jan 20 16:58:43 GMT 2015

Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Gerard Maas
So the system has gone from 7 msgs in 4.961 secs (median) to 106 msgs in 4.761 secs.
I think there's evidence that setup costs are quite high in this case and
increasing the batch interval is helping.

On Thu, Jan 22, 2015 at 4:12 PM, Sudipta Banerjee 
asudipta.baner...@gmail.com wrote:

 Hi Ashic Mahtab,

 The Cassandra and the Zookeeper are they installed as a part of Yarn
 architecture or are they installed in a separate layer with Apache Spark .

 Thanks and Regards,
 Sudipta

 On Thu, Jan 22, 2015 at 8:13 PM, Ashic Mahtab as...@live.com wrote:

 Hi Guys,
 So I changed the interval to 15 seconds. There's obviously a lot more
 messages per batch, but (I think) it looks a lot healthier. Can you see any
 major warning signs? I think that with 2 second intervals, the setup /
 teardown per partition was what was causing the delays.

 Streaming

- *Started at: *Thu Jan 22 13:23:12 GMT 2015
- *Time since start: *1 hour 17 minutes 16 seconds
- *Network receivers: *2
- *Batch interval: *15 seconds
- *Processed batches: *309
- *Waiting batches: *0



 Statistics over last 100 processed batches

 Receiver Statistics

- Receiver


- Status


- Location


- Records in last batch
- [2015/01/22 14:40:29]


- Minimum rate
- [records/sec]


- Median rate
- [records/sec]


- Maximum rate
- [records/sec]


- Last Error

 RmqReceiver-0ACTIVEVDCAPP53.foo.local2.6 K29106295-RmqReceiver-1ACTIVE
 VDCAPP50.bar.local2.6 K29107291-
 Batch Processing Statistics
 (columns: Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum)

 Processing Time:  4 seconds 812 ms | 4 seconds 698 ms | 4 seconds 738 ms | 4 seconds 761 ms | 4 seconds 788 ms | 5 seconds 802 ms
 Scheduling Delay: 2 ms | 0 ms | 3 ms | 3 ms | 4 ms | 9 ms
 Total Delay:      4 seconds 814 ms | 4 seconds 701 ms | 4 seconds 739 ms | 4 seconds 764 ms | 4 seconds 792 ms | 5 seconds 809 ms
seconds 792 ms5 seconds 809 ms


 Regards,
 Ashic.
 --
 From: as...@live.com
 To: gerard.m...@gmail.com
 CC: user@spark.apache.org
 Subject: RE: Are these numbers abnormal for spark streaming?
 Date: Thu, 22 Jan 2015 12:32:05 +


 Hi Gerard,
 Thanks for the response.

 The messages get deserialised from msgpack format, and one of the strings
 is deserialised to JSON. Certain fields are checked to decide if further
 processing is required. If so, it goes through a series of in mem filters
 to check if more processing is required. If so, only then does the heavy
 work start. That consists of a few db queries, and potential updates to the
 db + message on message queue. The majority of messages don't need
 processing. The messages needing processing at peak are about three every
 other second.

 One possible thing that might be happening is the session initialisation
 and prepared statement initialisation for each partition. I can resort to
 some tricks, but I think I'll try increasing batch interval to 15 seconds.
 I'll report back with findings.

 Thanks,
 Ashic.

 --
 From: gerard.m...@gmail.com
 Date: Thu, 22 Jan 2015 12:30:08 +0100
 Subject: Re: Are these numbers abnormal for spark streaming?
 To: tathagata.das1...@gmail.com
 CC: as...@live.com; t...@databricks.com; user@spark.apache.org

 and post the code (if possible).
 In a nutshell, your processing time > batch interval, resulting in an
 ever-increasing delay that will end up in a crash.
 3 secs to process 14 messages looks like a lot. Curious what the job
 logic is.

 -kr, Gerard.

 On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 This is not normal. Its a huge scheduling delay!! Can you tell me more
 about the application?
  - cluster setup, number of receivers, what's the computation, etc.

 On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab as...@live.com wrote:

 Hate to do this...but...erm...bump? Would really appreciate input from
 others using Streaming. Or at least some docs that would tell me if these
 are expected or not.

 --
 From: as...@live.com
 To: user@spark.apache.org
 Subject: Are these numbers abnormal for spark streaming?
 Date: Wed, 21 Jan 2015 11:26:31 +


 Hi Guys,
 I've got Spark Streaming set up for a low data rate system (using spark's
 features for analysis, rather than high throughput). Messages are coming in
 throughout the day, at around 1-20 per second (finger in the air
 estimate...not analysed yet).  In the spark streaming UI for the
 application, I'm getting the following after 17 hours.

 Streaming

- *Started at: *Tue Jan 20 16:58:43 GMT 2015
- *Time since start: *18 hours 24 minutes 34 seconds
- *Network receivers: *2
- *Batch interval: *2 seconds
- *Processed batches: *16482
- *Waiting batches: *1



 Statistics over last 100 processed batchesReceiver Statistics

- Receiver


- Status


- Location


- Records in last batch
- [2015/01/21 11:23:18]


- Minimum rate
- [records/sec]


- Median rate
- [records/sec]


- 

Re: dynamically change receiver for a spark stream

2015-01-21 Thread Gerard Maas
Hi Tamas,

I meant not changing the receivers, but starting/stopping the Streaming
jobs. So you would have a 'small' Streaming job for a subset of streams
that you'd configure-start-stop  on demand.
I haven't tried myself yet, but I think it should also be possible to
create a Streaming Job from the Spark Job Server (
https://github.com/spark-jobserver/spark-jobserver). Then you would have a
REST interface that even gives you the possibility of passing a
configuration.

-kr, Gerard.

On Wed, Jan 21, 2015 at 11:54 AM, Tamas Jambor jambo...@gmail.com wrote:

 we were thinking along the same line, that is to fix the number of streams
 and change the input and output channels dynamically.

 But could not make it work (seems that the receiver is not allowing any
 change in the config after it started).

 thanks,

 On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 One possible workaround could be to orchestrate launch/stopping of
 Streaming jobs on demand as long as the number of jobs/streams stay
 within the boundaries of the resources (cores) you've available.
 e.g. if you're using Mesos, Marathon offers a REST interface to manage
 job lifecycle. You will still need to solve the dynamic configuration
 through some alternative channel.

 On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com
 wrote:

 thanks for the replies.

 is this something we can get around? Tried to hack into the code without
 much success.

 On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi,

 I don't think current Spark Streaming support this feature, all the
 DStream lineage is fixed after the context is started.

 Also stopping a stream is not supported, instead currently we need to
 stop the whole streaming context to meet what you want.

 Thanks
 Saisai

 -Original Message-
 From: jamborta [mailto:jambo...@gmail.com]
 Sent: Wednesday, January 21, 2015 3:09 AM
 To: user@spark.apache.org
 Subject: dynamically change receiver for a spark stream

 Hi all,

 we have been trying to setup a stream using a custom receiver that
 would pick up data from sql databases. we'd like to keep that stream
 context running and dynamically change the streams on demand, adding and
 removing streams based on demand. Alternatively, if a stream is fixed, is it
 possible to stop a stream, change to config and start again?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org







Re: dynamically change receiver for a spark stream

2015-01-21 Thread Gerard Maas
One possible workaround could be to orchestrate launch/stopping of
Streaming jobs on demand as long as the number of jobs/streams stay within
the boundaries of the resources (cores) you've available.
e.g. if you're using Mesos, Marathon offers a REST interface to manage job
lifecycle. You will still need to solve the dynamic configuration through
some alternative channel.

On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com wrote:

 thanks for the replies.

 is this something we can get around? Tried to hack into the code without
 much success.

 On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi,

 I don't think current Spark Streaming support this feature, all the
 DStream lineage is fixed after the context is started.

 Also stopping a stream is not supported, instead currently we need to
 stop the whole streaming context to meet what you want.

 Thanks
 Saisai

 -Original Message-
 From: jamborta [mailto:jambo...@gmail.com]
 Sent: Wednesday, January 21, 2015 3:09 AM
 To: user@spark.apache.org
 Subject: dynamically change receiver for a spark stream

 Hi all,

 we have been trying to setup a stream using a custom receiver that would
 pick up data from sql databases. we'd like to keep that stream context
 running and dynamically change the streams on demand, adding and removing
 streams based on demand. Alternatively, if a stream is fixed, is it possible
 to stop a stream, change to config and start again?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org





Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2015-01-21 Thread Gerard Maas
Hi Mukesh,

How are you creating your receivers? Could you post the (relevant) code?

-kr, Gerard.

On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha me.mukesh@gmail.com wrote:

 Hello Guys,

 I've repartitioned my kafkaStream so that it gets evenly distributed
 among the executors and the results are better.
 Still, from the executors page it seems that all 8 cores are in use on only
 1 executor, while the other executors are using just 1 core each.

 Is this the correct interpretation based on the below data? If so how can
 we fix this?

 [image: Inline image 1]

 On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Thats is kind of expected due to data locality. Though you should see
 some tasks running on the executors as the data gets replicated to
 other nodes and can therefore run tasks based on locality. You have
 two solutions

 1. kafkaStream.repartition() to explicitly repartition the received
 data across the cluster.
 2. Create multiple kafka streams and union them together.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
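
 Both options as a minimal sketch (ssc, zkQuorum, group, topic and the counts
 are placeholders):

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.kafka.KafkaUtils

 // Option 1: a single receiver, then spread the received blocks across the cluster.
 val single = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1))
 val spread = single.repartition(12)

 // Option 2: several receivers (one core each), unioned into a single DStream.
 val streams = (1 to 4).map { _ =>
   KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1),
     StorageLevel.MEMORY_AND_DISK_SER)
 }
 val unified = ssc.union(streams)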

 On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
  Thanks Sandy, It was the issue with the no of cores.
 
  Another issue I was facing is that tasks are not getting distributed
 evenly
  among all executors and are running on the NODE_LOCAL locality level
 i.e.
  all the tasks are running on the same executor where my
 kafkareceiver(s) are
  running even though other executors are idle.
 
  I configured spark.locality.wait=50 instead of the default 3000 ms,
 which
  forced the task rebalancing among nodes, let me know if there is a
 better
  way to deal with this.
 
 
  On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Makes sense, I've also tries it in standalone mode where all 3 workers
 
  driver were running on the same 8 core box and the results were
 similar.
 
  Anyways I will share the results in YARN mode with 8 core yarn
 containers.
 
  On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
  wrote:
 
  When running in standalone mode, each executor will be able to use
 all 8
  cores on the box.  When running on YARN, each executor will only have
 access
  to 2 cores.  So the comparison doesn't seem fair, no?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com
 
  wrote:
 
  Nope, I am setting 5 executors with 2  cores each. Below is the
 command
  that I'm using to submit in YARN mode. This starts up 5 executor
 nodes and a
  drives as per the spark  application master UI.
 
  spark-submit --master yarn-cluster --num-executors 5 --driver-memory
  1024m --executor-memory 1024m --executor-cores 2 --class
  com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000
 
  On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  *oops, I mean are you setting --executor-cores to 8
 
  On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  Are you setting --num-executors to 8?
 
  On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Sorry Sandy, The command is just for reference but I can confirm
 that
  there are 4 executors and a driver as shown in the spark UI page.
 
  Each of these machines is a 8 core box with ~15G of ram.
 
  On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
  sandy.r...@cloudera.com wrote:
 
  Hi Mukesh,
 
  Based on your spark-submit command, it looks like you're only
  running with 2 executors on YARN.  Also, how many cores does
 each machine
  have?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
  me.mukesh@gmail.com wrote:
 
  Hello Experts,
  I'm bench-marking Spark on YARN
  (https://spark.apache.org/docs/latest/running-on-yarn.html) vs
 a standalone
  spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
  I have a standalone cluster with 3 executors, and a spark app
  running on yarn with 4 executors as shown below.
 
  The spark job running inside yarn is 10x slower than the one
  running on the standalone cluster (even though the yarn has
 more number of
  workers), also in both the case all the executors are in the
 same datacenter
  so there shouldn't be any latency. On YARN each 5sec batch is
 reading data
  from kafka and processing it in 5sec  on the standalone
 cluster each 5sec
  batch is getting processed in 0.4sec.
  Also, In YARN mode all the executors are not getting used up
 evenly
  as vm-13  vm-14 are running most of the tasks whereas in the
 standalone
  mode all the executors are running the tasks.
 
  Do I need to set up some configuration to evenly distribute the
  tasks? Also do you have any pointers on the reasons the yarn
 job is 10x
  slower than the standalone job?
  Any suggestion is greatly appreciated, Thanks in advance.
 
  YARN(5 workers + driver)
  

Re: Registering custom metrics

2015-01-08 Thread Gerard Maas
Very interesting approach. Thanks for sharing it!

On Thu, Jan 8, 2015 at 5:30 PM, Enno Shioji eshi...@gmail.com wrote:

 FYI I found this approach by Ooyala.

 /** Instrumentation for Spark based on accumulators.
   *
   * Usage:
   * val instrumentation = new SparkInstrumentation("example.metrics")
   * val numReqs = sc.accumulator(0L)
   * instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
   * instrumentation.register()
   *
   * Will create and report the following metrics:
   * - Gauge with total number of requests (daily)
   * - Meter with rate of requests
   *
   * @param prefix prefix for all metrics that will be reported by this Instrumentation
   */

 https://gist.github.com/ibuenros/9b94736c2bad2f4b8e23
 ᐧ

 On Mon, Jan 5, 2015 at 2:56 PM, Enno Shioji eshi...@gmail.com wrote:

 Hi Gerard,

 Thanks for the answer! I had a good look at it, but I couldn't figure out
 whether one can use that to emit metrics from your application code.

 Suppose I wanted to monitor the rate of bytes I produce, like so:

 stream
 .map { input =>
   val bytes = produce(input)
   // metricRegistry.meter("some.metrics").mark(bytes.length)
   bytes
 }
 .saveAsTextFile("text")

 Is there a way to achieve this with the MetricSystem?


 ᐧ

 On Mon, Jan 5, 2015 at 10:24 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 Yes, I managed to create and register custom metrics by creating an
 implementation of org.apache.spark.metrics.source.Source and
 registering it with the metrics subsystem.
 Source is [Spark] private, so you need to create it under a org.apache.spark
 package. In my case, I'm dealing with Spark Streaming metrics, and I
 created my CustomStreamingSource under org.apache.spark.streaming as I
 also needed access to some [Streaming] private components.

 Then, you register your new metric Source on the Spark's metric system,
 like so:

 SparkEnv.get.metricsSystem.registerSource(customStreamingSource)

 And it will get reported to the metrics Sinks active on your system. By
 default, you can access them through the metrics endpoint:
 http://driver-host:ui-port/metrics/json
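
 A bare-bones sketch of such a Source (the class name and the gauge are
 illustrative; only the Source trait and the Codahale MetricRegistry are
 assumed):

 // Source is private[spark], hence the package.
 package org.apache.spark.metrics.source

 import com.codahale.metrics.{Gauge, MetricRegistry}

 class MyStreamingSource extends Source {
   override val sourceName = "MyApp"
   override val metricRegistry = new MetricRegistry()

   @volatile var recordsSeen: Long = 0L
   metricRegistry.register(MetricRegistry.name("records", "seen"), new Gauge[Long] {
     override def getValue: Long = recordsSeen
   })
 }

 // Registered as described above:
 // SparkEnv.get.metricsSystem.registerSource(new MyStreamingSource)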

 I hope this helps.

 -kr, Gerard.






 On Tue, Dec 30, 2014 at 3:32 PM, eshioji eshi...@gmail.com wrote:

 Hi,

 Did you find a way to do this / working on this?
 Am trying to find a way to do this as well, but haven't been able to
 find a
 way.



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/Registering-custom-metrics-tp9030p9968.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org







Re: Join RDDs with DStreams

2015-01-08 Thread Gerard Maas
You are looking for dstream.transform(rdd => rdd.op(otherRdd))

The docs contain an example on how to use transform.

https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams
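
A minimal sketch for the lookup-table use case (the path, key type and
eventStream are illustrative; an existing SparkContext is assumed):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def joinWithLookup(sc: SparkContext,
                   eventStream: DStream[(String, String)]): DStream[(String, (String, String))] = {
  // Lookup table read once from HDFS as a keyed RDD.
  val lookupRdd: RDD[(String, String)] = sc.textFile("hdfs:///data/lookup.csv")
    .map(_.split(","))
    .map(fields => (fields(0), fields(1)))

  // transform exposes the RDD of each micro-batch, so any RDD-to-RDD
  // operation (here a join) can be applied against the static RDD.
  eventStream.transform(batchRdd => batchRdd.join(lookupRdd))
}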

-kr, Gerard.

On Thu, Jan 8, 2015 at 5:50 PM, Asim Jalis asimja...@gmail.com wrote:

 Is there a way to join non-DStream RDDs with DStream RDDs?

 Here is the use case. I have a lookup table stored in HDFS that I want to
 read as an RDD. Then I want to join it with the RDDs that are coming in
 through the DStream. How can I do this?

 Thanks.

 Asim



Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Gerard Maas
Hi,

Could you add the code where you create the Kafka consumer?

-kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:

 Hi Mukesh,

 If my understanding is correct, each Stream only has a single Receiver.
 So, if you have each receiver consuming 9 partitions, you need 10 input
 DStreams to create 10 concurrent receivers:


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 Would you mind sharing a bit more on how you achieve this ?

 —
 FG


 On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Guys,

 I have a kafka topic having 90 partitions and I running
 SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
 kafka-receivers.

 My streaming is running fine and there is no delay in processing, just
 that some partitions data is never getting picked up. From the kafka
 console I can see that each receiver is consuming data from 9 partitions
 but the lag for some offsets keeps on increasing.

 Below is my kafka-consumers parameters.

 Any of you have face this kind of issue, if so then do you have any
 pointers to fix it?

  Map<String, String> kafkaConf = new HashMap<String, String>();
  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
  kafkaConf.put("group.id", kafkaConsumerGroup);
  kafkaConf.put("consumer.timeout.ms", "3");
  kafkaConf.put("auto.offset.reset", "largest");
  kafkaConf.put("fetch.message.max.bytes", "2000");
  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
  kafkaConf.put("zookeeper.sync.time.ms", "2000");
  kafkaConf.put("rebalance.backoff.ms", "1");
  kafkaConf.put("rebalance.max.retries", "20");

 --
 Thanks  Regards,

 Mukesh Jha me.mukesh@gmail.com





Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Gerard Maas
AFAIK, there're two levels of parallelism related to the Spark Kafka
consumer:

At JVM level: For each receiver, one can specify the number of threads for
a given topic, provided as a map [topic - nthreads].  This will
effectively start n JVM threads consuming partitions of that kafka topic.
At Cluster level: One can create several DStreams, and each will have one
receiver and use 1 executor core in Spark each DStream will have its
receiver as defined in the previous line.

What you need to ensure is that there's a consumer attached to each
partition of your kafka topic. That is, nthreads * nReceivers =
#kafka_partitions(topic)

e.g:
Given
nPartitions = #partitions of your topic
nThreads = #of threads per receiver

val kafkaStreams = (1 to nPartitions/nThreads).map { i =>
  KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> nThreads),
    StorageLevel.MEMORY_ONLY_SER)
}

For this to work, you need at least (nPartitions/nThreads +1) cores in your
Spark cluster, although I would recommend to have 2-3x
(nPartitions/nThreads).
(and don't forget to union the streams after creation)
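
A fuller sketch of the same pattern (zkQuorum, consumerGroup, topic, nPartitions
and nThreads are placeholders; it uses the simpler String-based createStream
overload instead of the elided kafkaConf arguments above):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// nReceivers * nThreads should cover every partition of the topic.
val nReceivers = nPartitions / nThreads
val kafkaStreams = (1 to nReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, consumerGroup,
    Map(topic -> nThreads), StorageLevel.MEMORY_ONLY_SER)
}

// Union them so the rest of the job sees a single DStream.
val unified = ssc.union(kafkaStreams)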

-kr, Gerard.



On Wed, Jan 7, 2015 at 4:43 PM, francois.garil...@typesafe.com wrote:

 - You are launching up to 10 threads/topic per Receiver. Are you sure your
 receivers can support 10 threads each ? (i.e. in the default configuration,
 do they have 10 cores). If they have 2 cores, that would explain why this
 works with 20 partitions or less.

 - If you have 90 partitions, why start 10 Streams, each consuming 10
 partitions, and then removing the stream at index 0 ? Why not simply start
 10 streams with 9 partitions ? Or, more simply,

 val kafkaStreams = (1 to numPartitions).map { _ =>
   KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
     StorageLevel.MEMORY_ONLY_SER)
 }

 - You’re consuming up to 10 local threads *per topic*, on each of your 10
 receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located
 on a single machine. You mentioned having a single Kafka topic with 90
 partitions. Why not have a single-element topicMap ?

 —
 FG


 On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

  I understand that I've to create 10 parallel streams. My code is
 running fine when the no of partitions is ~20, but when I increase the no
 of partitions I keep getting in this issue.

 Below is my code to create kafka streams, along with the configs used.

 Map<String, String> kafkaConf = new HashMap<String, String>();
 kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
 kafkaConf.put("group.id", kafkaConsumerGroup);
 kafkaConf.put("consumer.timeout.ms", "3");
 kafkaConf.put("auto.offset.reset", "largest");
 kafkaConf.put("fetch.message.max.bytes", "2000");
 kafkaConf.put("zookeeper.session.timeout.ms", "6000");
 kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
 kafkaConf.put("zookeeper.sync.time.ms", "2000");
 kafkaConf.put("rebalance.backoff.ms", "1");
 kafkaConf.put("rebalance.max.retries", "20");
 String[] topics = kafkaTopicsList;
 int numStreams = numKafkaThreads; // this is *10*
 Map<String, Integer> topicMap = new HashMap<String, Integer>();
 for (String topic : topics) {
   topicMap.put(topic, numStreams);
 }

 List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], byte[]>>(numStreams);
 for (int i = 0; i < numStreams; i++) {
   kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
     DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
     StorageLevel.MEMORY_ONLY_SER()));
 }
 JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);


 On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 Could you add the code where you create the Kafka consumer?

 -kr, Gerard.

 On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:

 Hi Mukesh,

 If my understanding is correct, each Stream only has a single Receiver.
 So, if you have each receiver consuming 9 partitions, you need 10 input
 DStreams to create 10 concurrent receivers:


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 Would you mind sharing a bit more on how you achieve this ?

 —
 FG


 On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Guys,

 I have a kafka topic having 90 partitions and I running
 SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
 kafka-receivers.

 My streaming is running fine and there is no delay in processing, just
 that some partitions data is never getting picked up. From the kafka
 console I can see that each receiver is consuming data from 9 partitions
 but the lag for some offsets keeps on increasing.

 Below is my kafka-consumers parameters.

 Any of you have face this kind of issue, if so then do you have any
 pointers to fix it?

  MapString, String kafkaConf = new HashMapString, String();
  kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
  kafkaConf.put

Re: RDD for Storm Streaming in Spark

2014-12-23 Thread Gerard Maas
Hi,

I'm not sure what you are asking:

Whether we can use spouts and bolts in Spark  (= no)

or whether we can do streaming in Spark:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
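
For the latter, a minimal sketch of a Spark Streaming job (the socket source,
host/port and the word count are just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingWordCount")
val ssc = new StreamingContext(conf, Seconds(5))

// Text received over a socket, counted per 5-second batch.
val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()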

-kr, Gerard.


On Tue, Dec 23, 2014 at 9:03 AM, Ajay ajay.ga...@gmail.com wrote:

 Hi,

 Can we use Storm Streaming as RDD in Spark? Or any way to get Spark work
 with Storm?

 Thanks
 Ajay



Re: RDD for Storm Streaming in Spark

2014-12-23 Thread Gerard Maas
I'm not aware of a project trying to achieve this integration. At some
point Summingbird had the intention of adding a Spark port, and that could
potentially bridge Storm and Spark. Not sure if that evolved into something
concrete.

In any case, an attempt to bring Storm and Spark together would consist of
Storm collecting the data to construct an RDD and providing that to Spark for
processing, which is basically what Spark Streaming does.

-kr, Gerard.



On Tue, Dec 23, 2014 at 9:49 AM, Ajay ajay.ga...@gmail.com wrote:

 Hi,

 The question is to do streaming in Spark with Storm (not using Spark
 Streaming).

 The idea is to use Spark as a in-memory computation engine and static data
 coming from Cassandra/Hbase and streaming data from Storm.

 Thanks
 Ajay


 On Tue, Dec 23, 2014 at 2:03 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 I'm not sure what you are asking:

 Whether we can use spouts and bolts in Spark  (= no)

 or whether we can do streaming in Spark:
 http://spark.apache.org/docs/latest/streaming-programming-guide.html

 -kr, Gerard.


 On Tue, Dec 23, 2014 at 9:03 AM, Ajay ajay.ga...@gmail.com wrote:

 Hi,

 Can we use Storm Streaming as an RDD in Spark? Or is there any way to get
 Spark to work with Storm?

 Thanks
 Ajay






Tuning Spark Streaming jobs

2014-12-22 Thread Gerard Maas
Hi,

After facing issues with the performance of some of our Spark Streaming
 jobs, we invested quite some effort figuring out the factors that affect
the performance characteristics of a Streaming job. We  defined an
empirical model that helps us reason about Streaming jobs and applied it to
tune the jobs in order to maximize throughput.

We have summarized our findings in a blog post with the intention of
collecting feedback and hoping that it is useful to other Spark Streaming
users facing similar issues.

 http://www.virdata.com/tuning-spark/

Your feedback is welcome.

With kind regards,

Gerard.
Data Processing Team Lead
Virdata.com
@maasg


Re: Tuning Spark Streaming jobs

2014-12-22 Thread Gerard Maas
Hi Tim,

That would be awesome. We have seen some really disparate Mesos allocations
for our Spark Streaming jobs (like (7,4,1) over 3 executors for 4 Kafka
consumers, instead of the ideal (3,3,3,3)).
For network-dependent consumers, achieving an even deployment would provide
a reliable and reproducible streaming job execution from a performance
point of view.
We're deploying in coarse-grained mode. I'm not sure Spark Streaming would
work well in fine-grained mode, given the added latency to acquire a worker.
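For reference, by coarse-grained mode I mean running with settings along
these lines (the values are just illustrative for our setup, not a
recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.mesos.coarse", "true")    // coarse-grained Mesos mode
  .set("spark.cores.max", "12")         // cap total cores so one job doesn't grab the whole cluster
  .set("spark.executor.memory", "4g")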

You mention that you're changing the Mesos scheduler. Is there a JIRA where
this work is being tracked?

-kr, Gerard.


On Mon, Dec 22, 2014 at 6:01 PM, Timothy Chen tnac...@gmail.com wrote:

 Hi Gerard,

 Really nice guide!

 I'm particularly interested in the Mesos scheduling side to more evenly
 distribute cores across cluster.

 I wonder if you are using coarse grain mode or fine grain mode?

 I'm making changes to the spark mesos scheduler and I think we can propose
 a best way to achieve what you mentioned.

 Tim

 Sent from my iPhone

  On Dec 22, 2014, at 8:33 AM, Gerard Maas gerard.m...@gmail.com wrote:
 
  Hi,
 
  After facing issues with the performance of some of our Spark Streaming
  jobs, we invested quite some effort figuring out the factors that affect
  the performance characteristics of a Streaming job. We  defined an
  empirical model that helps us reason about Streaming jobs and applied it
 to
  tune the jobs in order to maximize throughput.
 
  We have summarized our findings in a blog post with the intention of
  collecting feedback and hoping that it is useful to other Spark Streaming
  users facing similar issues.
 
  http://www.virdata.com/tuning-spark/
 
  Your feedback is welcome.
 
  With kind regards,
 
  Gerard.
  Data Processing Team Lead
  Virdata.com
  @maasg



Re: Does Spark 1.2.0 support Scala 2.11?

2014-12-19 Thread Gerard Maas
Check out the 'compiling for Scala 2.11'  instructions:

http://spark.apache.org/docs/1.2.0/building-spark.html#building-for-scala-211
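From memory, the gist of those instructions is along these lines (do
double-check the exact profiles and script name on the page above):

dev/change-version-to-2.11.sh
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package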

-kr, Gerard.

On Fri, Dec 19, 2014 at 12:00 PM, Jonathan Chayat jonatha...@supersonic.com
 wrote:

 The following ticket:

 https://issues.apache.org/jira/browse/SPARK-1812

 for supporting 2.11 has been marked as fixed in 1.2,
 but the docs on the Spark site still say that 2.10 is required.

 Thanks,
 Jon



Re: Scala Lazy values and partitions

2014-12-19 Thread Gerard Maas
It will be instantiated once per VM, which translates to once per executor.
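A small sketch (all names are made up) that makes this observable: because
the lazy val lives in an object, its initializer runs at most once per
executor JVM, no matter how many partitions or tasks that executor handles.

import org.apache.spark.{SparkConf, SparkContext}

object Something {
  // Runs at most once per JVM (i.e. per executor), the first time a task touches it.
  lazy val context: String = {
    println("initializing context in this JVM")
    "heavy-context"
  }

  def foo(item: Int): Unit = println(s"$context -> $item")
}

val sc = new SparkContext(new SparkConf().setAppName("lazy-demo").setMaster("local[2]"))
// 10 partitions, but the "initializing" line is printed only once per executor JVM.
sc.parallelize(1 to 100, 10).foreach(Something.foo)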

-kr, Gerard.

On Fri, Dec 19, 2014 at 12:21 PM, Ashic Mahtab as...@live.com wrote:

 Hi Guys,
 Are Scala lazy values instantiated once per executor, or once per
 partition? For example, if I have:

 object Something {
   lazy val context = create()

   def foo(item) = context.doSomething(item)
 }

 and I do

 someRdd.foreach(Something.foo)

 then will context get instantiated once per executor, or once per
 partition?

 Thanks,
 Ashic.




Re: spark streaming kafa best practices ?

2014-12-17 Thread Gerard Maas
Patrick,

I was wondering why one would choose rdd.map over rdd.foreach to execute
a side-effecting function on an RDD.
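To make the comparison concrete, here is a sketch (assuming the kafkaStream
and process(...) from the quoted code below): map(...).collect() only forces
the side effect by shipping all results back to the driver, while foreach /
foreachPartition runs it on the executors and returns nothing.

kafkaStream.foreachRDD { rdd =>
  // runs process() on the executors; nothing is shipped back to the driver
  rdd.foreachPartition { events =>
    // per-partition setup (e.g. opening a connection) could go here
    events.foreach(event => process(event))
  }
}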

-kr, Gerard.

On Sat, Dec 6, 2014 at 12:57 AM, Patrick Wendell pwend...@gmail.com wrote:

 The second choice is better. Once you call collect() you are pulling
 all of the data onto a single node, you want to do most of the
 processing  in parallel on the cluster, which is what map() will do.
 Ideally you'd try to summarize the data or reduce it before calling
 collect().

 On Fri, Dec 5, 2014 at 5:26 AM, david david...@free.fr wrote:
  hi,
 
  What is the best way to process a batch window in Spark Streaming:
 
  kafkaStream.foreachRDD(rdd => {
    rdd.collect().foreach(event => {
      // process the event
      process(event)
    })
  })
 
 
  Or
 
  kafkaStream.foreachRDD(rdd => {
    rdd.map(event => {
      // process the event
      process(event)
    }).collect()
  })
 
 
  thank's
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafa-best-practices-tp20470.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Get the value of DStream[(String, Iterable[String])]

2014-12-17 Thread Gerard Maas
You can create a DStream that contains the count by transforming the grouped,
windowed DStream, like this:
val errorCount = grouping.map{ case (k, v) => v.size }

If you need to preserve the key:
val errorCount = grouping.map{ case (k, v) => (k, v.size) }

or, if you don't care about the content of the values, you could count
directly, instead of grouping first:

val errorCount = mapErrorLines.countByWindow(Seconds(8), Seconds(4))

Not sure why you're using map(line => ("key", line)), as there only seems to
be one key. If that's not required, we can simplify one step further:

val errorCount = errorLines.countByWindow(Seconds(8), Seconds(4))
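Whichever variant you pick, errorCount is itself a DStream, so for a quick
check you can just print it every batch, or push it somewhere in foreachRDD
(a minimal sketch):

errorCount.print()   // prints the first elements of each batch interval

errorCount.foreachRDD { rdd =>
  rdd.foreach(count => println(s"errors in window: $count"))
}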


The question is: what do you want to do with that count afterwards?

-kr, Gerard.


On Wed, Dec 17, 2014 at 5:11 PM, Guillermo Ortiz konstt2...@gmail.com
wrote:

 I'm a newbie with Spark,,, a simple question

 val errorLines = lines.filter(_.contains("h"))
 val mapErrorLines = errorLines.map(line => ("key", line))
 val grouping = mapErrorLines.groupByKeyAndWindow(Seconds(8), Seconds(4))

 I get something like:

 604: ---
 605: Time: 141883218 ms
 606: ---
 607: (key,ArrayBuffer(h2, h3, h4))

 Now, I would like to get that ArrayBuffer and count the number of
 elements.
 How could I get that ArrayBuffer? Something like:
 val values = grouping.getValue()... How could I do this in Spark with
 Scala?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Data Loss - Spark streaming

2014-12-16 Thread Gerard Maas
Hi Jeniba,

The second part of this meetup recording has a very good answer to your
question.  TD explains the current behavior and the on-going work in Spark
Streaming to fix HA.
https://www.youtube.com/watch?v=jcJq3ZalXD8
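(Side note: the receiver write-ahead log that TD describes as on-going work
shipped with Spark 1.2. If I remember the property name correctly, it is
enabled roughly like this, together with checkpointing and a reliable
receiver; the checkpoint path is just illustrative:)

sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc.checkpoint("hdfs:///path/to/checkpoints")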


-kr, Gerard.

On Tue, Dec 16, 2014 at 11:32 AM, Jeniba Johnson 
jeniba.john...@lntinfotech.com wrote:

 Hi,

 I need a clarification. While running the streaming examples, suppose the
 batch interval is set to 5 minutes; data is collected from the input
 source (Flume) and processed for those 5 minutes.
 What will happen to the data that keeps flowing continuously from the input
 source to Spark Streaming during that time? Will that data be stored
 somewhere, or will it be lost?
 Otherwise, what is the solution to capture every record without any
 loss in Spark Streaming?

 Awaiting your kind reply.


 Regards,
 Jeniba Johnson


 
 The contents of this e-mail and any attachment(s) may contain confidential
 or privileged information for the intended recipient(s). Unintended
 recipients are prohibited from taking action on the basis of information in
 this e-mail and using or disseminating the information, and must notify the
 sender and delete it from their system. L&T Infotech will not accept
 responsibility or liability for the accuracy or completeness of, or the
 presence of any virus or disabling code in this e-mail


