[REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-03 Thread Adrian Tanase
Hi all, Trying to repost this question from a colleague on my team, somehow his subscription is not active: http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html Appreciate any thoughts, -adrian

Re: thought experiment: use spark ML to real time prediction

2015-11-11 Thread Adrian Tanase
I don’t think this answers your question but here’s how you would evaluate the model in realtime in a streaming app https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/predict.html Maybe you can find a way to extract portions of MLLib and run them

Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-10 Thread Adrian Tanase
Can you be a bit more specific about what “blow up” means? Also what do you mean by “messed up” brokers? Inbalance? Broker(s) dead? We’re also using the direct consumer and so far nothing dramatic happened: - on READ it automatically reads from backups if leader is dead (machine gone) - or READ

Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-10 Thread Adrian Tanase
I’ve seen this before during an extreme outage on the cluster, where the kafka offsets checkpointed by the directstreamRdd were bigger than what kafka reported. The checkpoint was therefore corrupted. I don’t know the root cause but since I was stressing the cluster during a reliability test I

Re: Dynamic Allocation & Spark Streaming

2015-11-06 Thread Adrian Tanase
You can register a streaming listener – in the BatchInfo you’ll find a lot of stats (including count of received records) that you can base your logic on: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
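A minimal sketch of such a listener (assuming ssc is an existing StreamingContext; the class name is illustrative):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Called after every micro batch; BatchInfo carries stats such as numRecords and delays.
class RecordCountListener extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    // Base your scaling / alerting logic on the received record count here.
    println(s"Batch ${info.batchTime} received ${info.numRecords} records")
  }
}

ssc.addStreamingListener(new RecordCountListener)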

Re: How to unpersist a DStream in Spark Streaming

2015-11-06 Thread Adrian Tanase
Do we have any guarantees on the maximum duration? I've seen RDDs kept around for 7-10 minutes on batches of 20 secs and checkpoint of 100 secs. No windows, just updateStateByKey. It's not a memory issue, but on checkpoint recovery it goes back to Kafka for 10 minutes of data. Any idea why?

Re: Scheduling Spark process

2015-11-05 Thread Adrian Tanase
You should also specify how you’re planning to query or “publish” the data. I would consider a combination of: - spark streaming job that ingests the raw events in real time, validates, pre-process and saves to stable storage - stable storage could be HDFS/parquet or a database optimized for

Re: How to use data from Database and reload every hour

2015-11-05 Thread Adrian Tanase
You should look at .transform – it’s a powerful transformation (sic) that allows you to dynamically load resources and it gets executed in every micro batch. Re-broadcasting something should be possible from inside transform as that code is executed on the driver but it’s still a controversial
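A rough sketch of that pattern; loadLookupTable() is a hypothetical helper that re-reads the table from the database, and the transform body runs on the driver once per micro batch:

import org.apache.spark.streaming.dstream.DStream

def loadLookupTable(): Map[String, String] = ???  // e.g. a JDBC query, possibly cached for an hour

def enrich(stream: DStream[(String, String)]): DStream[(String, String)] = {
  stream.transform { rdd =>
    // Executed on the driver every batch, so the lookup data can be refreshed here
    // and re-broadcast before the distributed map uses it.
    val lookup = rdd.sparkContext.broadcast(loadLookupTable())
    rdd.map { case (k, v) => (k, lookup.value.getOrElse(k, v)) }
  }
}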

Re: Why some executors are lazy?

2015-11-04 Thread Adrian Tanase
* The scheduler might decide to take advantage of free cores in the cluster and schedule an off-node processing and you can control how long it waits through the spark.locality.wait settings Hope this helps, -adrian From: Khaled Ammar Date: Wednesday, November 4, 2015 at 4:03 PM To: Adrian Tanase

Re: Spark Streaming data checkpoint performance

2015-11-04 Thread Adrian Tanase
Nice! Thanks for sharing, I wasn’t aware of the new API. Left some comments on the JIRA and design doc. -adrian From: Shixiong Zhu Date: Tuesday, November 3, 2015 at 3:32 AM To: Thúy Hằng Lê Cc: Adrian Tanase, "user@spark.apache.org" Subject: Re:

Re: Why some executors are lazy?

2015-11-04 Thread Adrian Tanase
If some of the operations required involve shuffling and partitioning, it might mean that the data set is skewed to specific partitions which will create hot spotting on certain executors. -adrian From: Khaled Ammar Date: Tuesday, November 3, 2015 at 11:43 PM To:

FW: Spark streaming - failed recovery from checkpoint

2015-11-02 Thread Adrian Tanase
Re-posting here, didn’t get any feedback on the dev list. Has anyone experienced corrupted checkpoints recently? Thanks! -adrian From: Adrian Tanase Date: Thursday, October 29, 2015 at 1:38 PM To: "d...@spark.apache.org" Subject: Spark streaming -

Re: execute native system commands in Spark

2015-11-02 Thread Adrian Tanase
Have you seen .pipe()? On 11/2/15, 5:36 PM, "patcharee" wrote: >Hi, > >Is it possible to execute native system commands (in parallel) Spark, >like scala.sys.process ? > >Best, >Patcharee > >- >To
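A small sketch of .pipe(), which streams each partition through an external command's stdin/stdout in parallel (assumes the command exists on every worker):

import org.apache.spark.{SparkConf, SparkContext}

object PipeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pipe-example"))
    val input = sc.parallelize(Seq("a", "b", "c"), numSlices = 3)
    // One external process per partition; its stdout lines become the new RDD.
    val upper = input.pipe("tr 'a-z' 'A-Z'")
    upper.collect().foreach(println)
    sc.stop()
  }
}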

Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Adrian Tanase
You are correct, the default checkpointing interval is 10 seconds or your batch size, whichever is bigger. You can change it by calling .checkpoint(x) on your resulting Dstream. For the rest, you are probably keeping an “all time” word count that grows unbounded if you never remove words from
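A sketch of overriding the data checkpoint interval on the stateful stream (words is assumed to be an existing DStream[String]):

import org.apache.spark.streaming.Seconds

val counts = words
  .map(w => (w, 1L))
  .updateStateByKey[Long] { (batchValues: Seq[Long], state: Option[Long]) =>
    // Return None instead of Some(...) when you want a word evicted from the state.
    Some(state.getOrElse(0L) + batchValues.sum)
  }

// Checkpoint the state every 120s instead of the default max(batch interval, 10s).
counts.checkpoint(Seconds(120))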

Re: Pivot Data in Spark and Scala

2015-10-30 Thread Adrian Tanase
It’s actually a bit tougher, as you’ll first need all the years. Also, not sure how you would represent your “columns” given they are dynamic based on the input data. Depending on your downstream processing, I’d probably try to emulate it with a hash map with years as keys instead of the columns.

Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Adrian Tanase
Does it need to be a mock? Can you use sc.parallelize(data)? From: Priya Ch Date: Thursday, October 29, 2015 at 2:00 PM To: Василец Дмитрий Cc: "user@spark.apache.org", "spark-connector-u...@lists.datastax.com"

Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Adrian Tanase
o S3. And with smaller batch time intervals there >were many small files being written to S3, something to avoid. That was the >explanation of the developer who made this decision (who's no longer on the >team). We're in the process of re-evaluating. >-- > Nick > >-

Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread Adrian Tanase
You can call .repartition on the Dstream created by the Kafka direct consumer. You take the one-time hit of a shuffle but gain the ability to scale out processing beyond your number of partitions. We’re doing this to scale up from 36 partitions / topic to 140 partitions (20 cores * 7 nodes)
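A sketch with the Spark 1.x Kafka direct API used in this thread (broker list and partition count are illustrative):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def buildStream(ssc: StreamingContext, brokers: String, topic: String) = {
  val kafkaParams = Map("metadata.broker.list" -> brokers)
  KafkaUtils
    .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))
    .repartition(140)                  // one shuffle, then processing uses 140 tasks per batch
    .map { case (_, value) => value }  // drop the Kafka key, keep the payload
}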

Re: How do I parallize Spark Jobs at Executor Level.

2015-10-28 Thread Adrian Tanase
The first line is distributing your fileList variable in the cluster as an RDD, partitioned using the default partitioner settings (e.g. number of cores in your cluster). Each of your workers would get one or more slices of data (depending on how many cores each executor has) and the abstraction is

Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-28 Thread Adrian Tanase
Does it work as expected with smaller batch or smaller load? Could it be that it's accumulating too many events over 3 minutes? You could also try increasing the parallelism via repartition to ensure smaller tasks that can safely fit in working memory. Sent from my iPhone > On 28 Oct 2015, at

Re: There is any way to write from spark to HBase CDH4?

2015-10-27 Thread Adrian Tanase
Also I just remembered about cloudera’s contribution http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/ From: Deng Ching-Mallete Date: Tuesday, October 27, 2015 at 12:03 PM To: avivb Cc: user Subject: Re: There is any way to write from spark to HBase

Re: There is any way to write from spark to HBase CDH4?

2015-10-27 Thread Adrian Tanase
This is probably too low level but you could consider the async client inside foreachRdd: https://github.com/OpenTSDB/asynchbase http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd On 10/27/15, 11:36 AM, "avivb" wrote:

Re: There is any way to write from spark to HBase CDH4?

2015-10-27 Thread Adrian Tanase
You can get a feel for it by playing with the original library published as separate project on github https://github.com/cloudera-labs/SparkOnHBase From: Deng Ching-Mallete Date: Tuesday, October 27, 2015 at 12:39 PM To: Fengdong Yu Cc: Adrian Tanase, avivb, user Subject: Re: There is any way

Re: SPARKONHBase checkpointing issue

2015-10-27 Thread Adrian Tanase
Does this help? https://issues.apache.org/jira/browse/SPARK-5206 On 10/27/15, 1:53 PM, "Amit Singh Hora" wrote: >Hi all , > >I am using Cloudera's SparkObHbase to bulk insert in hbase ,Please find >below code >object test { > >def main(args: Array[String]): Unit =

Re: Separate all values from Iterable

2015-10-27 Thread Adrian Tanase
The operator you’re looking for is .flatMap. It flattens all the results if you have nested lists of results (e.g. A map over a source element can return zero or more target elements) I’m not very familiar with the Java APIs but in scala it would go like this (keeping type annotations only as
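For reference, a minimal Scala sketch (sc is assumed to be an existing SparkContext):

val sentences = sc.parallelize(Seq("spark streaming", "", "kafka direct stream"))
// Each input element yields zero or more output elements; the results are flattened.
val words: org.apache.spark.rdd.RDD[String] =
  sentences.flatMap(line => line.split(" ").filter(_.nonEmpty))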

Re: [Spark Streaming] How do we reset the updateStateByKey values.

2015-10-26 Thread Adrian Tanase
Have you considered union-ing the 2 streams? Basically you can consider them as 2 “message types” that your update function can consume (e.g. implement a common interface): * regularUpdate * resetStateUpdate Inside your updateStateByKey you can check if any of the messages in the list
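A sketch of the union approach with a common message type (all names are illustrative):

import org.apache.spark.streaming.dstream.DStream

sealed trait CounterMessage
case class RegularUpdate(delta: Long) extends CounterMessage
case object ResetState extends CounterMessage

def aggregate(updates: DStream[(String, CounterMessage)],
              resets: DStream[(String, CounterMessage)]): DStream[(String, Long)] =
  updates.union(resets).updateStateByKey[Long] { (msgs: Seq[CounterMessage], state: Option[Long]) =>
    // A reset message anywhere in the batch clears the key, otherwise sum the deltas.
    if (msgs.contains(ResetState)) Some(0L)
    else Some(state.getOrElse(0L) + msgs.collect { case RegularUpdate(d) => d }.sum)
  }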

Re: Secondary Sorting in Spark

2015-10-26 Thread Adrian Tanase
shuffling… -adrian From: swetha kasireddy Date: Monday, October 26, 2015 at 6:56 AM To: Adrian Tanase Cc: Bill Bejeck, "user@spark.apache.org" Subject: Re: Secondary Sorting in Spark Hi, Does the use of custom partitioner in Streaming affect performance

Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
If I understand the order correctly, not really. First of all, the easiest way to make sure it works as expected is to check out the visual DAG in the spark UI. It should map 1:1 to your code, and since I don’t see any shuffles in the operations below it should execute all in one stage,

Re: Accumulators internals and reliability

2015-10-26 Thread Adrian Tanase
I can reply from a user’s perspective – I’ll defer the semantic guarantees to someone with more experience. I’ve successfully implemented the following using a custom Accumulable class: * Created a MapAccumulator with dynamic keys (they are driven by the data coming in), as opposed to

Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
Thinking more about it – it should only be 2 tasks as A and B are most likely collapsed by spark in a single task. Again – learn to use the spark UI as it’s really informative. The combination of DAG visualization and task count should answer most of your questions. -adrian From: Adrian

Re: Spark StreamingStatefull information

2015-10-22 Thread Adrian Tanase
The result of updatestatebykey is a dstream that emits the entire state every batch - as an RDD - nothing special about it. It easy to join / cogroup with another RDD if you have the correct keys in both. You could load this one when the job starts and/or have it update with updatestatebykey as

Re: Analyzing consecutive elements

2015-10-22 Thread Adrian Tanase
. -adrian From: Sampo Niskanen Date: Thursday, October 22, 2015 at 2:12 PM To: Adrian Tanase Cc: user Subject: Re: Analyzing consecutive elements Hi, Sorry, I'm not very familiar with those methods and cannot find the 'drop' method anywhere. As an example: val arr = Array((1, "A"

Re: Job splling to disk and memory in Spark Streaming

2015-10-21 Thread Adrian Tanase
+1 – you can definitely make it work by making sure you are using the same partitioner (including the same number of partitions). For most operations like reduceByKey, updateStateByKey – simply specifying it is enough. There are some gotchas for other operations: * mapValues and

Re: Spark on Yarn

2015-10-21 Thread Adrian Tanase
The question is whether the spark dependency is marked as provided or included in the fat jar. For example, we are compiling the spark distro separately for java 8 + scala 2.11 + hadoop 2.6 (with maven) and marking it as provided in sbt. -adrian From: Raghuveer Chanda Date: Wednesday, October 21,

Re: Whether Spark is appropriate for our use case.

2015-10-21 Thread Adrian Tanase
Can you share your approximate data size? All should be valid use cases for Spark; I'm wondering if you are providing enough resources. Also - do you have some expectations in terms of performance? What does "slow down" mean? For this use case I would personally favor parquet over DB, and

Re: difference between rdd.collect().toMap to rdd.collectAsMap() ?

2015-10-20 Thread Adrian Tanase
If you look at the source code you’ll see that this is merely a convenience function on PairRDDs - only interesting detail is that it uses a mutable HashMap to optimize creating maps with many keys. That being said, .collect() is called anyway.

Re: Partition for each executor

2015-10-20 Thread Adrian Tanase
I think it should use the default parallelism which by default is equal to the number of cores in your cluster. If you want to control it, specify a value for numSlices - the second param to parallelize(). -adrian On 10/20/15, 6:13 PM, "t3l" wrote: >If I have a
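For example (sc assumed to be an existing SparkContext):

val data = 1 to 1000000
val rdd = sc.parallelize(data, numSlices = 16)  // 16 partitions instead of spark.default.parallelism
println(rdd.partitions.length)                  // prints 16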

Re: Should I convert json into parquet?

2015-10-19 Thread Adrian Tanase
For general data access of the pre-computed aggregates (group by) you’re better off with Parquet. I’d only choose JSON if I needed interop with another app stack / language that has difficulty accessing parquet (E.g. Bulk load into document db…). On a strategic level, both JSON and parquet are

Re: Spark Streaming - use the data in different jobs

2015-10-19 Thread Adrian Tanase
+1 for re-publishing to pubsub if there is only transient value in the data. If you need to query the intermediate representation then you will need to use a database. Sharing RDDs in memory should be possible with projects like spark job server but I think that’s overkill in this scenario.

Re: Spark handling parallel requests

2015-10-19 Thread Adrian Tanase
To answer your specific question, you can’t push data to Kafka through a socket – you need a smart client library as the cluster setup is pretty advanced (also requires zookeeper). I bet there are php libraries for Kafka although after a quick search it seems they’re still pretty young. Also –

Re: How does shuffle work in spark ?

2015-10-19 Thread Adrian Tanase
I don’t know why it expands to 50 GB but it’s correct to see it both on the first operation (shuffled write) and on the next one (shuffled read). It’s the barrier between the 2 stages. -adrian From: shahid ashraf Date: Monday, October 19, 2015 at 9:53 PM To: Kartik Mathur, Adrian Tanase Cc

Re: Spark Streaming scheduler delay VS driver.cores

2015-10-19 Thread Adrian Tanase
Bump on this question – does anyone know what is the effect of spark.driver.cores on the driver's ability to manage larger clusters? Any tips on setting a correct value? I’m running Spark streaming on Yarn / Hadoop 2.6 / Spark 1.5.1. Thanks, -adrian From: Adrian Tanase Date: Saturday, October

Re: Streaming of COAP Resources

2015-10-19 Thread Adrian Tanase
I’m not familiar with your COAP library, but onStart is called only once. You’re only reading the value once when the custom receiver is initialized. You need to set up a callback or poll a buffer — again, it depends on your COAP client. In short, configure your client to “start listening for changes”

Re: Differentiate Spark streaming in event logs

2015-10-19 Thread Adrian Tanase
You could try to start the 2/N jobs with a slightly different log4j template, by prepending some job type to all the messages... On 10/19/15, 9:47 PM, "franklyn" wrote: >Hi I'm running a job to collect some analytics on spark jobs by analyzing >their event logs. We

Re: How to calculate row by now and output retults in Spark

2015-10-19 Thread Adrian Tanase
Are you by any chance looking for reduceByKey? If you’re trying to collapse all the values in V into an aggregate, that’s what you should be looking at. -adrian From: Ted Yu Date: Monday, October 19, 2015 at 9:16 PM To: Shepherd Cc: user Subject: Re: How to calculate row by now and output

Spark Streaming scheduler delay VS driver.cores

2015-10-17 Thread Adrian Tanase
Hi, I’ve recently bumped up the resources for a spark streaming job – and the performance started to degrade over time. It was running fine on 7 nodes with 14 executor cores each (via Yarn) until I bumped executor.cores to 22 cores/node (out of 32 on AWS c3.xlarge, 24 for yarn). The driver has

Re: repartition vs partitionby

2015-10-17 Thread Adrian Tanase
If the dataset allows it, you can try to write a custom partitioner to help Spark distribute the data more uniformly. Sent from my iPhone On 17 Oct 2015, at 16:14, shahid ashraf wrote: yes i know about that,its in case to reduce partitions. the
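An illustrative custom partitioner (not from the thread) that gives known hot keys their own partitions and hash-partitions the rest:

import org.apache.spark.Partitioner

class HotKeyAwarePartitioner(partitions: Int, hotKeys: Seq[String]) extends Partitioner {
  require(partitions > hotKeys.size)
  private val hotIndex = hotKeys.zipWithIndex.toMap
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = hotIndex.get(key.toString) match {
    case Some(i) => i                                                        // dedicated partition
    case None    => hotKeys.size + math.abs(key.hashCode % (partitions - hotKeys.size))
  }
}

// usage, assuming pairs is an RDD[(String, V)]:
// pairs.partitionBy(new HotKeyAwarePartitioner(64, Seq("very-hot-key")))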

Re: Building with SBT and Scala 2.11

2015-10-14 Thread Adrian Tanase
kob's report was with sbt. Cheers On Tue, Oct 13, 2015 at 10:05 PM, Adrian Tanase <atan...@adobe.com> wrote: Do you mean hadoop-2.4 or 2.6? not sure if this is the issue but I'm also compiling the 1.5.1 version with scala 2.11 and hadoop 2.6 and it works. -adrian Se

Re: Building with SBT and Scala 2.11

2015-10-13 Thread Adrian Tanase
Do you mean hadoop-2.4 or 2.6? Not sure if this is the issue, but I'm also compiling the 1.5.1 version with scala 2.11 and hadoop 2.6 and it works. -adrian Sent from my iPhone On 14 Oct 2015, at 03:53, Jakob Odersky wrote: I'm having trouble

Re: SQLContext within foreachRDD

2015-10-12 Thread Adrian Tanase
Not really, unless you’re doing something wrong (e.g. Call collect or similar). In the foreach loop you’re typically registering a temp table, by converting an RDD to data frame. All the subsequent queries are executed in parallel on the workers. I haven’t built production apps with this

Re: "dynamically" sort a large collection?

2015-10-12 Thread Adrian Tanase
I think you’re looking for the flatMap (or flatMapValues) operator – you can do something like sortedRdd.flatMapValues( v => if (v % 2 == 0) { Some(v / 2) } else { None } ) Then you need to sort again. -adrian From: Yifan LI Date: Monday, October 12, 2015 at 1:03 PM To: spark users Subject:

Re: Spark retrying task indefinietly

2015-10-12 Thread Adrian Tanase
To answer your question specifically - you can bump the value on spark.streaming.kafka.maxRetries (see configuration guide: http://spark.apache.org/docs/latest/configuration.html). That being said, you should avoid this by adding some validation in your deserialization / parse code. A quick

Re: Streaming Performance w/ UpdateStateByKey

2015-10-10 Thread Adrian Tanase
How are you determining how much time is serialization taking? I made this change in a streaming app that relies heavily on updateStateByKey. The memory consumption went up 3x on the executors but I can't see any perf improvement. Task execution time is the same and the serialization state

Re: Notification on Spark Streaming job failure

2015-10-07 Thread Adrian Tanase
We’re deploying using YARN in cluster mode, to take advantage of automatic restart of long running streaming app. We’ve also done a POC on top of Mesos+Marathon, that’s always an option. For monitoring / alerting, we’re using a combination of: * Spark REST API queried from OpsView via

Re: Secondary Sorting in Spark

2015-10-05 Thread Adrian Tanase
Great article, especially the use of a custom partitioner. Also, sorting by multiple fields by creating a tuple out of them is an awesome, easy to miss, Scala feature. Sent from my iPhone On 04 Oct 2015, at 21:41, Bill Bejeck wrote: I've written

Re: Broadcast var is null

2015-10-05 Thread Adrian Tanase
FYI the same happens with accumulators when recovering from checkpoint. I'd love to see this fixed somehow as the workaround (using a singleton factory in foreachRdd to make sure the accumulators are initialized instead of null) is really intrusive... Sent from my iPhone On 05 Oct 2015, at
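A sketch of that workaround with the Spark 1.x accumulator API (object and accumulator names are illustrative):

import org.apache.spark.{Accumulator, SparkContext}

object DroppedRecordsCounter {
  @volatile private var instance: Accumulator[Long] = _
  // Lazily (re)creates the accumulator on the driver; after checkpoint recovery the
  // reference captured in closures is null, so always go through this getter in foreachRDD.
  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) synchronized {
      if (instance == null) instance = sc.accumulator(0L, "droppedRecords")
    }
    instance
  }
}

// stream.foreachRDD { rdd =>
//   val dropped = DroppedRecordsCounter.getInstance(rdd.sparkContext)
//   ...
// }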

Re: Usage of transform for code reuse between Streaming and Batch job affects the performance ?

2015-10-05 Thread Adrian Tanase
It shouldn't, as lots of the streaming operations delegate to transform under the hood. Easiest way to make sure is to look at the source code - with a decent IDE navigating around should be a breeze. As a matter of fact, for more advanced operations where you may want to control the

Re: RDD of ImmutableList

2015-10-05 Thread Adrian Tanase
If you don't need to write data back using that library I'd say go for #2. Convert to a scala class and standard lists, should be easier down the line. That being said, you may end up writing custom code if you stick with kryo anyway... Sent from my iPhone On 05 Oct 2015, at 21:42, Jakub

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Adrian Tanase
ip.wea...@gmail.com>> wrote: You can't really say 8 cores is not much horsepower when you have no idea what my use case is. That's silly. On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase <atan...@adobe.com> wrote: Forgot to mention that you could also restr

Re: how to broadcast huge lookup table?

2015-10-04 Thread Adrian Tanase
Have a look at .transformWith – you can specify another RDD. Sent from my iPhone On 02 Oct 2015, at 21:50, "saif.a.ell...@wellsfargo.com" wrote: I tried broadcasting a key-value rdd, but
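A sketch of the related pattern of joining each micro batch against a large, partitioned lookup RDD inside transform instead of broadcasting it (transformWith is the variant that takes a second DStream):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def enrich(stream: DStream[(String, String)],
           lookup: RDD[(String, String)]): DStream[(String, (String, String))] = {
  // Partition and cache the lookup table once so each per-batch join avoids re-shuffling it.
  val partitioned = lookup.partitionBy(new HashPartitioner(64)).cache()
  stream.transform(batch => batch.join(partitioned))
}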

Re: Shuffle Write v/s Shuffle Read

2015-10-02 Thread Adrian Tanase
I’m not sure this is related to memory management – the shuffle is the central act of moving data around nodes when the computations need the data on another node (E.g. Group by, sort, etc) Shuffle read and shuffle write should be mirrored on the left/right side of a shuffle between 2 stages.

Re: Accumulator of rows?

2015-10-02 Thread Adrian Tanase
Have you seen window functions? https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html From: "saif.a.ell...@wellsfargo.com" Date: Thursday, October 1, 2015 at 9:44 PM To: "user@spark.apache.org"

Re: Kafka Direct Stream

2015-10-01 Thread Adrian Tanase
On top of that you could make the topic part of the key (e.g. keyBy in .transform or manually emitting a tuple) and use one of the .xxxByKey operators for the processing. If you have a stable, domain specific list of topics (e.g. 3-5 named topics) and the processing is really different, I

Re: Lost leader exception in Kafka Direct for Streaming

2015-10-01 Thread Adrian Tanase
This also happened to me in extreme recovery scenarios – e.g. Killing 4 out of a 7 machine cluster. I’d put my money on recovering from an out of sync replica, although I haven’t done extensive testing around it. -adrian From: Cody Koeninger Date: Thursday, October 1, 2015 at 5:18 PM To:

Re: automatic start of streaming job on failure on YARN

2015-10-01 Thread Adrian Tanase
This happens automatically as long as you submit with cluster mode instead of client mode. (e.g. ./spark-submit --master yarn-cluster …) The property you mention would help right after that, although you will need to set it to a large value (e.g. 1000?) - as there is no “infinite” support.

Re: Monitoring tools for spark streaming

2015-09-29 Thread Adrian Tanase
You can also use the REST api introduced in 1.4 – although it’s harder to parse: * jobs from the same batch are not grouped together * You only get total delay, not scheduling delay From: Hari Shreedharan Date: Tuesday, September 29, 2015 at 5:27 AM To: Shixiong Zhu Cc: Siva,

Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Adrian Tanase
:37 PM To: Adrian Tanase Cc: "user@spark.apache.org" Subject: Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException Adrian, Thanks for your response. I just looked at both machines we're testing on and on both the

Re: Merging two avro RDD/DataFrames

2015-09-29 Thread Adrian Tanase
Seems to me that the obvious candidate is loading both master and delta, using join or cogroup then write the new master. Through some clever sharding and key management you might achieve some efficiency gains, but I’d say start here if your numbers are in the hundreds of thousands… should run

Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Adrian Tanase
I believe some of the brokers in your cluster died and there are a number of partitions that nobody is currently managing. -adrian From: Dmitry Goldenberg Date: Tuesday, September 29, 2015 at 3:26 PM To: "user@spark.apache.org" Subject: Kafka error "partitions

Re: Converting a DStream to schemaRDD

2015-09-29 Thread Adrian Tanase
Also check this out https://github.com/databricks/reference-apps/blob/master/logs_analyzer/chapter1/scala/src/main/scala/com/databricks/apps/logs/chapter1/LogAnalyzerStreamingSQL.scala From the data bricks reference app: https://github.com/databricks/reference-apps From: Ewan Leith Date:

Re: Spark Streaming Log4j Inside Eclipse

2015-09-29 Thread Adrian Tanase
AVA Code Ashish On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase <atan...@adobe.com> wrote: You also need to provide it as parameter to spark submit http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver From: Ashish Soni D

Re: Adding / Removing worker nodes for Spark Streaming

2015-09-29 Thread Adrian Tanase
Just wanted to make sure one thing is really clear – the kafka offsets are part of the actual RDD – in every batch spark is saving the offset ranges for each partition – this in theory will make the data in each batch stable across recovery. The other important thing is that with correct

Re: Does YARN start new executor in place of the failed one?

2015-09-29 Thread Adrian Tanase
In theory, yes - however in practice it seems that it depends on how they die. I’ve recently logged an issue for the case when the machine is restarted. If the executor process dies it generally comes back gracefully. https://issues.apache.org/jira/browse/SPARK-10792 Maybe you can vote up the

Re: Spark Streaming Log4j Inside Eclipse

2015-09-28 Thread Adrian Tanase
You also need to provide it as parameter to spark submit http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver From: Ashish Soni Date: Monday, September 28, 2015 at 5:18 PM To: user Subject: Spark Streaming Log4j Inside Eclipse I need to turn off the

Re: Receiver and Parallelization

2015-09-25 Thread Adrian Tanase
Good catch, I was not aware of this setting. I’m wondering though if it also generates a shuffle or if the data is still processed by the node on which it’s ingested - so that you’re not gated by the number of cores on one machine. -adrian On 9/25/15, 5:27 PM, "Silvio Fiorito"

Re: Reasonable performance numbers?

2015-09-25 Thread Adrian Tanase
It’s really hard to answer this, as the comparison is not really fair – Storm is much lower level than Spark and has less overhead when dealing with stateless operations. I’d be curious how is your colleague implementing the Average on a “batch” and what is the storm equivalent of a Batch.

Re: kafka direct streaming with checkpointing

2015-09-25 Thread Adrian Tanase
Hi Radu, The problem itself is not checkpointing the data – if your operations are stateless then you are only checkpointing the kafka offsets, you are right. The problem is that you are also checkpointing metadata – including the actual Code and serialized java classes – that’s why you’ll see

Re: Using Spark for portfolio manager app

2015-09-25 Thread Adrian Tanase
, 2015 at 2:05 PM To: Adrian Tanase Subject: Re: Using Spark for portfolio manager app Hi Adrian, Thanks Cassandra seems to be good candidate too. I will give it a try. Do you know any stable connector that help Spark work with Cassandra? Or I should write it myself. Regards my second question, i

Re: Using Spark for portfolio manager app

2015-09-25 Thread Adrian Tanase
Re: DB I strongly encourage you to look at Cassandra – it’s almost as powerful as Hbase, a lot easier to setup and manage. Well suited for this type of usecase, with a combination of K/V store and time series data. For the second question, I’ve used this pattern all the time for “flash

Re: Receiver and Parallelization

2015-09-25 Thread Adrian Tanase
1) yes, just use .repartition on the inbound stream, this will shuffle data across your whole cluster and process in parallel as specified. 2) yes, although I’m not sure how to do it for a totally custom receiver. Does this help as a starting point?

Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-24 Thread Adrian Tanase
RE: # because I already have a bunch of InputSplits, do I still need to specify the number of executors to get processing parallelized? I would say it’s best practice to have as many executors as data nodes and as many cores as you can get from the cluster – if YARN has enough resources it

Re: Spark on YARN / aws - executor lost on node restart

2015-09-24 Thread Adrian Tanase
Closing the loop, I’ve submitted this issue – TD, cc-ing you since it’s spark streaming, not sure who oversees the Yarn module. https://issues.apache.org/jira/browse/SPARK-10792 -adrian From: Adrian Tanase Date: Friday, September 18, 2015 at 6:18 PM To: "user@spark.apache.org<mai

Re: How to make Group By/reduceByKey more efficient?

2015-09-24 Thread Adrian Tanase
All the *ByKey aggregations perform an efficient shuffle and preserve partitioning on the output. If all you need is to call reduceByKey, then don’t bother with groupBy. You should use groupBy if you really need all the datapoints from a key for a very custom operation. From the docs: Note:
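For example, assuming pairs is an RDD[(String, Int)]:

val counts = pairs.reduceByKey(_ + _)               // combines map-side before the shuffle
// vs. the less efficient equivalent that ships every datapoint across the network:
// val counts = pairs.groupByKey().mapValues(_.sum)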

Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-24 Thread Adrian Tanase
+1 on grouping the case classes and creating a hierarchy – as long as you use the data programatically. For DataFrames / SQL the other ideas probably scale better… From: Ted Yu Date: Wednesday, September 23, 2015 at 7:07 AM To: satish chandra j Cc: user Subject: Re: Scala Limitation - Case

Re: reduceByKeyAndWindow confusion

2015-09-24 Thread Adrian Tanase
Let me take a stab at your questions – can you clarify some of the points below? I’m wondering if you’re using the streaming concepts as they were intended… 1. Windowed operations First, I just want to confirm that it is your intention to split the original kafka stream into multiple Dstreams

Re: reduceByKey inside updateStateByKey in Spark Streaming???

2015-09-24 Thread Adrian Tanase
The 2 operations can't be used inside one another. If you need something like an all-time average then you need to keep a tuple (sum, count) to which you add all the new values that come in every batch. The average is then just a map on the state DStream. Makes sense? Have I guessed your use
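A minimal sketch of that approach (key and value types are illustrative):

import org.apache.spark.streaming.dstream.DStream

def runningAverage(values: DStream[(String, Double)]): DStream[(String, Double)] = {
  // State per key is (sum, count); updated with whatever arrived in the current batch.
  val sumAndCount = values.updateStateByKey[(Double, Long)] {
    (batchValues: Seq[Double], state: Option[(Double, Long)]) =>
      val (sum, count) = state.getOrElse((0.0, 0L))
      Some((sum + batchValues.sum, count + batchValues.size))
  }
  // The all-time average is just a map over the state DStream.
  sumAndCount.mapValues { case (sum, count) => if (count == 0) 0.0 else sum / count }
}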

Re: Deploying spark-streaming application on production

2015-09-22 Thread Adrian Tanase
btw I re-read the docs and I want to clarify that reliable receiver + WAL gives you at least once, not exactly once semantics. Sent from my iPhone On 21 Sep 2015, at 21:50, Adrian Tanase <atan...@adobe.com> wrote: I'm wondering, isn't this the canoni

Re: Spark Streaming distributed job

2015-09-22 Thread Adrian Tanase
I think you need to dig into the custom receiver implementation. As long as the source is distributed and partitioned, the downstream .map, .foreachXX are all distributed as you would expect. You could look at how the “classic” Kafka receiver is instantiated in the streaming guide and try to

Re: Invalid checkpoint url

2015-09-22 Thread Adrian Tanase
Have you tried simply ssc.checkpoint("checkpoint")? This should create it in the local folder; it has always worked for me when developing in local mode. For the others (/tmp/..) make sure you have rights to write there. -adrian From: srungarapu vamsi Date: Tuesday, September 22, 2015 at 7:59

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Adrian Tanase
tion is, how does one sort lines in a file by line number. On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase <atan...@adobe.com> wrote: By looking through the docs and source code, I think you can get away with rdd.zipWithIndex to get the index of each line in
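A sketch of the zipWithIndex idea: tag each line with its position, then keep the lowest index per key (parseKey is a hypothetical function that extracts the dedup key from a line):

import org.apache.spark.rdd.RDD

def firstOccurrencePerKey(lines: RDD[String], parseKey: String => String): RDD[String] =
  lines
    .zipWithIndex()                                      // (line, position in the file)
    .map { case (line, idx) => (parseKey(line), (idx, line)) }
    .reduceByKey((a, b) => if (a._1 <= b._1) a else b)   // first occurrence wins
    .map { case (_, (_, line)) => line }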

Re: Using Spark for portfolio manager app

2015-09-21 Thread Adrian Tanase
1. reading from kafka has exactly once guarantees - we are using it in production today (with the direct receiver) * ​you will probably have 2 topics, loading both into spark and joining / unioning as needed is not an issue * tons of optimizations you can do there, assuming

Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Adrian Tanase
We do - using Spark streaming, Kafka, HDFS all collocated on the same nodes. Works great so far. Spark picks up the location information and reads data from the partitions hosted by the local broker, showing up as NODE_LOCAL in the UI. You also need to look at the locality options in the

Re: What's the best practice to parse JSON using spark

2015-09-21 Thread Adrian Tanase
I've been using spray-json for general JSON ser/deser in scala (spark app), mostly for config files and data exchange. Haven't used it in conjunction with jobs that process large JSON data sources, so can't speak for those use cases. -adrian

Re: Deploying spark-streaming application on production

2015-09-21 Thread Adrian Tanase
I'm wondering, isn't this the canonical use case for WAL + reliable receiver? As far as I know you can tune the MQTT server to wait for ack on messages (QoS level 2?). With some support from the client library you could achieve exactly-once semantics on the read side, if you ack the message only after

Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Adrian Tanase
Reading through the docs it seems that with a combination of FAIR scheduler and maybe pools you can get pretty far. However the smallest unit of scheduled work is the task so probably you need to think about the parallelism of each transformation. I'm guessing that by increasing the level of

Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Adrian Tanase
Forgot to mention that you could also restrict the parallelism to 4, essentially using only 4 cores at any given time, however if your job is complex, a stage might be broken into more than 1 task... Sent from my iPhone On 19 Sep 2015, at 08:30, Adrian Tanase <atan...@adobe.com<mailt

Re: Using Spark for portfolio manager app

2015-09-18 Thread Adrian Tanase
Cool use case! You should definitely be able to model it with Spark. For the first question it's pretty easy - you probably need to keep the user portfolios as state using updateStateByKey. You need to consume 2 event sources - user trades and stock changes. You probably want to Cogroup the

Re: Spark on YARN / aws - executor lost on node restart

2015-09-18 Thread Adrian Tanase
dies completely? If there are no ideas on the list, I’ll prepare some logs and follow up with an issue. Thanks, -adrian From: Adrian Tanase Date: Wednesday, September 16, 2015 at 6:01 PM To: "user@spark.apache.org" Subject: Spark on YARN / aws

Re: Spark Streaming application code change and stateful transformations

2015-09-17 Thread Adrian Tanase
This section in the streaming guide also outlines a new option – use 2 versions in parallel for a period of time, controlling the draining / transition in the application level. http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code Also – I would not

Re: Saprk.frame.Akkasize

2015-09-17 Thread Adrian Tanase
Have you reviewed this section of the guide? http://spark.apache.org/docs/latest/programming-guide.html#shared-variables If the dataset is static and you need a copy on all the nodes, you should look at broadcast variables. SQL specific, have you tried loading the dataset using the DataFrame
