Re: [Structured Streaming] Avoiding multiple streaming queries

2018-02-14 Thread Tathagata Das
Of course, you can write to multiple Kafka topics from a single query. If the dataframe that you want to write has a column named "topic" (along with the "key" and "value" columns), each row will be written to the topic named in that row. This works automatically. So the only thing you need to f
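A minimal Scala sketch of the per-row topic routing described above (the broker address, column expressions, and checkpoint path are illustrative):

    val query = df
      .selectExpr("topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      // no "topic" option is set, so each row's "topic" column decides where it is written
      .option("checkpointLocation", "/tmp/checkpoints/multi-topic")
      .start()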

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread Tathagata Das
Hey, Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this. TD On Tue, Feb 20, 2018 at 8:20 PM, kant kodali wrote: > if I change it to the below code it works. However, I don't believe it is > the solution I am looking for. I want to be able t

Re: Trigger.ProcessingTime("10 seconds") & Trigger.Continuous(10.seconds)

2018-02-25 Thread Tathagata Das
The continuous one is our new low latency continuous processing engine in Structured Streaming (to be released in 2.3). Here is the pre-release doc - https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-docs/_site/structured-streaming-programming-guide.html#continuous-processing On Sun, Feb 25,

Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-27 Thread Tathagata Das
Unfortunately, exposing Kafka headers is not yet supported in Structured Streaming. The community is more than welcome to add support for it :) On Tue, Feb 27, 2018 at 2:51 PM, Karthik Jayaraman wrote: > Hi all, > > I am using Spark 2.2.1 Structured Streaming to read messages from Kafka. I > wou

Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread Tathagata Das
Let me answer the original question directly, that is, how do we determine that an event is late. We simply track the maximum event time the engine has seen in the data it has processed till now. And any data that has event time less than the max is basically "late" (as it is out-of-order). Now, in

Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-02-28 Thread Tathagata Das
There is no good way to save to parquet without causing downstream consistency issues. You could use foreachRDD to get each RDD, convert it to DataFrame/Dataset, and write out as parquet files. But you will later run into issues with partial files caused by failures, etc. On Wed, Feb 28, 2018 at
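A sketch of the foreachRDD workaround described above, assuming `stream` is a DStream of String values and `spark` is the active SparkSession (names and paths are illustrative):

    import spark.implicits._

    stream.foreachRDD { rdd =>
      // convert each micro-batch RDD to a Dataset and append it as parquet;
      // note the partial-file caveat mentioned above if a batch fails midway
      rdd.toDS().write.mode("append").parquet("/data/output/parquet")
    }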

Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-28 Thread Tathagata Das
I made a JIRA for it - https://issues.apache.org/jira/browse/SPARK-23539 Unfortunately it is blocked by Kafka version upgrade, which has a few nasty issues related to Kafka bugs - https://issues.apache.org/jira/browse/SPARK-18057 On Wed, Feb 28, 2018 at 3:17 PM, karthikus wrote: > TD, > > Thanks

Re: [Beginner] How to save Kafka Dstream data to parquet ?

2018-03-02 Thread Tathagata Das
way we get at least > once semantic and partial file write issue. > > Thoughts ? > > > Sunil Parmar > > On Wed, Feb 28, 2018 at 1:59 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> There is no good way to save to parquet without causing downstre

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-06 Thread Tathagata Das
Which version of Spark are you using? And can you give us the full stack trace of the exception? On Tue, Mar 6, 2018 at 1:53 AM, Junfeng Chen wrote: > I am trying to read kafka and save the data as parquet file on hdfs > according to this https://stackoverflow.com/questions/45827664/read-from >

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread Tathagata Das
-distribution.sh --name custom-spark --pip --r --tgz -Psparkr > -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn > > > On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> Hey, >> >> Thanks for testing ou

Re: CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread warning

2018-03-07 Thread Tathagata Das
ny this line: > >> CachedKafkaConsumer: CachedKafkaConsumer is not running in >> UninterruptibleThread. It may hang when CachedKafkaConsumer's method are >> interrupted because of KAFKA-1894. > > > > Regard, > Junfeng Chen > > On Wed, Mar 7, 2018 at 3:34 AM, Tathagata Das

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Tathagata Das
d is >>>> it two implement both of these? It turns out the update mode and full outer >>>> join is very useful and required in my case, therefore, I'm just asking. >>>> >>>> Thanks! >>>> >>>> On Tue, Mar 6, 2018 at

Re: Upgrades of streaming jobs

2018-03-09 Thread Tathagata Das
Yes, all checkpoints are forward compatible. However, you do need to restart the query if you want to update the code of the query. This downtime can be less than a second (if you just restart the query without stopping the application/Spark driver) or tens of seconds (if you have to stop the ap

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-12 Thread Tathagata Das
I believe just spitting out nulls for every > trigger until there is a match and when there is match spitting out the > joined rows should suffice isn't it? > > Sorry if my thoughts are too naive! > > > > > > > > > > > On Thu, Mar 8, 2018 at 6:14 PM, Tat

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Tathagata Das
Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html This is true stream-stream join which will automatically buffer delayed data and appropriately join stuff with SQL join semantics. Please check it out :) TD On Wed, Mar 14, 2018 at 12:07

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Tathagata Das
-with-tathagata-das On Thu, Mar 15, 2018 at 8:37 AM, Bowden, Chris wrote: > You need to tell Spark about the structure of the data, it doesn't know > ahead of time if you put avro, json, protobuf, etc. in kafka for the > message format. If the messages are in json, Spark provides

Re: [Structured Streaming] Application Updates in Production

2018-03-21 Thread Tathagata Das
Why do you want to start the new code in parallel to the old one? Why not stop the old one, and then start the new one? Structured Streaming ensures that all checkpoint information (offsets and state) is future-compatible (as long as the state schema is unchanged), hence the new code should be able to pic

Re: [Structured Streaming] Application Updates in Production

2018-03-22 Thread Tathagata Das
om the old > checkpoints, one would need to keep the old app and the new app running > until new app catches up on data processing with the old app. > > > - Original message - > From: Tathagata Das > To: Priyank Shrivastava > Cc: user > Subject: Re: [Structured

Re: Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread Tathagata Das
Structured Streaming AUTOMATICALLY saves the offsets in a checkpoint directory that you provide. And when you start the query again with the same directory it will just pick up where it left off. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failur
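A minimal sketch of what this looks like, assuming `parsed` is the streaming DataFrame being written (paths are illustrative):

    // restarting the query with the same checkpointLocation resumes from the saved offsets
    val query = parsed.writeStream
      .format("parquet")
      .option("path", "/data/out")
      .option("checkpointLocation", "/data/checkpoints/my-query")
      .start()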

Re: Does structured streaming support Spark Kafka Direct?

2018-04-12 Thread Tathagata Das
The parallelism is the same for Structured Streaming. In fact, the Kafka Structured Streaming source is based on the same principle as DStream's Kafka Direct, hence it has very similar behavior. On Tue, Apr 10, 2018 at 11:03 PM, SRK wrote: > hi, > > We have code based on Spark Kafka Direct in produ

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
It's not very surprising that doing this sort of RDD to DF conversion inside DStream.foreachRDD has weird corner cases like this. In fact, you are going to have additional problems with partial parquet files (when there are failures) in this approach. I strongly suggest that you use Structured Stre

Re: Nullpointerexception error when in repartition

2018-04-12 Thread Tathagata Das
.start() > > > But what i need to do in this step is only transforming json string data > to Dataset . How to fix it? > > Thanks! > > > Regard, > Junfeng Chen > > On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: >

Re: Does partition by and order by works only in stateful case?

2018-04-12 Thread Tathagata Das
Traditional SQL window functions with `over` are not supported in streaming. Only time-based windows, that is, `window("timestamp", "10 minutes")`, are supported in streaming. On Thu, Apr 12, 2018 at 7:34 PM, kant kodali wrote: > Hi All, > > Does partition by and order by works only in stateful case? >
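A sketch of the supported time-based window, assuming `df` is a streaming DataFrame with `timestamp` and `origin` columns (column names are illustrative):

    import org.apache.spark.sql.functions.{col, window}

    val counts = df
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window(col("timestamp"), "10 minutes"), col("origin"))
      .count()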

Re: Structured Streaming on Kubernetes

2018-04-13 Thread Tathagata Das
Structured streaming is stable in production! At Databricks, we and our customers collectively process hundreds of billions of records per day using SS. However, we are not using kubernetes :) Though I don't think it will matter too much as long as kubes are correctly provisioned+configured and

Re: can we use mapGroupsWithState in raw sql?

2018-04-16 Thread Tathagata Das
Unfortunately, no. Honestly, it does not make sense: for type-aware operations like map, mapGroups, etc., you have to provide an actual JVM function. That does not fit in with the SQL language structure. On Mon, Apr 16, 2018 at 7:34 PM, kant kodali wrote: > Hi All, > > can we use mapGroupsWithSt

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Tathagata Das
Just to be clear, these screenshots are about the memory consumption of the driver. So this has nothing to do with the streaming aggregation state, which is kept in the memory of the executors, not the driver. On Tue, May 22, 2018 at 10:21 AM, Jungtaek Lim wrote: > 1. Could you share your Spark versi

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Try to define the watermark on the right column immediately before calling `groupByKey(...).mapGroupsWithState(...)`. You are applying the watermark and then doing a bunch of opaque transformations (a user-defined flatMap that the planner has no visibility into). This prevents the planner from propaga
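For reference, a minimal sketch of the recommended shape, with the watermark defined directly on the event-time column and no opaque transformations in between (assuming `events` is a streaming Dataset; the Event class, column names, and counting logic are illustrative):

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
    import spark.implicits._

    case class Event(deviceId: String, eventTime: java.sql.Timestamp)

    val counts = events.as[Event]
      .withWatermark("eventTime", "10 minutes")
      .groupByKey(_.deviceId)
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout()) {
        (id: String, rows: Iterator[Event], state: GroupState[Long]) =>
          val total = state.getOption.getOrElse(0L) + rows.size   // running count per key
          state.update(total)
          (id, total)
      }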

Re: Reset the offsets, Kafka 0.10 and Spark

2018-06-08 Thread Tathagata Das
Structured Streaming really makes this easy. You can simply specify whether to start the query from the earliest or latest offsets. Check out - https://www.slideshare.net/databricks/a-deep-dive-into-structured-streaming - https://spark.apache.org/docs/latest/structured-streaming-kafka-integratio
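A sketch of the Kafka source option in question (broker and topic names are illustrative):

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", "earliest")   // or "latest" (the default for streaming queries)
      .load()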

Re: Spark can't identify the event time column being supplied to withWatermark()

2018-06-08 Thread Tathagata Das
Glad that it worked out! It's unfortunate that there exist such pitfalls. And there is no easy way to get around it. If you can, let us know how your experience with mapGroupsWithState has been. TD On Fri, Jun 8, 2018 at 1:49 PM, frankdede wrote: > You are exactly right! A few hours ago, I trie

Re: [structured-streaming][parquet] readStream files order in Parquet

2018-06-15 Thread Tathagata Das
The files are processed in order of their last-modified timestamps. The path and partitioning scheme are not used for ordering. On Thu, Jun 14, 2018 at 6:59 AM, karthikjay wrote: > My parquet files are first partitioned by environment and then by date > like: > > env=testing/ >date=2018-

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
Structured Streaming does not maintain a queue of batches like DStreams did. DStreams used to cut off batches at a fixed interval and put them in a queue, and a different thread processed the queued batches. In contrast, Structured Streaming simply cuts off and immediately processes a batch after the previous batc

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-20 Thread Tathagata Das
-streaming-queries On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das wrote: > Structured Streaming does not maintain a queue of batch like DStream. > DStreams used to cut off batches at a fixed interval and put in a queue, > and a different thread processed queued batches. In contrast, S

Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Tathagata Das
Actually, we do not support a JDBC sink yet. The blog post was just an example :) I agree it is misleading in hindsight. On Wed, Jun 20, 2018 at 6:09 PM, kant kodali wrote: > Hi All, > > Does Spark Structured Streaming have a JDBC sink or Do I need to use > ForEachWriter? I see the following code

Re: Recommendation of using StreamSinkProvider for a custom KairosDB Sink

2018-06-25 Thread Tathagata Das
This interface is actually unstable. The v2 DataSource APIs are being designed right now and will be public and stable in a release or two. So unfortunately there is no stable interface right now that I can officially recommend. That said, you could always use the ForeachWriter interface (s

Re: How to reduceByKeyAndWindow in Structured Streaming?

2018-06-28 Thread Tathagata Das
The fundamental conceptual difference between the windowing in DStream vs Structured Streaming is that DStream used the arrival time of the record in Spark (aka processing time) and Structured Streaming uses event time. If you want to exactly replicate DStream's processing-time windows in Structur
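One common way to approximate processing-time windows in Structured Streaming (a sketch under that assumption, not necessarily what the truncated reply goes on to recommend) is to stamp each record with its arrival time and window on that column:

    import org.apache.spark.sql.functions.{col, current_timestamp, window}

    val counts = df
      .withColumn("procTime", current_timestamp())       // arrival (processing) time
      .withWatermark("procTime", "1 minute")
      .groupBy(window(col("procTime"), "30 seconds"))
      .count()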

Re: Lag and queued up batches info in Structured Streaming UI

2018-06-28 Thread Tathagata Das
; Its all documented - https://spark.apache.org/docs/ >> latest/structured-streaming-programming-guide.html#monitorin >> g-streaming-queries >> >> On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das < >> tathagata.das1...@gmail.com> wrote: >> >>> Str

Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread Tathagata Das
Hey all, In Spark 2.4.0, there will be a new feature called *foreachBatch* which will expose the output rows of every micro-batch as a dataframe, on which you apply a user-defined function. With that, you can reuse existing batch sources for writing results as well as write results to multiple loc
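A sketch of the foreachBatch fan-out pattern, assuming `streamingDF` is the query's streaming DataFrame (sink formats and paths are illustrative):

    import org.apache.spark.sql.DataFrame

    streamingDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.persist()                                        // reuse the batch across sinks
        batchDF.write.mode("append").parquet("/data/out/parquet")
        batchDF.write.mode("append").json("/data/out/json")
        batchDF.unpersist()
        ()
      }
      .option("checkpointLocation", "/data/checkpoints/fanout")
      .start()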

Re: [Structured Streaming] Reading Checkpoint data

2018-07-09 Thread Tathagata Das
Only the stream metadata (e.g., stream id, offsets) is stored as JSON. The stream state data is stored in an internal binary format. On Mon, Jul 9, 2018 at 4:07 PM, subramgr wrote: > Hi, > > I read somewhere that with Structured Streaming all the checkpoint data is > more readable (Json) like. I

Re: [Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread Tathagata Das
Note that this is not a public API yet. Hence it is not well documented. So use it at your own risk :) On Tue, Jul 10, 2018 at 11:04 AM, subramgr wrote: > Hi, > > This looks very daunting *trait* is there some blog post or some articles > which explains on how to implement this *trait* > > Thank

Re: Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-17 Thread Tathagata Das
Yes. Yes you can. On Tue, Jul 17, 2018 at 11:42 AM, Sathi Chowdhury wrote: > Hi, > My question is about ability to integrate spark streaming with multiple > clusters.Is it a supported use case. An example of that is that two topics > owned by different group and they have their own kakka infra .

Re: Exceptions with simplest Structured Streaming example

2018-07-26 Thread Tathagata Das
Unfortunately, your output is not visible in the email that we see. Was it an image that somehow got removed? It is probably best to copy the output text (i.e. the error message) into the email. On Thu, Jul 26, 2018 at 5:41 AM, Jonathan Apple wrote: > Hello, > > There is a streaming World Count example at t

Re: How to read json data from kafka and store to hdfs with spark structued streaming?

2018-07-26 Thread Tathagata Das
Are you writing multiple streaming query output to the same location? If so, I can see this error occurring. Multiple streaming queries writing to the same directory is not supported. On Tue, Jul 24, 2018 at 3:38 PM, dddaaa wrote: > I'm trying to read json messages from kafka and store them in h

Re: [Structured Streaming] Two watermarks and StreamingQueryListener

2018-08-10 Thread Tathagata Das
Structured Streaming internally maintains one global watermark by taking the min of the two watermarks. That's why only one gets reported. In Spark 2.4, there will be the option of choosing max instead of min. Just curious: why do you have two watermarks? What does the query look like? TD On Thu, Aug 9, 2018
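The knob that shipped for this in Spark 2.4 (to the best of my knowledge) is a session configuration:

    // choose how multiple watermarks are combined into the global one; the default is "min"
    spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")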

Re: Iterator of KeyValueGroupedDataset.flatMapGroupsWithState function

2018-10-31 Thread Tathagata Das
It is okay to collect the iterator. That will not break Spark. However, collecting it requires memory in the executor, so you may cause OOMs if a group has a LOT of new data. On Wed, Oct 31, 2018 at 3:44 AM Antonio Murgia - antonio.murg...@studio.unibo.it wrote: > Hi all, > > I'm currently devel

Re: How to execute non-timestamp-based aggregations in spark structured streaming?

2019-04-22 Thread Tathagata Das
; select row_number() > over (partition by Origin order by OnTimeDepPct desc) OnTimeDepRank,* > from flights > > This will *not* work in *structured streaming* : The culprit is: > > partition by Origin > > The requirement is to use a timestamp-typed field such as > >

Re: Announcing Delta Lake 0.2.0

2019-06-21 Thread Tathagata Das
@ayan guha @Gourav Sengupta Delta Lake OSS currently does not support defining tables in the Hive metastore using DDL commands. We are hoping to add the necessary compatibility fixes in Apache Spark to make Delta Lake work with tables and DDL commands. So we will support them in a future release.

Announcing Delta Lake 0.3.0

2019-08-01 Thread Tathagata Das
Hello everyone, We are excited to announce the availability of Delta Lake 0.3.0 which introduces new programmatic APIs for manipulating and managing data in Delta Lake tables. Here are the main features: - Scala/Java APIs for DML commands - You can now modify data in Delta Lake tables

Re: Structured Streaming Dataframe Size

2019-08-27 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts *Note that Structured Streaming does not materialize the entire table*. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then d

Re: Structured Streaming Dataframe Size

2019-08-29 Thread Tathagata Das
) * Then *select * from **myAggTable* This will give awesome ACID transactional guarantees between reads and writes. Read more on the linked website (full disclosure, I work on that project as well). > Thank you very much for your help! > > > On Tue, Aug 27, 2019, 6:42 PM Tath

Re: Structured Streaming: How to add a listener for when a batch is complete

2019-09-03 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis On Tue, Sep 3, 2019, 3:26 PM Natalie Ruiz wrote: > Hello all, > > > > I’m a beginner, new to Spark and wanted to know if there was an equivalent > to Spark

Re: how can I dynamic parse json in kafka when using Structured Streaming

2019-09-17 Thread Tathagata Das
You can use *from_json* built-in SQL function to parse json. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column- On Mon, Sep 16, 2019 at 7:39 PM lk_spark wrote: > hi,all : > I'm using Structured
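A short sketch of from_json on a Kafka value column (the schema and column names are illustrative):

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

    val schema = new StructType()
      .add("user", StringType)
      .add("amount", DoubleType)

    val parsed = kafkaDF
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), schema).as("data"))
      .select("data.*")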

Re: Spark Streaming: Aggregating values across batches

2020-02-27 Thread Tathagata Das
Use Structured Streaming. Its aggregation, by definition, is across batches. On Thu, Feb 27, 2020 at 3:17 PM Something Something < mailinglist...@gmail.com> wrote: > We've a Spark Streaming job that calculates some values in each batch. > What we need to do now is aggregate values across ALL batc

Re: dropDuplicates and watermark in structured streaming

2020-02-27 Thread Tathagata Das
1. Yes. All times are in event time, not processing time. So you may get 10AM event-time data at 11AM processing time, but it will still be compared against all data within the 9-10AM event-time range. 2. Show us your code. On Thu, Feb 27, 2020 at 2:30 AM lec ssmi wrote: > Hi: > I'm new to structured stre
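For reference, a minimal sketch of watermarked deduplication (column names and the watermark delay are illustrative):

    // state for deduplication is kept only as long as the event-time watermark requires
    val deduped = df
      .withWatermark("time", "1 hour")
      .dropDuplicates("id", "time")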

Re: dropDuplicates and watermark in structured streaming

2020-02-28 Thread Tathagata Das
lec ssmi wrote: > Such as : > df.withWarmark("time","window > size").dropDulplicates("id").withWatermark("time","real > watermark").groupBy(window("time","window size","window > size")).agg(count(&

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
You are deserializing by explicitly specifying the UTC timezone, but when serializing you are not specifying it. Maybe that is the reason? Also, if you can encode it using just a long, then I recommend just saving the value as a long and eliminating some of the serialization overheads. Spark will probably bet

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-28 Thread Tathagata Das
as when executing a simple group by. > > Regards, > > Bryan Jeffrey > > Get Outlook for Android <https://aka.ms/ghei36> > > -- > *From:* Tathagata Das > *Sent:* Friday, February 28, 2020 4:56:07 PM > *To:* Bryan Jeffrey > *Cc:* user &

Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

2020-03-04 Thread Tathagata Das
Make sure that you are continuously feeding data into the query to trigger the batches; only then are timeouts processed. See the timeout behavior details here - https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.GroupState On Wed, Mar 4, 2020 at 2:51 PM Somet

Re: Spark, read from Kafka stream failing AnalysisException

2020-04-05 Thread Tathagata Das
Have you looked at the suggestion made by the error by searching for "Structured Streaming + Kafka Integration Guide" in Google? It should be the first result. The last section in the "Structured Streaming

Re: how can i write spark addListener metric to kafka

2020-06-09 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis On Tue, Jun 9, 2020 at 4:42 PM a s wrote: > hi Guys, > > I am building a structured streaming app for google analytics data > > i want to capture the numbe

Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Tathagata Das
Hello Rachana, Getting exactly-once semantics on files and making it scale to a very large number of files are very hard problems to solve. While Structured Streaming + built-in file sink solves the exactly-once guarantee that DStreams could not, it is definitely limited in other ways (scaling in

Re: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Tathagata Das
This is not normal. It's a huge scheduling delay!! Can you tell me more about the application? - cluster setup, number of receivers, what's the computation, etc. On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab wrote: > Hate to do this...but...erm...bump? Would really appreciate input from > others us

Re: How to make spark partition sticky, i.e. stay with node?

2015-01-23 Thread Tathagata Das
Hello mingyu, That is a reasonable way of doing this. Spark Streaming does not natively support sticky partition placement because Spark launches tasks based on data locality. If there is no locality (for example, reduce tasks can run anywhere), the location is assigned randomly. So the cogroup or join introduces a locality a

Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-01-28 Thread Tathagata Das
Ohhh nice! It would be great if you could share some code with us soon. It is indeed a very complicated problem and there is probably no single solution that fits all use cases. So having one way of doing things would be a great reference. Looking forward to that! On Wed, Jan 28, 2015 at 4:52 PM, Tobias Pfei

Re: Error reporting/collecting for users

2015-01-28 Thread Tathagata Das
You could use foreachRDD to do the operations and then inside the foreachRDD create an accumulator to gather all the errors together: dstream.foreachRDD { rdd => val accumulator = new Accumulator[] rdd.map { . }.count // whatever operation that is error prone // gather all errors
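A self-contained sketch of that pattern using the newer accumulator API (Spark 2.0+), assuming `dstream` is a DStream of Strings and that parsing to Int is the error-prone operation:

    dstream.foreachRDD { rdd =>
      val parseErrors = rdd.sparkContext.longAccumulator("parseErrors")
      rdd.map { line =>
        try { line.toInt }
        catch { case _: NumberFormatException => parseErrors.add(1); -1 }
      }.count()                                   // the action forces evaluation, populating the accumulator
      println(s"Parse errors in this batch: ${parseErrors.value}")
    }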

Re: Build error

2015-01-30 Thread Tathagata Das
That is a known issue uncovered last week. It fails in certain environments, but not on Jenkins, which is our testing environment. There is already a PR up to fix it. For now you can build using "mvn package -DskipTests" TD On Fri, Jan 30, 2015 at 8:59 PM, Andrew Musselman < andrew.mussel...@gmail.com>

Re: how to send JavaDStream RDD using foreachRDD using Java

2015-02-02 Thread Tathagata Das
Hello Sachin, While Akhil's solution is correct, it is not sufficient for your use case. RDD.foreach (which Akhil is using) will run on the workers, but you are creating the Producer object on the driver. This will not work; a producer created on the driver cannot be used from the workers/executors.

Re: Error when Spark streaming consumes from Kafka

2015-02-02 Thread Tathagata Das
This is an issue that is hard to resolve without rearchitecting the whole Kafka Receiver. There are some workarounds worth looking into. http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at

Re: StreamingContext getOrCreate with queueStream

2015-02-05 Thread Tathagata Das
I don't think your screenshots came through in the email. Nonetheless, queueStream will not work with getOrCreate. It's mainly for testing (by generating your own RDDs) and not really useful for production usage (where you really need checkpoint-based recovery). TD On Thu, Feb 5, 2015 at 4:12

Re: Concurrent batch processing

2015-02-12 Thread Tathagata Das
So you have come across spark.streaming.concurrentJobs already :) Yeah, that is an undocumented feature that does allow multiple output operations to submitted in parallel. However, this is not made public for the exact reasons that you realized - the semantics in case of stateful operations is not

Re: Spark streaming job throwing ClassNotFound exception when recovering from checkpointing

2015-02-12 Thread Tathagata Das
Could you come up with a minimal example through which I can reproduce the problem? On Tue, Feb 10, 2015 at 12:30 PM, conor wrote: > I am getting the following error when I kill the spark driver and restart > the job: > > 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint fro

Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
Hey Tim, Let me get the key points. 1. If you are not writing back to Kafka, the delay is stable? That is, instead of "foreachRDD { // write to kafka }" if you do "dstream.count", then the delay is stable. Right? 2. If so, then Kafka is the bottleneck. Is the number of partitions, that you spoke

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-12 Thread Tathagata Das
Can you give me the whole logs? TD On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg wrote: > OK that worked and getting close here ... the job ran successfully for a > bit and I got output for the first couple buckets before getting a > "java.lang.Exception: Could not compute split, block input-0-14

Re: streaming joining multiple streams

2015-02-12 Thread Tathagata Das
Sorry for the late response. With the amount of data you are planning to join, any system would take time. However, between Hive's MapReduce joins, Spark's basic shuffle join, and Spark SQL's join, the last wins hands down. Furthermore, with the APIs of Spark and Spark Streaming, you will have to do s

Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
tage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found > Exception in thread "main" org.apache.spark.SparkExcepti

Re: Interact with streams in a non-blocking way

2015-02-13 Thread Tathagata Das
Here is an example of how you can do it. Let's say "myDStream" contains the data that you may want to asynchronously query, say using Spark SQL. val sqlContext = new SqlContext(streamingContext.sparkContext) myDStream.foreachRDD { rdd => // rdd is an RDD of a case class sqlContext.registerRDDAsTab
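A modern equivalent of that pattern (a sketch, assuming `myDStream` is a DStream of (String, Int) pairs and `spark` is the SparkSession):

    import spark.implicits._

    myDStream.foreachRDD { rdd =>
      // refresh a snapshot view every batch; another thread can query it with spark.sql(...)
      rdd.toDF("word", "count").createOrReplaceTempView("latest_batch")
    }
    // elsewhere, e.g. in a web request handler:
    //   spark.sql("SELECT * FROM latest_batch WHERE count > 10").show()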

Re: NaiveBayes classifier causes ShuffleDependency class cast exception

2015-02-13 Thread Tathagata Das
You cannot have two SparkContexts active in the same JVM at the same time. Just create one SparkContext and then use it for both purposes. TD On Fri, Feb 6, 2015 at 8:49 PM, VISHNU SUBRAMANIAN < johnfedrickena...@gmail.com> wrote: > Can you try creating just a single spark context and then try

Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Tathagata Das
Correct, brute-force cleanup is not useful. Since Spark 1.0, Spark can do automatic cleanup of files based on which RDDs are used/garbage collected by the JVM. That would be the best way, but it depends on the JVM GC characteristics. If you force a GC periodically in the driver, that might help you get ri

Re: In a Spark Streaming application, what might be the potential causes for "util.AkkaUtils: Error sending message in 1 attempts" and "java.util.concurrent.TimeoutException: Futures timed out and"

2015-02-19 Thread Tathagata Das
What version of Spark are you using? TD On Thu, Feb 19, 2015 at 2:45 AM, Emre Sevinc wrote: > Hello, > > We have a Spark Streaming application that watches an input directory, and > as files are copied there the application reads them and sends the contents > to a RESTful web service, receives

Re: Any sample code for Kafka consumer

2015-02-22 Thread Tathagata Das
Spark Streaming already directly supports Kafka http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources Is there any reason why that is not sufficient? TD On Sun, Feb 22, 2015 at 5:18 PM, mykidong wrote: > In java, you can see this example: > https://github.com/my

Re: Periodic Broadcast in Apache Spark Streaming

2015-02-23 Thread Tathagata Das
You could do something like this. def rddTransformationUsingBroadcast(rdd: RDD[...]): RDD[...] = { val broadcastToUse = getBroadcast() // get the reference to a broadcast variable, new or existing rdd.map { .. } // use the broadcast variable } dstream.transform(rddTransformationUsi
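Fleshing that out a little (a sketch: loadLookup() and shouldRefresh() are hypothetical helpers, and `ssc`/`dstream` are the StreamingContext and input DStream):

    import org.apache.spark.broadcast.Broadcast

    @volatile var current: Broadcast[Map[String, String]] =
      ssc.sparkContext.broadcast(loadLookup())

    def getBroadcast(): Broadcast[Map[String, String]] = {
      if (shouldRefresh()) {                      // e.g. a time- or version-based staleness check
        current.unpersist()
        current = ssc.sparkContext.broadcast(loadLookup())
      }
      current
    }

    val enriched = dstream.transform { rdd =>
      val b = getBroadcast()                      // transform's body runs on the driver once per batch
      rdd.map(x => (x, b.value.getOrElse(x, "unknown")))
    }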

Re: How to diagnose "could not compute split" errors and failed jobs?

2015-02-23 Thread Tathagata Das
Could you find the executor logs on the executor where that task was scheduled? That may provide more information on what caused the error. Also take a look at where the block in question was stored, and where the task was scheduled. You will need to enable log4j INFO-level logs for this debugging

Re: Query data in Spark RRD

2015-02-23 Thread Tathagata Das
You will have to build a split infrastructure - a frontend that takes the queries from the UI and sends them to the backend, and the backend (running the Spark Streaming app) will actually run the queries on the tables created in the contexts. The RPCs necessary between the frontend and backend will nee

Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tathagata Das
There are different kinds of checkpointing going on. updateStateByKey requires RDD checkpointing, which can be enabled only by calling sparkContext.setCheckpointDir. But that does not enable Spark Streaming driver checkpoints, which are necessary for recovering from driver failures. That is enab

Re: Query data in Spark RRD

2015-02-23 Thread Tathagata Das
ing on that line. > > The problem is how to send to send the query to the backend? Bundle a http > server into a spark streaming job, that will accept the parameters? > > -- > Nikhil Bafna > > On Mon, Feb 23, 2015 at 2:04 PM, Tathagata Das > wrote: > >> You will ha

Re: spark streaming window operations on a large window size

2015-02-23 Thread Tathagata Das
The default persistence level is MEMORY_AND_DISK, so the LRU policy would spill the blocks to disk, and the streaming app will not fail. However, since things will constantly get read into and out of disk as windows are processed, the performance won't be great. So it is best to have sufficient memor

Re: Re_ Re_ Does Spark Streaming depend on Hadoop_(4)

2015-02-23 Thread Tathagata Das
You could have the HDFS configuration files in the classpath of the program. The HDFS libraries that Spark uses automatically pick those up and start using them. TD On Mon, Feb 23, 2015 at 7:47 PM, bit1...@163.com wrote: > I am crazy for frequent mail rejection so I create a new thread > SMTP error

Re: Executor size and checkpoints

2015-02-23 Thread Tathagata Das
Hey Yana, I think you posted screenshots, but they are not visible in the email. Probably better to upload them and post links. Are you using StreamingContext.getOrCreate? If that is being used, then it will recreate the SparkContext with SparkConf having whatever configuration is present in the

Re: Accumulator in SparkUI for streaming

2015-02-23 Thread Tathagata Das
Unless I am unaware of some recent changes, the Spark UI shows stages and jobs, not accumulator results. And the UI is not designed to be pluggable for showing user-defined stuff. TD On Fri, Feb 20, 2015 at 12:25 AM, Tim Smith wrote: > On Spark 1.2: > > I am trying to capture # records read from a ka

Re: About FlumeUtils.createStream

2015-02-23 Thread Tathagata Das
Akhil, that is incorrect. Spark will listen on the given port for Flume to push data into it. When in local mode, it will listen on localhost: When in some kind of cluster, instead of localhost you will have to give the hostname of the cluster node where you want Flume to forward the data. Spark

Re: Re: About FlumeUtils.createStream

2015-02-23 Thread Tathagata Das
* Akhil Das > *Date:* 2015-02-24 12:58 > *To:* Tathagata Das > *CC:* user ; bit1129 > *Subject:* Re: About FlumeUtils.createStream > > I see, thanks for the clarification TD. > On 24 Feb 2015 09:56, "Tathagata Das" wrote: > >> Akhil, that is incorrect. >> >

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-23 Thread Tathagata Das
In case this mystery has not been solved, DStream.print() essentially does an RDD.take(10) on each RDD, which computes only a subset of the partitions in the RDD. But collect() forces the evaluation of all the RDDs. Since you are writing to json in the mapI() function, this could be the reason. TD

Re: Write ahead Logs and checkpoint

2015-02-23 Thread Tathagata Das
Exactly, that is the reason. To avoid that, in the to-be-released Spark 1.3 we have added a new Kafka API (called the direct stream) which does not use Zookeeper at all to keep track of progress, and maintains offsets within Spark Streaming. That can guarantee all records being received exactly once. Its
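The DStream-side API this refers to looks like the following (broker and topic names are illustrative):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    // offsets are tracked by Spark Streaming itself, not ZooKeeper
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic"))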

Re: Write ahead Logs and checkpoint

2015-02-23 Thread Tathagata Das
: > Kafka 0.8.2 has built-in offset management, how would that affect direct > stream in spark? > Please see KAFKA-1012 > > --- Original Message --- > > From: "Tathagata Das" > Sent: February 23, 2015 9:53 PM > To: "V Dineshkumar" > Cc: &quo

Re: spark streaming window operations on a large window size

2015-02-23 Thread Tathagata Das
Yes. On Mon, Feb 23, 2015 at 11:16 PM, Avi Levi wrote: > @Tathagata Das so basically you are saying it is supported out of the > box, but we should expect a significant performance hit - is that right? > > > > On Tue, Feb 24, 2015 at 5:37 AM, Tathagata Das > wrote: > &g

Re: throughput in the web console?

2015-02-25 Thread Tathagata Das
Yes. # tuples processed in a batch = sum of all the tuples received by all the receivers. In the screenshot, there was a batch with 69.9K records, and there was a batch which took 1 s 473 ms. These could be the same batch or different batches. TD On Wed, Feb 25, 2015 at 10:11 AM, Josh J w

Re: Spark Streaming - Collecting RDDs into array in the driver program

2015-02-25 Thread Tathagata Das
You are just setting up the computation here using foreachRDD. You have not even started the streaming context to get any data. On Wed, Feb 25, 2015 at 2:21 PM, Thanigai Vellore < thanigai.vell...@gmail.com> wrote: > I have this function in the driver program which collects the result from > rdds (in

Re: Re: Many Receiver vs. Many threads per Receiver

2015-02-25 Thread Tathagata Das
Spark Streaming has a new Kafka direct stream, to be released as an experimental feature in 1.3. That uses a low-level consumer. Not sure if it satisfies your purpose. If you want more control, it's best to create your own Receiver with the low-level Kafka API. TD On Tue, Feb 24, 2015 at 12:09 AM, b

Re: throughput in the web console?

2015-02-26 Thread Tathagata Das
;> (records/sec) when I have a stream to process log files that appear in a >> folder? >> >> >> >> On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das >> wrote: >> >>> Yes. # tuples processed in a batch = sum of all the tuples received by >>>

Re: Integrating Spark Streaming with Reactive Mongo

2015-02-26 Thread Tathagata Das
Hey Mike, I quickly looked through the example and found a major performance issue. You are collecting the RDDs to the driver and then sending them to Mongo in a foreach. Why not do a distributed push to Mongo? WHAT YOU HAVE: val mongoConnection = ... WHAT YOU SHOULD DO: rdd.foreachPartition {
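A sketch of the distributed-push shape (the Mongo connection helpers are hypothetical placeholders, not a real client API):

    rdd.foreachPartition { docs =>
      val connection = createMongoConnection()    // hypothetical: one connection per partition, opened on the executor
      docs.foreach(doc => connection.insert(doc)) // hypothetical insert call
      connection.close()
    }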

Re: spark streaming: stderr does not roll

2015-02-26 Thread Tathagata Das
If the mentioned conf is enabled, the rolling of the stderr should work. If it is not, then there is probably some bug. Take a look at the Worker's logs and see if there is any error about rolling of the Executor's stderr. If there is a bug, then it needs to be fixed (maybe you can take a crack at
