Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-23 Thread Jungtaek Lim
The issue looks to be fixed in https://issues.apache.org/jira/browse/SPARK-23670, and 2.3.1 will likely include the fix. -Jungtaek Lim (HeartSaVioR) On Wed, May 23, 2018 at 7:12 PM, weand <andreas.we...@gmail.com> wrote: > Thanks for clarification. So it really seems to be a Spark UI OOM issue.

Re: OOM: Structured Streaming aggregation state not cleaned up properly

2018-05-22 Thread Jungtaek Lim
1. Could you share your Spark version? 2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it helps? This configuration is available in 2.3.0, and its default value is 1000. Thanks, Jungtaek Lim (HeartSaVioR) On Tue, May 22, 2018 at 4:29 PM, weand <andreas.we...@gmail.com>
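
For reference, a minimal sketch of lowering that setting when building the session (the app name and the value 50 are illustrative, not from the thread):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("my-streaming-app")
      .config("spark.sql.ui.retainedExecutions", "50")  // default is 1000
      .getOrCreate()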

Re: [Structured Streaming] Metrics or logs of events that are ignored due to watermark

2018-07-02 Thread Jungtaek Lim
and provide late input rows based on this. So I think this is valuable to address, and I'm planning to try to address it, but it would be OK for someone to address it earlier. Thanks, Jungtaek Lim (HeartSaVioR) On Tue, Jul 3, 2018 at 3:39 AM, subramgr wrote: > Hi all, > > Do we have some log

Re: Error while doing stream-stream inner join (java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access)

2018-07-03 Thread Jungtaek Lim
Could you please describe the version of Spark, and how you ran your app? If you don't mind sharing a minimal app which can reproduce this, it would be really great. - Jungtaek Lim (HeartSaVioR) On Mon, 2 Jul 2018 at 7:56 PM kant kodali wrote: > Hi All, > > I get the below error qu

Re: RepartitionByKey Behavior

2018-06-21 Thread Jungtaek Lim
It is not possible because the cardinality of the partitioning key is non-deterministic, while the partition count should be fixed. There's a chance that cardinality > partition count, and then the system can't ensure the requirement. Thanks, Jungtaek Lim (HeartSaVioR) On Fri, Jun 22, 2018 at 8

Re: [Spark Structured Streaming] Measure metrics from CsvSink for Rate source

2018-06-21 Thread Jungtaek Lim
1529640103,1.0606060606060606 1529640113,0.9997000899730081 Could you add a streaming query listener and see the values of sources -> numInputRows, inputRowsPerSecond, processedRowsPerSecond? They should provide some valid numbers. Thanks, Jungtaek Lim (HeartSaVioR) On Fri, Jun 22, 2018 at 11:49 AM, Dh

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Jungtaek Lim
't apply coalesce to an individual operator in a narrow dependency. -Jungtaek Lim (HeartSaVioR) On Thu, Aug 9, 2018 at 3:07 PM, Koert Kuipers wrote: > well an interesting side effect of this is that i can now control the > number of partitions for every shuffle in a dataframe job, as opposed to > having

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Jungtaek Lim
". Not sure there's any available trick to achieve it without calling repartition. Thanks, Jungtaek Lim (HeartSaVioR) 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937 On Thu, Aug 9, 2018 at 5:55 AM, Koe

Re: Structured Streaming doesn't write checkpoint log when I use coalesce

2018-08-09 Thread Jungtaek Lim
$"mod", $"word") .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value")) .coalesce(8) val query = outDf.writeStream .format("memory") .optio

Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
[T] = { DatasetHolder(_sqlContext.createDataset(rdd)) } You can see lots of Encoder implementations in the Scala code. If your type doesn't match anything, it may not work and you need to provide a custom Encoder. -Jungtaek Lim (HeartSaVioR) On Wed, Sep 5, 2018 at 5:24 PM, Mich Talebzadeh wrote: > Tha

Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
-of-org-apache/m-p/29994/highlight/true#M973 And which Spark version do you use? On Wed, Sep 5, 2018 at 5:32 PM, Jungtaek Lim wrote: > Sorry, I guess I pasted another method. The code is... > > implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): > DatasetHolder[T] = { >

Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
You may need to import implicits from your Spark session like below: (the code below is borrowed from https://spark.apache.org/docs/latest/sql-programming-guide.html) import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example")
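
A minimal self-contained sketch of the suggested import in action (the sample data and column names are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("Spark SQL basic example").getOrCreate()
    import spark.implicits._  // brings toDF/toDS into scope for Seq and RDD

    val df = Seq(("AAPL", 100.0), ("GOOG", 200.0)).toDF("ticker", "price")
    df.show()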

Re: getting error: value toDF is not a member of Seq[columns]

2018-09-05 Thread Jungtaek Lim
Sorry, I guess I pasted another method. The code is... implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) } On Wed, Sep 5, 2018 at 5:30 PM, Jungtaek Lim wrote: > I guess you need to have an encoder for the type of res

Re: getting error: value toDF is not a member of Seq[columns]

2018-09-06 Thread Jungtaek Lim
KER: string ... 2 more fields] Maybe we need to know the actual types of key, ticker, timeissued, price from your variables. Jungtaek Lim (HeartSaVioR) On Thu, Sep 6, 2018 at 5:57 PM, Mich Talebzadeh wrote: > I am trying to understand why spark cannot convert simple comma-separated columns as D

Re: [Structured Streaming] Custom StateStoreProvider

2018-07-12 Thread Jungtaek Lim
Girish, I think reading through the implementation of HDFSBackedStateStoreProvider as well as the relevant traits should give you the idea of how to implement a custom one. HDFSBackedStateStoreProvider is not that complicated to read and understand. You just need to deal with your underlying storage

Re: is it ok to make I/O calls in UDF? other words is it a standard practice ?

2018-04-24 Thread Jungtaek Lim
Another thing you may want to be aware of: if the result is not idempotent, your query result is also not idempotent. For fault-tolerance there's a chance for a record (row) to be replayed (recomputed). -Jungtaek Lim (HeartSaVioR) On Tue, Apr 24, 2018 at 2:07 PM, Jörn Franke <jornfra...@gmail.com>

Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
ll five records together to the socket (by nc), two micro-batches were handling the records and provided two results. --- Batch: 0 --- +---+------+------------+ | ID|AMOUNT|MY_TIMESTAMP| +---+------+------------+ |

Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
aggregation requires Update/Complete mode but join requires Append mode. (The guide page of Structured Streaming clearly explains such a limitation: "Cannot use streaming aggregation before joins.") If you can achieve it with mapGroupsWithState, you may want to stick with that. Btw, when you deal

Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
") .groupBy($"ID") .agg(maxrow(col("AMOUNT"), col("MY_TIMESTAMP")).as("maxrow")) .selectExpr("ID", "maxrow.st.AMOUNT", "maxrow.st.MY_TIMESTAMP") .writeStream .format("console") .trigger(Trigger.ProcessingTime("

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Jungtaek Lim
: String)org.apache.spark.sql.Column (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column cannot be applied to (org.apache.spark.sql.ColumnName, org.apache.spark.sql.Column) Could you check your code to see whether it works with Spark 2.3 (via spark-shell or whatever)? Thanks! Jungtaek Lim

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Jungtaek Lim
") .agg(max(struct($"AMOUNT", $"*")).as("data")) .select($"data.*") .writeStream .format("console") .trigger(Trigger.ProcessingTime("1 seconds")) .outputMode(OutputMode.Update()) .start() It still has a minor
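
The truncated snippet above uses the max-of-struct trick to keep the whole row holding the max AMOUNT per ID; a fuller sketch, assuming a streaming DataFrame df with columns ID, AMOUNT and MY_TIMESTAMP and a SparkSession named spark:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    import spark.implicits._

    val query = df
      .groupBy($"ID")
      .agg(max(struct($"AMOUNT", $"*")).as("data"))  // struct ordering compares AMOUNT first
      .select($"data.*")
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 seconds"))
      .outputMode(OutputMode.Update())
      .start()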

Looking for some feedbacks on proposal - native support of session window

2018-09-27 Thread Jungtaek Lim
Hi users, I'm Jungtaek Lim, one of the contributors on the streaming part. Recently I proposed a new feature: native support of session window [1]. While it also tackles the edge case map/flatMapGroupsWithState doesn't cover for session window, its major benefit is mostly better usability on session

Re: Processing Flexibility Between RDD and Dataframe API

2018-10-29 Thread Jungtaek Lim
architecture. -Jungtaek Lim On Mon, Oct 29, 2018 at 12:06 AM, Adrienne Kole wrote: > Thanks for bringing this issue to the mailing list. > As an addition, I would also ask the same questions about DStreams and > Structured Streaming APIs. > Structured Streaming is high level and it makes difficul

Re: Equivalent of emptyDataFrame in StructuredStreaming

2018-11-05 Thread Jungtaek Lim
Could you explain what you're trying to do? There should be no batch when there's no data in the stream, so it would end up as a no-op even if it were possible. - Jungtaek Lim (HeartSaVioR) On Tue, Nov 6, 2018 at 8:29 AM, Arun Manivannan wrote: > Hi, > > I would like to create a "zero" value for a S

Re: Watermarking without aggregation with Structured Streaming

2018-09-30 Thread Jungtaek Lim
If we move the phase of filtering out late records, what you would like to do may become the default behavior. This also means the output may also change for queries which use non-stateful operations, so it is not a trivial change and may require consensus like the SPIP process. Thanks, Jungtaek Lim

Re: Why does join use rows that were sent after watermark of 20 seconds?

2018-12-10 Thread Jungtaek Lim
+ INTERVAL 1 HOUR), 2. Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow). So yes, the join condition should directly deal with the timestamp column, otherwise state will grow infinitely. Thanks, Jungtaek Lim (HeartSaVioR) On Tue, Dec 11, 2018 at 2:52 PM

Re: [Spark Structued Streaming]: Read kafka offset from a timestamp

2018-11-19 Thread Jungtaek Lim
at it. Btw, it might help on batch queries as well; actually it sounds more helpful on batch queries. -Jungtaek Lim (HeartSaVioR) On Sun, Nov 18, 2018 at 9:53 AM, puneetloya wrote: > I would like to request a feature for reading data from Kafka Source based > on > a timestamp. So that if the applicat

Re: Spark Streaming

2018-11-26 Thread Jungtaek Lim
" and extra calls of "get_json_object". - Jungtaek Lim (HeartSaVioR) On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj wrote: > Hello All, > > I am using Spark 2.3 version and I am trying to write a Spark Streaming > Join. It is a basic join and it is taking more time to join the stream

Re: Why does Apache Spark Master shutdown when Zookeeper expires the session

2019-03-06 Thread Jungtaek Lim
information when one of them becomes master. That would require quite a lot of changes though. Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) 1. https://issues.apache.org/jira/browse/SPARK-15544 2. https://issues.apache.org/jira/browse/SPARK-23530 On Tue, Mar 5, 2019 at 10:02 PM, lokeshkumar wrote: > A

Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Jungtaek Lim
The query makes state grow infinitely. Could you consider applying a watermark to "receivedAt" to let the unnecessary part of state be cleared out? Other than the watermark, you could implement TTL-based eviction via flatMapGroupsWithState, though you'll need to implement your custom "dropDuplicates". On Mar 2019
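
A minimal sketch of the watermark suggestion, assuming a streaming DataFrame df with an event-time column receivedAt and a key column eventId (the column names are illustrative):

    // rows older than the watermark are evicted from the dedup state
    val deduped = df
      .withWatermark("receivedAt", "24 hours")
      .dropDuplicates("eventId", "receivedAt")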

Re: Structured Streaming & Query Planning

2019-03-18 Thread Jungtaek Lim
it's feasible. Thanks, Jungtaek Lim (HeartSaVioR) On Mon, Mar 18, 2019 at 4:03 PM, Paolo Platter wrote: > I can understand that if you involve columns with variable distribution in > join operations, it may change your execution plan, but most of the time > this is not going to happen, in

Re: Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-060f3ceb-09f4-4e28-8210-3ef8a845fc92--2038748645-driver-2

2019-02-13 Thread Jungtaek Lim
/jira/browse/SPARK-26350 2. https://issues.apache.org/jira/browse/SPARK-26121 Thanks, Jungtaek Lim (HeartSaVioR) On Wed, Feb 13, 2019 at 6:36 PM, Gabor Somogyi wrote: > Hi Thomas, > > The issue occurs when the user does not have the READ permission on the > consumer groups. > > In

Re: java.lang.IllegalArgumentException: Unsupported class file major version 55

2019-02-07 Thread Jungtaek Lim
. Thanks, Jungtaek Lim (HeartSaVioR) On Thu, Feb 7, 2019 at 9:18 PM, Gabor Somogyi wrote: > Hi Hande, > > "Unsupported class file major version 55" means java incompatibility. > This error means you're trying to load a Java "class" file that was > compiled with a newer vers

Offline state manipulation tool for structured streaming query

2019-04-13 Thread Jungtaek Lim
to hear new ideas for improvements, and contributions are much appreciated! Enjoy! Thanks, Jungtaek Lim (HeartSaVioR)

Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Jungtaek Lim
re dealing with streaming-streaming join. Thanks, Jungtaek Lim (HeartSaVioR) 1. https://issues.apache.org/jira/browse/SPARK-26154 On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann wrote: > Hi all > > sorry, tl;dr > > I'm on my first Python Spark structured streaming app, in the end jo

Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Jungtaek Lim
; have a match > - inner join with 1 left outer followed by aggregation only produces the > messages with a match > > Of course, all are stream-stream joins > > CU, Joe > > On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim > wrote: > > I would suspect that rows are

Re: Spark structured streaming leftOuter join not working as I expect

2019-06-11 Thread Jungtaek Lim
of the previous stateful operator will become inputs of the next stateful operator, they should have different watermarks) and one of the contributors proposes an approach [1] which would fit Spark (unfortunately it hasn't been reviewed by committers for so long). Thanks, Jungtaek Lim (HeartSaVioR) 1. https

Re: [StructuredStreaming] HDFSBackedStateStoreProvider is leaking .crc files.

2019-06-12 Thread Jungtaek Lim
Nice finding! Given you already pointed out a previous issue which fixed a similar problem, it should also be easy for you to craft the patch and verify whether the fix resolves your issue. Looking forward to seeing your patch. Thanks, Jungtaek Lim (HeartSaVioR) On Wed, Jun 12, 2019 at 8:23 PM Gerard

Re: Change parallelism number in Spark Streaming

2019-06-27 Thread Jungtaek Lim
Great, thanks! Even better if you could share the slides as well (and if possible the video too), since it would be helpful for other users to understand the details. Thanks again, Jungtaek Lim (HeartSaVioR) On Thu, Jun 27, 2019 at 7:33 PM Jacek Laskowski wrote: > Hi, > > I've g

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
es reading and writing state in structured streaming, achieving rescaling and schema evolution. https://github.com/HeartSaVioR/spark-state-tools (DISCLAIMER: I'm an author of this tool.) Thanks, Jungtaek Lim (HeartSaVioR) On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei wrote: > Thank you for you

Re: Change parallelism number in Spark Streaming

2019-06-26 Thread Jungtaek Lim
Glad to help, Jacek. I'm happy you're doing a similar thing, which means it could be pretty useful for others as well. Looks like it might be good enough to contribute the state source and sink. I'll sort out my code and submit a PR. Thanks, Jungtaek Lim (HeartSaVioR) On Thu, Jun 27, 2019 at 7:54 AM

Re: Spark Standalone - Failing to pass extra java options to the driver in cluster mode

2019-08-19 Thread Jungtaek Lim
Hi Alex, you seem to hit SPARK-26606 [1] which has been fixed in 2.4.1. Could you try it out with the latest version? Thanks, Jungtaek Lim (HeartSaVioR) 1. https://issues.apache.org/jira/browse/SPARK-26606 On Tue, Aug 20, 2019 at 3:43 AM Alex Landa wrote: > Hi, > > We are using Spark S

Kafka offset committer tool for structured streaming query

2019-09-23 Thread Jungtaek Lim
via "--jars" option until artifact is published. I'd be happy to hear new ideas of improvements, and much appreciated for contributions! Enjoy! Thanks, Jungtaek Lim (HeartSaVioR)

Re: pyspark - memory leak leading to OOM after submitting 100 jobs?

2019-10-20 Thread Jungtaek Lim
even if they spent their time. A memory leak issue is not really easy to reproduce, unless it leaks some objects unconditionally. - Jungtaek Lim (HeartSaVioR) On Sun, Oct 20, 2019 at 7:18 PM Paul Wais wrote: > Dear List, > > I've observed some sort of memory leak when using pyspa

Re: apache-spark Structured Stateful Streaming with window / SPARK-21641

2019-10-16 Thread Jungtaek Lim
.toDF() > .selectExpr("_1", "CAST(_2 / 1000 AS TIMESTAMP) AS timestamp") > .select(col("*"), window(col("timestamp"), "10 seconds", "5 > seconds").as("window")) > .select(col("_1"), col("window.star

Re: chaining flatMapGroupsWithState in append mode

2019-11-28 Thread Jungtaek Lim
PR: https://github.com/apache/spark/pull/24890 Thanks, Jungtaek Lim (HeartSaVioR) On Thu, Nov 28, 2019 at 9:55 PM alex770 wrote: > I wrote a few examples chaining flatMapGroupsWithState in append mode and it > worked. Other examples did not. > > The question is if the flatMapGroupsWithState

Re: Operators supported by Spark Structured Streaming

2019-11-28 Thread Jungtaek Lim
Hi, Unsupported operations in Structured Streaming are explained in the guide doc. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations Thanks, Jungtaek Lim (HeartSaVioR) On Fri, Nov 29, 2019 at 2:08 PM shicheng31...@gmail.com < shichen

Re: [ANNOUNCE] Announcing Apache Spark 2.4.5

2020-02-10 Thread Jungtaek Lim
Nice work, Dongjoon! Thanks for the huge effort on sorting out the correctness things as well. On Tue, Feb 11, 2020 at 12:40 PM Wenchen Fan wrote: > Great Job, Dongjoon! > > On Mon, Feb 10, 2020 at 4:18 PM Hyukjin Kwon wrote: >> Thanks Dongjoon! >> >> On Sun, Feb 9, 2020 at 10:49 AM, Takeshi

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-29 Thread Jungtaek Lim
I've investigated a bit, and it looks like it's not an issue of mapGroupsWithState, but an issue of how UDT is handled in UnsafeRow. It seems to miss handling UDT, and the missing spot makes the internal code of Spark corrupt the value. (So if I'm not mistaken, it's a correctness issue.) I've filed

Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

2020-02-29 Thread Jungtaek Lim
this issue pops up now whereas the relevant code has lived there for a very long time. On Sat, Feb 29, 2020 at 11:44 PM Jungtaek Lim wrote: > I've investigated a bit, and it looks like it's not an issue of > mapGroupsWithState, but an issue of how UDT is handled in UnsafeRow. It > seems to miss han

Re: Extract value from streaming Dataframe to a variable

2020-02-02 Thread Jungtaek Lim
Thanks for your reply. > > I'm using Spark 2.3.2. Looks like the foreach operation is only supported for > Java and Scala. Is there any alternative for Python? > > On Mon, Jan 20, 2020, 5:09 PM Jungtaek Lim > wrote: > >> Hi, >> >> you can try out foreachBatch to appl

Re: How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

2020-02-02 Thread Jungtaek Lim
the issue you've described actually applies to "event time" processing (delayed output vs discarded late events). Hope this helps. Jungtaek Lim (HeartSaVioR) On Fri, Jan 24, 2020 at 7:19 AM stevech.hu wrote: > Anyone know the answers or pointers? thanks. > > > >

Re: Extract value from streaming Dataframe to a variable

2020-01-20 Thread Jungtaek Lim
Hi, you can try out foreachBatch to apply the batch query operation to each output of the micro-batch: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) On Mon, Jan 20, 2020 at 8
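
A minimal sketch of foreachBatch in the Scala API (available since Spark 2.4; the variable names are illustrative):

    import org.apache.spark.sql.DataFrame

    val query = streamingDf.writeStream
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        // inside the function, batchDf behaves like a normal batch DataFrame
        println(s"batch $batchId contains ${batchDf.count()} rows")
      }
      .start()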

Re: [ANNOUNCE] Announcing Apache Spark 3.0.0-preview2

2019-12-24 Thread Jungtaek Lim
Great work, Yuming! Happy Holidays. On Wed, Dec 25, 2019 at 9:08 AM Dongjoon Hyun wrote: > Indeed! Thank you again, Yuming and all. > > Bests, > Dongjoon. > > > On Tue, Dec 24, 2019 at 13:38 Takeshi Yamamuro > wrote: > >> Great work, Yuming! >> >> Bests, >> Takeshi >> >> On Wed, Dec 25, 2019

Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Jungtaek Lim
I think Spark is trying to ensure that it reads the input "continuously" without anything missing. Technically it may be valid to say the situation is a kind of "data-loss", as the query couldn't process the offsets which are being thrown out, and the owner of the query needs to be careful as it affects

Re: Understanding spark structured streaming checkpointing system

2020-04-19 Thread Jungtaek Lim
That sounds odd. Is it intermittent, or always reproducible if you start with the same checkpoint? What's the version of Spark? On Fri, Apr 17, 2020 at 6:17 AM Ruijing Li wrote: > Hi all, > > I have a question on how structured streaming does checkpointing. I'm > noticing that spark is not reading

Re: Spark structured streaming - Fallback to earliest offset

2020-04-19 Thread Jungtaek Lim
hang? > > On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim < > kabhwan.opensou...@gmail.com> wrote: > >> I think Spark is trying to ensure that it reads the input "continuously" >> without any missing. Technically it may be valid to say the situation is a

Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-19 Thread Jungtaek Lim
Did you provide more records to the topic "after" you started the query? That's the only cause I can imagine based on such information. On Fri, Apr 17, 2020 at 9:13 AM Ruijing Li wrote: > Hi all, > > Apologies if this has been asked before, but I could not find the answer > to this question. We have

Re: [Structured Streaming] Checkpoint file compact file grows big

2020-04-19 Thread Jungtaek Lim
reach the bad state). That said, it doesn't completely get rid of the necessity of TTL, but it opens the chance to have a longer TTL without encountering the bad state. If you're adventurous you can apply these patches on your version of Spark and see whether it helps. Hope this helps. Thanks, Jungtaek Lim (HeartSa

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-16 Thread Jungtaek Lim
Take thread dumps continuously, at a specific period (like 1s), and see the change of stack/lock for each thread. (This is not easy to do in the UI so doing it manually would probably be the only option. Not sure whether the Spark UI provides the same; I haven't used it at all.) It will tell which thread is being

Re: spark structured streaming GroupState returns weird values from sate

2020-03-27 Thread Jungtaek Lim
Could you play with Encoders.bean()? You can call Encoders.bean() with your class, and call .schema() on the return value to see how it transforms to the schema in Spark SQL. The schema must be consistent across multiple JVM runs to make it work properly, but I suspect it doesn't retain the order.
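
A sketch of that check, assuming ProductStateInformation is the bean class from the thread:

    import org.apache.spark.sql.Encoders

    val encoder = Encoders.bean(classOf[ProductStateInformation])
    println(encoder.schema.treeString)  // compare field order and types across JVM runs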

Re: spark structured streaming GroupState returns weird values from sate

2020-03-27 Thread Jungtaek Lim
Well, the code itself doesn't seem to be OK - you're using ProductStateInformation as the class of State whereas you provide ProductSessionInformation to Encoder for State. On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim wrote: > Could you play with Encoders.bean()? You can Encoders.b

Re: spark structured streaming GroupState returns weird values from sate

2020-03-28 Thread Jungtaek Lim
>> 1. some of the values being null or >> 2. UTF8 issue? Or some serialization/deserialization issue? >> 3. Not enough memory? >> BTW, I am using the same names in my code. >> >> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim < >> kabhwan.opensou...@gmail.co

Re: Spark Streaming Code

2020-03-28 Thread Jungtaek Lim
To get any meaningful answers you may want to provide as much information/context as possible, e.g. Spark version, which behavior/output was expected (and why you think so) and how it actually behaves. On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj wrote: > Hi Team, > > Need help on windowing &

Re: spark structured streaming GroupState returns weird values from sate

2020-03-31 Thread Jungtaek Lim
roblem >> is that I am adding this code in an existing big project with several >> dependencies with spark streaming older version(2.2) on root level etc. >> >> Also, I observed that there is @Experimental on GroupState class. What >> state is it in now? Several people using this f

Re: Data Source - State (SPARK-28190)

2020-03-30 Thread Jungtaek Lim
to lack of support as well. Thanks, Jungtaek Lim (HeartSaVioR) On Tue, Mar 31, 2020 at 4:50 AM Bryan Jeffrey wrote: > Hi, Jungtaek. > > We've been investigating the use of Spark Structured Streaming to replace > our Spark Streaming operations. We have several cases where

Re: Spark Structure Streaming | FileStreamSourceLog not deleting list of input files | Spark -2.4.0

2020-04-21 Thread Jungtaek Lim
/rb4ebf1d20d13db0a78694e8d301e51c326f803cb86fc1a1f66f2ae7e%40%3Cuser.spark.apache.org%3E Thanks, Jungtaek Lim (HeartSaVioR) On Tue, Apr 21, 2020 at 8:23 PM Pappu Yadav wrote: > Hi Team, > > While running Spark, below are some findings. > >1. FileStreamSourceLog is responsible for maintaining input s

Re: is RosckDB backend available in 3.0 preview?

2020-04-22 Thread Jungtaek Lim
to the implementation on Spark ecosystem. On Thu, Apr 23, 2020 at 1:22 AM kant kodali wrote: > is it going to make it in 3.0? > > On Tue, Apr 21, 2020 at 9:24 PM Jungtaek Lim > wrote: > >> Unfortunately, the short answer is no. Please refer the last part of >> discussion on the P

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Jungtaek Lim
cks after being clicked, then >>>> you can check the root cause of holding locks like this(Thread 48 of >>>> above): >>>> >>>> org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native >>>> Method) >>>> >>>> org.fusesource.jansi.internal.Kernel32.r

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Jungtaek Lim
limits debugging help, but wanted > to understand if anyone has encountered a similar issue. > > On Tue, Apr 21, 2020 at 7:12 PM Jungtaek Lim > wrote: > >> If there's no third party libraries in the dump then why not share the >> thread dump? (I mean, the output of jstack) >

Re: is RosckDB backend available in 3.0 preview?

2020-04-21 Thread Jungtaek Lim
Unfortunately, the short answer is no. Please refer to the last part of the discussion on the PR https://github.com/apache/spark/pull/24922 Unless we get any native implementation of this, I guess this project is the most widely known implementation of a RocksDB backend state store -

Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Jungtaek Lim
which is the characteristic of streaming query, hence hybrid one. > > > On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim > wrote: > >> If I understand correctly, Trigger.once executes only one micro-batch and >> terminates, that's all. Your understanding of structured stream

Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Jungtaek Lim
If I understand correctly, Trigger.Once executes only one micro-batch and terminates, that's all. Your understanding of Structured Streaming applies there as well. It's like a hybrid approach: bringing incremental processing from micro-batch but having the processing interval of batch. That said,
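
A minimal sketch of such a run-once query (the paths are illustrative):

    import org.apache.spark.sql.streaming.Trigger

    val query = df.writeStream
      .format("parquet")
      .option("path", "/tmp/out")
      .option("checkpointLocation", "/tmp/chk")
      .trigger(Trigger.Once())  // process one micro-batch, then terminate
      .start()
    query.awaitTermination()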

Re: [Structured Streaming] NullPointerException in long running query

2020-04-28 Thread Jungtaek Lim
The root cause of the exception occurred on the executor side ("Lost task 10.3 in stage 1.0 (TID 81, spark6, executor 1)"), so you may need to check there. On Tue, Apr 28, 2020 at 2:52 PM lec ssmi wrote: > Hi: > One of my long-running queries occasionally encountered the following > exception: > > >

Re: Pyspark Kafka Structured Stream not working.

2020-05-06 Thread Jungtaek Lim
be adopted with such convention. (e.g. no space) Hope this helps, Thanks, Jungtaek Lim (HeartSaVioR) On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar wrote: > Hi All, > > > > I am getting the below error while using Pyspark Structured Streaming from > Kafka Producer. > > > >

Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
these metrics (e.g. numInputRows, > inputRowsPerSecond). > > I am talking about "No. of States" in the memory at any given time. > > On Thu, May 7, 2020 at 4:31 PM Jungtaek Lim > wrote: > >> If you're referring total "entries" in all states in SS job

Re: [E] Re: Pyspark Kafka Structured Stream not working.

2020-05-07 Thread Jungtaek Lim
at 3:55 PM Vijayant Kumar wrote: > Hi Jungtaek, > > Thanks for the response. It appears to be #1. > I will appreciate it if you can share a sample command to submit the Spark > application? > > *From:* Jungtaek Lim [mailto:kabhwan.opensou...@gmail.

Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
If you're referring to the total "entries" across all states in the SS job, it's provided via StreamingQueryListener. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries Hope this helps. On Fri, May 8, 2020 at 3:26 AM Something Something wrote:
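
A minimal sketch of reading state entry counts from that listener (numRowsTotal is reported per stateful operator in the progress event):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // one entry per stateful operator in the query
        event.progress.stateOperators.foreach(op => println(s"state rows: ${op.numRowsTotal}"))
      }
    })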

Re: [structured streaming] [stateful] Null value appeared in non-nullable field

2020-05-23 Thread Jungtaek Lim
Hi, With only a stack trace there's nothing to look into. It'd be better to provide a simple reproducer (code, and problematic inputs) so that someone may give it a try. You may also want to give it a try with 3.0.0; RC2 is better to test against, but preview2 would be easier for end users to

Re: Elastic Search sink showing -1 for numOutputRows

2020-09-07 Thread Jungtaek Lim
I don't know about the ES sink. The availability of "numOutputRows" depends on the API version the sink is implementing (DSv1 vs DSv2), so you may be better off asking the author of the ES sink to confirm the case. On Tue, Sep 8, 2020 at 5:15 AM jainshasha wrote: > Hi, > > Using structured

Re: Keeping track of how long something has been in a queue

2020-09-07 Thread Jungtaek Lim
te. Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) On Fri, Sep 4, 2020 at 11:21 PM Hamish Whittal wrote: > Sorry, I moved a paragraph, > > (2) If Ms green.th was first seen at 13:04:04, then at 13:04:05 and >> finally at 13:04:17, she's been in the queue for 13 seconds (ignoring the >> ms). >> >

Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
edge of a previously found utility location, and > repeats the search from the very start causing useless file system search > operations over and over again. > > This may or may not matter when HDFS is used for checkpoint store > (depending on how HDFS server implements the calls), but it does mat

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
and in Python: > > from gresearch.spark.dgraph.connector import *triples = > spark.read.dgraph.triples("localhost:9080") > > I agree that 3rd parties should also support the official > spark.read.format() and the new catalog approaches. > > Enrico > > On 05.10.20 at 14:03,

Re: Excessive disk IO with Spark structured streaming

2020-10-07 Thread Jungtaek Lim
I can't spend too much time on explaining one by one. I strongly encourage you to do a deep-dive instead of just looking around as you want to know about "details" - that's how open source works. I'll go through a general explanation instead of replying inline; probably I'd write a blog doc if

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
Cassandra connector leverages it. I see some external data sources starting to support the catalog, and in Spark itself there's some effort to support the catalog for JDBC. https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR

Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
checkpoint works in SS, the temp file is atomically renamed to be the final file), and as a workaround (SPARK-28025 [3]) Spark tries to delete the crc file, with which two additional operations (exist -> delete) may occur per crc file. Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) 1. ht

Re: Arbitrary stateful aggregation: updating state without setting timeout

2020-10-05 Thread Jungtaek Lim
remove state for the group (key). Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) On Mon, Oct 5, 2020 at 6:16 PM Yuri Oleynikov (יורי אולייניקוב) < yur...@gmail.com> wrote: > Hi all, I have following question: > > What happens to the state (in terms of expiration) if I'm updating the

Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Jungtaek Lim
Hi Amit, if I remember correctly, you don't need to restart the query to reflect the newly added topic and partition, if your subscription covers the topic (like subscribe pattern). Please try it out. Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) On Fri, Aug 28, 2020 at 1:56 PM Amit
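
A minimal sketch of such a pattern subscription (the broker address and topic pattern are illustrative):

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribePattern", "events-.*")  // newly created matching topics/partitions are picked up
      .load()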

Re: Structured Streaming metric for count of delayed/late data

2020-08-22 Thread Jungtaek Lim
Is there any workaround for this limitation of inaccurate count, maybe by > adding some additional streaming operation in the SS job without impacting perf > too much? > > Regards, > > Rajat > > *From: *Jungtaek Lim > *Date: *Friday, 21 August 2020 at 12:07 PM >

Re: Structured Streaming metric for count of delayed/late data

2020-08-21 Thread Jungtaek Lim
One more thing to say: unfortunately, the number is not accurate compared to the input rows on streaming aggregation, because Spark does local-aggregate and counts dropped inputs based on "pre-locally-aggregated" rows. You may want to treat the number as an indicator of whether dropping inputs is happening or

Re: Query around Spark Checkpoints

2020-09-27 Thread Jungtaek Lim
it, and evaluate whether your target storage can fulfill the requirement. Thanks, Jungtaek Lim (HeartSaVioR) On Mon, Sep 28, 2020 at 3:04 AM Amit Joshi wrote: > Hi, > > As far as I know, it depends on whether you are using spark streaming or > structured streaming. > In spark streamin

Re: Query around Spark Checkpoints

2020-09-29 Thread Jungtaek Lim
a workaround then probably something is going wrong. Please feel free to share it. Thanks, Jungtaek Lim (HeartSaVioR) On Wed, Sep 30, 2020 at 1:14 AM, Bryan Jeffrey wrote: > Jungtaek, > > How would you contrast stateful streaming with checkpoint vs. the idea of > writing updates to a Delt

Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Jungtaek Lim
configurations. Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) On Tue, May 26, 2020 at 6:42 AM Something Something < mailinglist...@gmail.com> wrote: > I keep getting this error message: > > > *The message is 1169350 bytes when serialized which is larger than the > maximum r

Re: States get dropped in Structured Streaming

2020-10-23 Thread Jungtaek Lim
Unfortunately your information doesn't provide any hint whether rows in the state are evicted correctly on watermark advance or there's an unknown bug where some of the rows in state are silently dropped. I haven't heard of a case of the latter - probably you'd like to double check it with

Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Jungtaek Lim
Please provide logs and a dump file for the OOM case - otherwise no one can say what the cause is. Add JVM options to driver/executor => -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="...dir..." On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava wrote: > *Issue:* I am trying to process 5000+
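
The suggested JVM options would land in the Spark configuration roughly like this (spark-defaults.conf style; the dump directory is illustrative and must exist on each node):

    spark.driver.extraJavaOptions    -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps
    spark.executor.extraJavaOptions  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps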

Re: Lazy Spark Structured Streaming

2020-08-02 Thread Jungtaek Lim
; refer if not fixing > the "need to add a dummy record to move watermark forward"? > > Kind regards, > > Phillip > > > > > On Mon, Jul 27, 2020 at 11:41 PM Jungtaek Lim < > kabhwan.opensou...@gmail.com> wrote: > >> I'm not sure what exac

Re: [SPARK-STRUCTURED-STREAMING] IllegalStateException: Race while writing batch 4

2020-08-12 Thread Jungtaek Lim
the table in good shape) Thanks, Jungtaek Lim (HeartSaVioR) On Sat, Aug 8, 2020 at 4:19 AM Amit Joshi wrote: > Hi, > > I have 2 Spark structured streaming queries writing to the same output path in > object storage. > Once in a while I am getting the "IllegalStateException: Race w

Re: Pyspark: Issue using sql in foreachBatch sink

2020-07-31 Thread Jungtaek Lim
Python doesn't allow abbreviating () with no param, whereas Scala does. Use `write()`, not `write`. On Wed, Jul 29, 2020 at 9:09 AM muru wrote: > In a pyspark SS job, trying to use sql instead of sql functions in > foreachBatch sink > throws AttributeError: 'JavaMember' object has no attribute

Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread Jungtaek Lim
from Kafka's point of view, especially the gap between the highest offset and the committed offset. Hope this helps. Thanks, Jungtaek Lim (HeartSaVioR) On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi wrote: > In 3.0 the community just added it. > > On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed,

Re: Failure Threshold in Spark Structured Streaming?

2020-07-02 Thread Jungtaek Lim
Structured Streaming basically follows SQL semantics, which don't have such a notion of "max allowance of failures". If you'd like to tolerate malformed data, please read with a raw format (string or binary), which won't fail with such data, and try converting, e.g. from_json() will produce
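
A minimal sketch of that pattern, assuming raw is a DataFrame with a string/binary value column (e.g. from Kafka), spark is your SparkSession, and the two-field schema is illustrative:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    import spark.implicits._

    val schema = new StructType().add("id", LongType).add("name", StringType)

    val parsed = raw
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json($"json", schema).as("data"))  // malformed rows yield null instead of failing
      .select($"data.*")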
