Re: spark kafka batch integration

2014-12-15 Thread Cody Koeninger
For an alternative take on a similar idea, see https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka An advantage of the approach I'm taking is that the lower and upper offsets of the RDD are known in advance, so it's deterministic. I

Re: KafkaUtils explicit acks

2014-12-16 Thread Cody Koeninger
Do you actually need spark streaming per se for your use case? If you're just trying to read data out of kafka into hbase, would something like this non-streaming rdd work for you: https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka Note

Re: Playing along at home: recommendations as to system requirements?

2014-12-27 Thread Cody Koeninger
There are hardware recommendations at http://spark.apache.org/docs/latest/hardware-provisioning.html but they're overkill for just testing things out. You should be able to get meaningful work done with 2 m3.large nodes, for instance. On Sat, Dec 27, 2014 at 8:27 AM, Amy Brown testingwithf...@gmail.com

Re: How to collect() each partition in scala ?

2014-12-30 Thread Cody Koeninger
I'm not sure exactly what you're trying to do, but take a look at rdd.toLocalIterator if you haven't already. On Tue, Dec 30, 2014 at 6:16 AM, Sean Owen so...@cloudera.com wrote: collect()-ing a partition still implies copying it to the driver, but you're suggesting you can't collect() the

Re: How to replay consuming messages from kafka using spark streaming?

2015-01-14 Thread Cody Koeninger
Take a look at the implementation linked from here https://issues.apache.org/jira/browse/SPARK-4964 see if that would meet your needs On Wed, Jan 14, 2015 at 9:58 PM, mykidong mykid...@gmail.com wrote: Hi, My Spark Streaming Job is doing like kafka etl to HDFS. For instance, every 10 min.

Re: Is there Spark's equivalent for Storm's ShellBolt?

2015-01-14 Thread Cody Koeninger
Look at the method pipe http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD On Wed, Jan 14, 2015 at 11:16 PM, umanga bistauma...@gmail.com wrote: This is question i originally asked in Quora: http://qr.ae/6qjoI We have some code written in C++ and Python that

Re: Kafka Version Update 0.8.2 status?

2015-02-10 Thread Cody Koeninger
That PR hasn't been updated since the new kafka streaming stuff (including KafkaCluster) got merged to master, it will require more changes than what's in there currently. On Tue, Feb 10, 2015 at 9:25 AM, Sean Owen so...@cloudera.com wrote: Yes, did you see the PR for SPARK-2808?

Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of

Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
is defined as public static class DbConn extends AbstractFunction0<Connection> implements Serializable On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java

Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0. On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https

Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
to refactor out the custom Function classes such as the one for getting a db connection or mapping ResultSet data to your own POJO's rather than doing it all inline? On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote: Is sc there a SparkContext or a JavaSparkContext

Re: Streaming scheduling delay

2015-02-12 Thread Cody Koeninger
outdata.foreachRDD( rdd => rdd.foreachPartition(rec => { val writer = new KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap) writer.output(rec) }) ) So this is creating a new kafka producer for every new
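
A minimal sketch of the reuse being discussed here, assuming the Kafka 0.8.2+ producer API; KafkaOutputService is the poster's own class, and the broker list, topic name and outdata DStream below are placeholders:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // One producer per executor JVM, created lazily on first use and reused across batches,
    // instead of a new producer for every partition of every batch.
    object ProducerHolder {
      lazy val producer: KafkaProducer[String, String] = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[String, String](props)
      }
    }

    outdata.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        records.foreach { r =>
          ProducerHolder.producer.send(new ProducerRecord[String, String]("outputTopic", r.toString))
        }
      }
    }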

Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-19 Thread Cody Koeninger
, 'no upper bound' (-1 didn't work). On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definition of JdbcRDD.create: def create[T]( sc: JavaSparkContext, connectionFactory: ConnectionFactory, sql: String, lowerBound: Long

Re: Spark Streaming and message ordering

2015-02-19 Thread Cody Koeninger
Kafka ordering is guaranteed on a per-partition basis. The high-level consumer api as used by the spark kafka streams prior to 1.3 will consume from multiple kafka partitions, thus not giving any ordering guarantees. The experimental direct stream in 1.3 uses the simple consumer api, and there

Re: Spark Streaming and message ordering

2015-02-20 Thread Cody Koeninger
batch? To compare with storm from a message ordering point of view, unless a tuple is fully processed by the DAG (as defined by spout+bolts), the next tuple does not enter the DAG. On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org wrote: Kafka ordering is guaranteed on a per

Re: different akka versions and spark

2015-01-05 Thread Cody Koeninger
I haven't tried it with spark specifically, but I've definitely run into problems trying to depend on multiple versions of akka in one project. On Sat, Jan 3, 2015 at 11:22 AM, Koert Kuipers ko...@tresata.com wrote: hey Ted, i am aware of the upgrade efforts for akka. however if spark 1.2

Re: Reading from a centralized stored

2015-01-05 Thread Cody Koeninger
If you are not co-locating spark executor processes on the same machines where the data is stored, and using an rdd that knows about which node to prefer scheduling a task on, yes, the data will be pulled over the network. Of the options you listed, S3 and DynamoDB cannot have spark running on

Re: Spark response times for queries seem slow

2015-01-05 Thread Cody Koeninger
That sounds slow to me. It looks like your sql query is grouping by a column that isn't in the projections, I'm a little surprised that even works. But you're getting the same time reducing manually? Have you looked at the shuffle amounts in the UI for the job? Are you certain there aren't a

Re: Job priority

2015-01-10 Thread Cody Koeninger
http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties Setting a high weight such as 1000 also makes it possible to implement *priority* between pools—in essence, the weight-1000 pool will always get to launch tasks first whenever it has jobs active. On Sat, Jan 10,
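
A short sketch of wiring that up, assuming a FAIR scheduler with pools defined in an allocation file; the pool name, weight and file path are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("priority-pools")
      .set("spark.scheduler.mode", "FAIR")
      // pool weights / minShare live in an allocation file such as conf/fairscheduler.xml
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go to the "high-priority" pool (e.g. weight 1000).
    sc.setLocalProperty("spark.scheduler.pool", "high-priority")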

Re: Job priority

2015-01-11 Thread Cody Koeninger
the highest priority. Alex On Sat, Jan 10, 2015 at 10:11 PM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties Setting a high weight such as 1000 also makes it possible

Re: Manually trigger RDD map function without action

2015-01-12 Thread Cody Koeninger
If you don't care about the value that your map produced (because you're not already collecting or saving it), then is foreach more appropriate to what you're doing? On Mon, Jan 12, 2015 at 4:08 AM, kevinkim kevin...@apache.org wrote: Hi, answer from another Kevin. I think you may already

Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Cody Koeninger
You should take a look at https://issues.apache.org/jira/browse/SPARK-4122 which is implementing writing to kafka in a pretty similar way (make a new producer inside foreachPartition) On Mon, Jan 12, 2015 at 5:24 AM, Sean Owen so...@cloudera.com wrote: Leader-not-found suggests a problem with

Re: Spark does not loop through a RDD.map

2015-01-12 Thread Cody Koeninger
At a quick glance, I think you're misunderstanding some basic features. http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations Map is a transformation, it is lazy. You're not calling any action on the result of map. Also, closing over a mutable variable (like idx or
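
A small sketch of the distinction, with doWork and rdd standing in for whatever the poster's code actually does:

    // map is a transformation and is lazy: nothing runs until an action is called
    val transformed = rdd.map(x => doWork(x))   // defines the computation, does not execute it

    // either call an action on the result...
    transformed.count()

    // ...or, if the side effect is the whole point, use the foreach action instead of map
    rdd.foreach(x => doWork(x))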

Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/monitoring.html http://spark.apache.org/docs/latest/configuration.html#spark-ui spark.eventLog.enabled On Mon, Jan 12, 2015 at 3:00 PM, ChongTang ct...@virginia.edu wrote: Is there any body can help me with this? Thank you very much! -- View this

Re: How to recovery application running records when I restart Spark master?

2015-01-12 Thread Cody Koeninger
enabled this option, and I saved logs into Hadoop file system. The problem is, how can I get the duration of an application? The attached file is the log I copied from HDFS. On Mon, Jan 12, 2015 at 4:36 PM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest

Re: Reading from a centralized stored

2015-01-06 Thread Cody Koeninger
No, most rdds partition input data appropriately. On Tue, Jan 6, 2015 at 1:41 PM, Franc Carter franc.car...@rozettatech.com wrote: One more question, to be clarify. Will every node pull in all the data ? thanks On Tue, Jan 6, 2015 at 12:56 PM, Cody Koeninger c...@koeninger.org wrote

Re: Why Parquet Predicate Pushdown doesn't work?

2015-01-07 Thread Cody Koeninger
But Xuelin already posted in the original message that the code was using SET spark.sql.parquet.filterPushdown=true On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com wrote: Quoting Michael: Predicate push down into the input format is turned off by default because there is

Re: JdbcRdd for Python

2015-01-05 Thread Cody Koeninger
JavaDataBaseConnectivity is, as far as I know, JVM specific. The JdbcRDD is expecting to deal with Jdbc Connection and ResultSet objects. I haven't done any python development in over a decade, but if someone wants to work together on a python equivalent I'd be happy to help out. The original

Re: Spark with Cassandra - Shuffle opening to many files

2015-01-07 Thread Cody Koeninger
General ideas regarding too many open files: Make sure ulimit is actually being set, especially if you're on mesos (because of https://issues.apache.org/jira/browse/MESOS-123 ) Find the pid of the executor process, and cat /proc/pid/limits set spark.shuffle.consolidateFiles = true try
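
For the shuffle setting mentioned above, a one-line sketch (Spark 1.x config key):

    val conf = new org.apache.spark.SparkConf()
      .set("spark.shuffle.consolidateFiles", "true")   // fewer simultaneous shuffle files per executor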

Re: Data Locality

2015-01-06 Thread Cody Koeninger
No, not all rdds have location information, and in any case tasks may be scheduled on non-local nodes if there is idle capacity. see spark.locality.wait http://spark.apache.org/docs/latest/configuration.html On Tue, Jan 6, 2015 at 10:17 AM, gtinside gtins...@gmail.com wrote: Does spark

Re: KafkaUtils and specifying a specific partition

2015-03-12 Thread Cody Koeninger
KafkaUtils.createDirectStream, added in spark 1.3, will let you specify a particular topic and partition On Thu, Mar 12, 2015 at 1:07 PM, Colin McQueen colin.mcqu...@shiftenergy.com wrote: Thanks! :) Colin McQueen *Software Developer* On Thu, Mar 12, 2015 at 3:05 PM, Jeffrey Jedele
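
A minimal sketch of pinning the direct stream to one topic/partition via the fromOffsets variant of the Spark 1.3 Scala API; ssc is an existing StreamingContext, and the broker, topic, partition and starting offset are placeholders:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)   // only partition 0 of mytopic

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))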

Re: Spark streaming with Kafka, multiple partitions fail, single partition ok

2015-03-30 Thread Cody Koeninger
This line at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158) is the attempt to close the underlying kafka simple consumer. We can add a null pointer check, but the underlying issue of the consumer being null probably indicates a problem earlier. Do you see

Re: Scalable JDBCRDD

2015-03-02 Thread Cody Koeninger
Have you already tried using the Vertica hadoop input format with spark? I don't know how it's implemented, but I'd hope that it has some notion of vertica-specific shard locality (which JdbcRDD does not). If you're really constrained to consuming the result set in a single thread, whatever

Re: Scalable JDBCRDD

2015-03-01 Thread Cody Koeninger
I'm a little confused by your comments regarding LIMIT. There's nothing about JdbcRDD that depends on limit. You just need to be able to partition your data in some way such that it has numeric upper and lower bounds. Primary key range scans, not limit, would ordinarily be the best way to do

Re: Spark Streaming and message ordering

2015-02-20 Thread Cody Koeninger
, it breaks our requirement that messages be executed in order within a partition. Thanks! On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger c...@koeninger.org wrote: For a given batch, for a given partition, the messages will be processed in order by the executor that is running that partition

Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Cody Koeninger
Have you tried instantiating the instance inside the closure, rather than outside of it? If that works, you may need to switch to use mapPartition / foreachPartition for efficiency reasons. On Mon, Mar 23, 2015 at 3:03 PM, Adelbert Chang adelbe...@gmail.com wrote: Is there no way to pull out

Re: Multiple Kafka Recievers

2015-04-13 Thread Cody Koeninger
As far as I know, createStream doesn't let you specify where receivers are run. createDirectStream in 1.3 doesn't use long-running receivers, so it is likely to give you more even distribution of consumers across your workers. On Mon, Apr 13, 2015 at 11:31 AM, Laeeq Ahmed

Re: Spark on Cassandra

2015-04-29 Thread Cody Koeninger
Hadoop version doesn't matter if you're just using cassandra. On Wed, Apr 29, 2015 at 12:08 PM, Matthew Johnson matt.john...@algomi.com wrote: Hi all, I am new to Spark, but excited to use it with our Cassandra cluster. I have read in a few places that Spark can interact directly with

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-30 Thread Cody Koeninger
it is not related to the data ingestion part. On Wed, Apr 29, 2015 at 8:35 PM, Cody Koeninger c...@koeninger.org wrote: Use lsof to see what files are actually being held open. That stacktrace looks to me like it's from the driver, not executors. Where in foreach is it being called

Re: empty jdbc RDD in spark

2015-05-04 Thread Cody Koeninger
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD The arguments are sql string, lower bound, upper bound, number of partitions. Your call SELECT * FROM MEMBERS LIMIT ? OFFSET ?, 0, 100, 1 would thus be run as SELECT * FROM MEMBERS LIMIT 0 OFFSET 100
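
A sketch of the intended usage, where the bind variables are numeric partition bounds rather than LIMIT/OFFSET values; the connection URL, table and column names are placeholders:

    import java.sql.DriverManager
    import org.apache.spark.rdd.JdbcRDD

    val members = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:mysql://dbhost/mydb", "user", "pass"),
      "SELECT * FROM MEMBERS WHERE ID >= ? AND ID <= ?",
      1L,      // lower bound substituted for the first ?
      10000L,  // upper bound substituted for the second ?
      4,       // number of partitions the range is split into
      rs => rs.getString("NAME"))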

Re: How to stream all data out of a Kafka topic once, then terminate job?

2015-04-29 Thread Cody Koeninger
The idea of peek vs poll doesn't apply to kafka, because kafka is not a queue. There are two ways of doing what you want, either using KafkaRDD or a direct stream The Kafka rdd approach would require you to find the beginning and ending offsets for each partition. For an example of this, see
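
A minimal sketch of the KafkaRDD route via Spark 1.3's KafkaUtils.createRDD; the topic, partitions and the start/end offsets, which you would first have to look up from Kafka, are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val offsetRanges = Array(
      OffsetRange("mytopic", 0, 0L, 12345L),   // partition 0: from offset 0 until 12345
      OffsetRange("mytopic", 1, 0L, 23456L))

    // A batch RDD covering exactly those offsets, which the job can process once and then exit.
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)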

Re: Too many open files when using Spark to consume messages from Kafka

2015-04-29 Thread Cody Koeninger
Use lsof to see what files are actually being held open. That stacktrace looks to me like it's from the driver, not executors. Where in foreach is it being called? The outermost portion of foreachRDD runs in the driver, the innermost portion runs in the executors. From the docs:

Re: How to separate messages of different topics.

2015-05-05 Thread Cody Koeninger
Make sure to read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The directStream / KafkaRDD has a 1 : 1 relationship between kafka topic/partition and spark partition. So a given spark partition only has messages from 1 kafka topic. You can tell what topic that is
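
A small sketch of reading those ranges back, assuming stream is a DStream returned by createDirectStream:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // offsetRanges is indexed the same way as the RDD's partitions
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { o =>
        println(s"topic ${o.topic} partition ${o.partition}: offsets ${o.fromOffset} until ${o.untilOffset}")
      }
    }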

Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread Cody Koeninger
Either one will work, there is no semantic difference. The reason I designed the direct api to accept both of those keys is because they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka project chose to use 2 different configuration keys. On

Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Cody Koeninger
, 2015 at 8:10 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Cody, I was just saying that i found more success and high throughput with the low level kafka api prior to KafkfaRDDs which is the future it seems. My apologies if you felt it that way. :) On 12 May 2015 19:47, Cody Koeninger c

Re: Kafka Direct Approach + Zookeeper

2015-05-13 Thread Cody Koeninger
at 3:44 PM, Cody Koeninger c...@koeninger.org wrote: Either one will work, there is no semantic difference. The reason I designed the direct api to accept both of those keys is because they were used to define lists of brokers in pre-existing Kafka project apis. I don't know why the Kafka

Re: kafka + Spark Streaming with checkPointing fails to restart

2015-05-13 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing shows setting up your stream and calling .checkpoint(checkpointDir) inside the functionToCreateContext. It looks to me like you're setting up your stream and calling checkpoint outside, after getOrCreate. I'm not
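
A minimal sketch of the setup being described, where every piece of stream construction lives inside the factory function passed to getOrCreate; the checkpoint directory, brokers and topic are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val checkpointDir = "hdfs:///tmp/stream-checkpoint"
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("checkpointed"), Seconds(10))
      ssc.checkpoint(checkpointDir)
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("mytopic"))
      stream.map(_._2).print()   // all transformations and outputs are set up inside this function
      ssc
    }

    // Only builds a fresh context when no usable checkpoint exists.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()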

Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Cody Koeninger
. Regards, Dibyendu On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote: As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code 2. Some Spark transformations involve a shuffle, which can repartition data It's

Re: Worker Spark Port

2015-05-13 Thread Cody Koeninger
I believe most ports are configurable at this point, look at http://spark.apache.org/docs/latest/configuration.html search for .port On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com wrote: I understand that this port value is randomly selected. Is there a way to enforce

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Cody Koeninger
: Hi Cody, If you are so sure, can you share a bench-marking (which you ran for days maybe?) that you have done with Kafka APIs provided by Spark? Thanks Best Regards On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger c...@koeninger.org wrote: I don't think it's accurate for Akhil to claim

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Cody Koeninger
they arrived after the driver reconnected to Kafka Is this what happens by default in your suggestion? On Tue, May 12, 2015 at 3:52 PM, Cody Koeninger c...@koeninger.org wrote: I don't think it's accurate for Akhil to claim that the linked library is much more flexible/reliable than

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Cody Koeninger
I don't think it's accurate for Akhil to claim that the linked library is much more flexible/reliable than what's available in Spark at this point. James, what you're describing is the default behavior for the createDirectStream api available as part of spark since 1.3. The kafka parameter

Re: Spark Streaming 1.3 Kafka Direct Streams

2015-04-01 Thread Cody Koeninger
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele...@gmail.com wrote: With receivers, it was pretty obvious which code ran where - each receiver occupied a core and ran on the

Re: Spark Streaming 1.3 Kafka Direct Streams

2015-04-01 Thread Cody Koeninger
once, and no way of refreshing them. Thanks again! On Wed, Apr 1, 2015 at 10:01 AM, Cody Koeninger c...@koeninger.org wrote: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md The kafka consumers run in the executors. On Wed, Apr 1, 2015 at 11:18 AM, Neelesh neele

Re: Connection pooling in spark jobs

2015-04-02 Thread Cody Koeninger
Connection pools aren't serializable, so you generally need to set them up inside of a closure. Doing that for every item is wasteful, so you typically want to use mapPartitions or foreachPartition rdd.mapPartitions { part => setupPool part.map { ... See Design Patterns for using foreachRDD in

Re: Spark Streaming 1.3 Kafka Direct Streams

2015-04-01 Thread Cody Koeninger
at 11:21 AM, Cody Koeninger c...@koeninger.org wrote: If you want to change topics from batch to batch, you can always just create a KafkaRDD repeatedly. The streaming code as it stands assumes a consistent set of topics though. The implementation is private so you cant subclass it without

Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Cody Koeninger
the message in case of failures? Thanks Best Regards On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org wrote: You linked to a google mail tab, not a public archive, so I don't know exactly which conversation you're referring to. As far as I know, streaming only runs a single

Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Cody Koeninger
PM, Bill Jay bill.jaypeter...@gmail.com wrote: If a Spark streaming job stops at 12:01 and I resume the job at 12:02. Will it still start to consume the data that were produced to Kafka at 12:01? Or it will just start consuming from the current time? On Tue, May 19, 2015 at 10:58 AM, Cody

Re: Spark Streaming Stuck After 10mins Issue...

2015-06-07 Thread Cody Koeninger
What is the code used to set up the kafka stream? On Sat, Jun 6, 2015 at 3:23 PM, EH eas...@gmail.com wrote: And here is the Thread Dump, where seems every worker is waiting for Executor #6 Thread 95: sparkExecutor-akka.actor.default-dispatcher-22 (RUNNABLE) to be complete: Thread 41:

Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Cody Koeninger
[HasOffsetRanges].offsetRanges) new ProxyRDDWithPartitioner(rdd, part) } ... But how can I create same partitioner during updateStateByKey call? I have no idea how to access rdd when calling updateStateByKey. Tue, 2 Jun 2015 at 19:15, Cody Koeninger c...@koeninger.org: I think

Re: Behavior of the spark.streaming.kafka.maxRatePerPartition config param?

2015-06-03 Thread Cody Koeninger
The default of 0 means no limit. Each batch will grab as much as is available, ie a range of offsets spanning from the end of the previous batch to the highest available offsets on the leader. If you set spark.streaming.kafka.maxRatePerPartition > 0, the number you set is the maximum number of
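
As a sketch, with a placeholder rate; the cap is per second, per Kafka partition, so a 10 second batch could contain up to 10 x 1000 messages from each partition:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")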

Re: Kafka createDirectStream ​issue

2015-06-23 Thread Cody Koeninger
The exception $line49 is referring to a line of the spark shell. Have you tried it from an actual assembled job with spark-submit ? On Tue, Jun 23, 2015 at 3:48 PM, syepes sye...@gmail.com wrote: Hello, I ​am trying ​use the new Kafka ​consumer ​​KafkaUtils.createDirectStream​ but I am

Re: RE: Spark or Storm

2015-06-18 Thread Cody Koeninger
That general description is accurate, but not really a specific issue of the direct stream. It applies to anything consuming from kafka (or, as Matei already said, any streaming system really). You can't have exactly once semantics, unless you know something more about how you're storing results.

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change its behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri

Re: RE: Spark or Storm

2015-06-19 Thread Cody Koeninger
...@163.com *From:* Haopu Wang hw...@qilinsoft.com *Date:* 2015-06-19 18:47 *To:* Enno Shioji eshi...@gmail.com; Tathagata Das t...@databricks.com *CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org; bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs wrbri

Re: RE: Spark or Storm

2015-06-19 Thread Cody Koeninger
:* Haopu Wang hw...@qilinsoft.com *Date:* 2015-06-19 18:47 *To:* Enno Shioji eshi...@gmail.com; Tathagata Das t...@databricks.com *CC:* prajod.vettiyat...@wipro.com; Cody Koeninger c...@koeninger.org; bit1...@163.com; Jordan Pilat jrpi...@gmail.com; Will Briggs wrbri...@gmail.com; Ashish Soni

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
the job created for the specific batch, but the subsequent batches still proceed, isn’t it right ? And question still remains, how to keep track of those failed batches ? From: amit assudani aassud...@impetus.com Date: Friday, June 26, 2015 at 11:21 AM To: Cody Koeninger c...@koeninger.org Cc

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
If you're consistently throwing exceptions and thus failing tasks, once you reach max failures the whole stream will stop. It's up to you to either catch those exceptions, or restart your stream appropriately once it stops. Keep in mind that if you're relying on checkpoints, and fixing the error
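
A sketch of catching per-message failures so a bad record doesn't fail the task; process and recordFailure are hypothetical stand-ins for the user's processing and for wherever failed messages get parked for rework:

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { messages =>
        messages.foreach { msg =>
          try {
            process(msg)              // hypothetical per-message work
          } catch {
            case e: Exception =>
              recordFailure(msg, e)   // hypothetical: log / store the message for later rework
          }
        }
      }
    }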

Re: How to recover in case user errors in streaming

2015-06-26 Thread Cody Koeninger
the connectivity issues to persistent store which gets resolved in a while, but how do I know which all messages failed and need rework ? Regards, Amit From: Cody Koeninger c...@koeninger.org Date: Friday, June 26, 2015 at 11:16 AM To: amit assudani aassud...@impetus.com Cc: user

Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
The receiver-based kafka createStream in spark 1.2 uses zookeeper to store offsets. If you want finer-grained control over offsets, you can update the values in zookeeper yourself before starting the job. createDirectStream in spark 1.3 is still marked as experimental, and subject to change.

Re: spark streaming job fails to restart after checkpointing due to DStream initialization errors

2015-06-26 Thread Cody Koeninger
Make sure you're following the docs regarding setting up a streaming checkpoint. Post your code if you can't get it figured out. On Fri, Jun 26, 2015 at 3:45 PM, Ashish Nigam ashnigamt...@gmail.com wrote: I bring up spark streaming job that uses Kafka as input source. No data to process and

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive

Re: spark streaming with kafka reset offset

2015-06-26 Thread Cody Koeninger
per second. Each event is of ~500 bytes. Having 5 receivers with 60 partitions each receiver is sufficient for spark streaming to consume? On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger c...@koeninger.org wrote: The receiver-based kafka

Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
there ... but as long as you're making sure the setup gets called at most once per executor, before the work that needs it ... should be ok. On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 algermissen1...@icloud.com wrote: On 12 Jun 2015, at 22:59, Cody Koeninger c...@koeninger.org wrote: Close

Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
Close. the mapPartitions call doesn't need to do anything at all to the iter. mapPartitions { iter => SomeDb.conn.init iter } On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 algermissen1...@icloud.com wrote: Cody, On 12 Jun 2015, at 17:26, Cody Koeninger c...@koeninger.org wrote

Re: How to use Window Operations with kafka Direct-API?

2015-06-12 Thread Cody Koeninger
of whether you're using the createStream or createDirectStream api. On Fri, Jun 12, 2015 at 9:14 AM, Cody Koeninger c...@koeninger.org wrote: Casting to HasOffsetRanges would be meaningless anyway if done after an operation that changes partitioning. You can still use the messageHandler argument

Re: Spark Streaming, updateStateByKey and mapPartitions() - and lazy DatabaseConnection

2015-06-12 Thread Cody Koeninger
There are several database apis that use a thread local or singleton reference to a connection pool (we use ScalikeJDBC currently, but there are others). You can use mapPartitions earlier in the chain to make sure the connection pool is set up on that executor, then use it inside updateStateByKey
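
A minimal sketch of that pattern with ScalikeJDBC; the JDBC URL and credentials are placeholders, and Pool is just a name chosen for the example:

    import scalikejdbc.ConnectionPool

    // A lazy val in an object is initialized at most once per executor JVM.
    object Pool {
      lazy val init: Unit =
        ConnectionPool.singleton("jdbc:postgresql://dbhost/app", "user", "pass")
    }

    // Touch the pool early in the chain so it exists on each executor
    // before updateStateByKey (or anything later) needs it.
    val prepared = stream.mapPartitions { iter =>
      Pool.init
      iter
    }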

Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Cody Koeninger
The scala api has 2 ways of calling createDirectStream. One of them allows you to pass a message handler that gets full access to the kafka MessageAndMetadata, including offset. I don't know why the python api was developed with only one way to call createDirectStream, but the first thing I'd

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-04 Thread Cody Koeninger
direct stream isn't a receiver, it isn't required to cache data anywhere unless you want it to. If you want it, just call cache. On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: set the storage policy for the DStream RDDs to MEMORY AND DISK - it appears the

Re: How to monitor Spark Streaming from Kafka?

2015-06-01 Thread Cody Koeninger
KafkaCluster.scala in the spark/external/kafka project has a bunch of api code, including code for updating Kafka-managed ZK offsets. Look at setConsumerOffsets. Unfortunately all of that code is private, but you can either write your own, copy it, or do what I do (sed out private[spark] and

Re: spark streaming with kafka reset offset

2015-06-29 Thread Cody Koeninger
=d5UJonrruHklist=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6index=4 ] from last Spark Summit 2015). But that approach can give duplicate records. The direct approach gives exactly-once guarantees, so you should try it out. TD On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger c...@koeninger.org wrote

Re: writing to kafka using spark streaming

2015-07-06 Thread Cody Koeninger
Use foreachPartition, and allocate whatever the costly resource is once per partition. On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com wrote: I have a requirement to write in kafka queue from a spark streaming application. I am using spark 1.2 streaming. Since

Re: Restarting Spark Streaming Application with new code

2015-07-06 Thread Cody Koeninger
You shouldn't rely on being able to restart from a checkpoint after changing code, regardless of whether the change was explicitly related to serialization. If you are relying on checkpoints to hold state, specifically which offsets have been processed, that state will be lost if you can't

Re: spark streaming with kafka reset offset

2015-06-30 Thread Cody Koeninger
:46 PM, Cody Koeninger c...@koeninger.org wrote: Read the spark streaming guide and the kafka integration guide for a better understanding of how the receiver based stream works. Capacity planning is specific to your environment and what the job is actually doing, you'll need to determine

Re: spark-streaming-kafka_2.11 not available yet?

2015-05-26 Thread Cody Koeninger
It's being added in 1.4 https://repository.apache.org/content/repositories/orgapachespark-1104/org/apache/spark/spark-streaming-kafka_2.11/1.4.0-rc2/ On Tue, May 26, 2015 at 3:14 AM, Petr Novak oss.mli...@gmail.com wrote: Hello, I would like to switch from Scala 2.10 to 2.11 for Spark app

Re: Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Cody Koeninger
I just verified that the following code works on 1.3.0 : val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic1) val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic2)

Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Cody Koeninger
Have you read https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ? 1. There's nothing preventing that. 2. Checkpointing will give you at-least-once semantics, provided you have sufficient kafka retention. Be aware that checkpoints aren't recoverable if you upgrade code.

Re: avoid duplicate due to executor failure in spark stream

2015-08-12 Thread Cody Koeninger
will just ignore already processed events by accessing counter of failed task. Is it directly possible to access accumulator per task basis without writing to hdfs or hbase. On Tue, Aug 11, 2015 at 3:15 AM, Cody Koeninger c...@koeninger.org wrote: http://spark.apache.org/docs/latest

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-17 Thread Cody Koeninger
instead of Function1 ? On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org wrote: I'm not aware of an existing api per se, but you could create your own subclass of the DStream that returns None for compute() under certain conditions. On Wed, Aug 12, 2015 at 1:03 PM

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger
the same error did not come while extending DirectKafkaInputDStream from InputDStream ? Since new return type Option[KafkaRDD[K, V, U, T, R]] is not subclass of Option[RDD[T]] so it should have been failed? On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Cody Koeninger
, Shushant Arora shushantaror...@gmail.com wrote: But KafkaRDD[K, V, U, T, R] is not subclass of RDD[R] as per java generic inheritance is not supported so derived class cannot return different generic typed subclass from overridden method. On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c

Re: spark streaming 1.3 kafka error

2015-08-21 Thread Cody Koeninger
Sounds like that's happening consistently, not an occasional network problem? Look at the Kafka broker logs Make sure you've configured the correct kafka broker hosts / ports (note that direct stream does not use zookeeper host / port). Make sure that host / port is reachable from your driver

Re: How to parse multiple event types using Kafka

2015-08-23 Thread Cody Koeninger
Each spark partition will contain messages only from a single kafka topic/partition. Use hasOffsetRanges to tell which kafka topic/partition it's from. See the docs http://spark.apache.org/docs/latest/streaming-kafka-integration.html On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast

Re: Spark Direct Streaming With ZK Updates

2015-08-24 Thread Cody Koeninger
It doesn't matter if shuffling occurs. Just update ZK from the driver, inside the foreachRDD, after all your dynamodb updates are done. Since you're just doing it for monitoring purposes, that should be fine. On Mon, Aug 24, 2015 at 12:11 PM, suchenzang suchenz...@gmail.com wrote: Forgot to

Re: Kafka Spark Partition Mapping

2015-08-20 Thread Cody Koeninger
In general you cannot guarantee which node an RDD will be processed on. The preferred location for a kafkardd is the kafka leader for that partition, if they're deployed on the same machine. If you want to try to override that behavior, the method is getPreferredLocations But even in that case,

Re: spark kafka partitioning

2015-08-20 Thread Cody Koeninger
I'm not clear on your question, can you rephrase it? Also, are you talking about createStream or createDirectStream? On Thu, Aug 20, 2015 at 9:48 PM, Gaurav Agarwal gaurav130...@gmail.com wrote: Hello Regarding Spark Streaming and Kafka Partitioning When i send message on kafka topic with

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-18 Thread Cody Koeninger
[][] but it's expecting scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived class. Is there something wrong with code? On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definitions of the java-specific KafkaUtils.createDirectStream methods (the ones

Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-18 Thread Cody Koeninger
looking forward to it :) On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote: The solution you found is also in the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html Java uses an atomic reference because Java doesn't allow you to close over non
