ould make it delayed by 8 weeks more (10 weeks to
reach the bad state).
That said, it doesn't completely get rid of the necessity of TTL, but it opens
the chance to have a longer TTL without encountering the bad state.
If you're adventurous you can apply these patches to your version of Spark
and see w
Did you provide more records to the topic "after" you started the query? That's
the only cause I can imagine based on the information given.
On Fri, Apr 17, 2020 at 9:13 AM Ruijing Li wrote:
> Hi all,
>
> Apologies if this has been asked before, but I could not find the answer
> to this question. We have a
> hang?
>
> On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I think Spark is trying to ensure that it reads the input "continuously"
>> without missing anything. Technically it may be valid to say the situation is a
That sounds odd. Is it intermittent, or always reproducible if you start
with the same checkpoint? What's the version of Spark?
On Fri, Apr 17, 2020 at 6:17 AM Ruijing Li wrote:
> Hi all,
>
> I have a question on how structured streaming does checkpointing. I’m
> noticing that spark is not reading
Take thread dumps continuously, at a specific interval (like 1s), and watch how
the stack / lock changes for each thread. (This is not easy to do in the UI, so
doing it manually may be the only option. I'm not sure whether the Spark UI
provides the same; I haven't used it at all.)
It will tell which thread is being blocked
I think Spark is trying to ensure that it reads the input "continuously"
without missing anything. Technically it may be valid to say the situation is a
kind of "data loss", as the query couldn't process the offsets which are
being thrown out, and the owner of the query needs to be careful as it affects
the
roduce, if I can. The problem
>> is that I am adding this code to an existing big project with several
>> dependencies, with an older Spark Streaming version (2.2) at the root level, etc.
>> Also, I observed that there is @Experimental on the GroupState class. What
>> state is it in now? Se
ight now due to lack of support as well.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Tue, Mar 31, 2020 at 4:50 AM Bryan Jeffrey
wrote:
> Hi, Jungtaek.
>
> We've been investigating the use of Spark Structured Streaming to replace
> our Spark Streaming operations. We have
To get meaningful answers you may want to provide as much information/context
as possible, e.g. the Spark version, which behavior/output you expected (and
why), and how it actually behaves.
On Sun, Mar 29, 2020 at 3:37 AM Siva Samraj wrote:
> Hi Team,
>
> Need help on windowing & wa
en due to
>> 1. some of the values being null, or
>> 2. a UTF8 issue? Or some serialization/deserialization issue?
>> 3. Not enough memory?
>> BTW, I am using the same names in my code.
>>
>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>> kabhwan.opensou..
Well, the code itself doesn't seem to be OK - you're using
ProductStateInformation as the class of the state, whereas you provide
ProductSessionInformation to the Encoder for the state.
On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim
wrote:
> Could you play with Encoders.bean()? You can Encoder
Could you play with Encoders.bean()? You can call Encoders.bean() with your
class, and call .schema() on the return value to see how it is translated into
a Spark SQL schema. The schema must be consistent across multiple JVM runs for
this to work properly, but I suspect it doesn't retain the field order.
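For example, something along these lines (a minimal sketch, assuming
ProductSessionInformation from this thread is the bean class in question) would
show the derived schema:

import org.apache.spark.sql.Encoders

// Derive the Spark SQL schema from the bean's getters/setters and print it.
// Run this in several JVMs and compare the output; if the field order differs
// between runs, the encoder-derived schema is not stable across runs.
val stateEncoder = Encoders.bean(classOf[ProductSessionInformation])
stateEncoder.schema.printTreeString()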
On
that's why
this issue pops up now whereas the relevant code has been around for a very
long time.
On Sat, Feb 29, 2020 at 11:44 PM Jungtaek Lim
wrote:
> I've investigated a bit, and looks like it's not an issue of
> mapGroupsWithState, but an issue of how UDT is handled in UnsafeRow. It
>
I've investigated a bit, and it looks like it's not an issue of
mapGroupsWithState, but an issue of how UDTs are handled in UnsafeRow. It
seems to miss handling UDTs in one spot, and that missing spot makes Spark's
internal code corrupt the value. (So if I'm not mistaken, it's a correctness issue.)
I've filed an
Nice work, Dongjoon! Thanks for the huge effort on sorting out the
correctness issues as well.
On Tue, Feb 11, 2020 at 12:40 PM Wenchen Fan wrote:
> Great Job, Dongjoon!
>
> On Mon, Feb 10, 2020 at 4:18 PM Hyukjin Kwon wrote:
>
>> Thanks Dongjoon!
>>
>> On Sun, Feb 9, 2020 at 10:49 AM, Takeshi Yamam
ay, out-of-order events, etc., whereas the issue you've described
actually applies to "event time" processing (delayed output vs. discarded
late events).
Hope this helps.
Jungtaek Lim (HeartSaVioR)
On Fri, Jan 24, 2020 at 7:19 AM stevech.hu wrote:
> Anyone know the ans
> Thanks for your reply.
>
> I'm using Spark 2.3.2. Looks like foreach operation is only supported for
> Java and Scala. Is there any alternative for Python?
>
> On Mon, Jan 20, 2020, 5:09 PM Jungtaek Lim
> wrote:
>
>> Hi,
>>
>> you can try out foreachBat
Hi,
you can try out foreachBatch to apply batch query operations to the output
of each micro-batch:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
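A minimal sketch of the pattern (streamingDf and the output path are
hypothetical; any batch writer works inside the function):

import org.apache.spark.sql.DataFrame

// Defining the function as a val avoids overload ambiguity in some Scala 2.12 builds.
val writeBatch = (batchDf: DataFrame, batchId: Long) => {
  // Inside foreachBatch you can use the regular batch write APIs.
  batchDf.write.mode("append").parquet("/tmp/output")  // hypothetical path
}

val query = streamingDf.writeStream
  .foreachBatch(writeBatch)
  .start()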
Hope this helps.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Mon, Jan 20, 2020 at 8
Great work, Yuming! Happy Holidays.
On Wed, Dec 25, 2019 at 9:08 AM Dongjoon Hyun
wrote:
> Indeed! Thank you again, Yuming and all.
>
> Bests,
> Dongjoon.
>
>
> On Tue, Dec 24, 2019 at 13:38 Takeshi Yamamuro
> wrote:
>
>> Great work, Yuming!
>>
>> Bests,
>> Takeshi
>>
>> On Wed, Dec 25, 2019 at
Hi,
Unsupported operations in Structured Streaming are explained in the guide
doc:
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
Thanks,
Jungtaek Lim (HeartSaVioR)
On Fri, Nov 29, 2019 at 2:08 PM shicheng31...@gmail.com <
shichen
of doc previously in this PR:
https://github.com/apache/spark/pull/24890
Thanks,
Jungtaek Lim (HeartSaVioR)
On Thu, Nov 28, 2019 at 9:55 PM alex770 wrote:
> I wrote few examples chaining flatMapGroupsWithState in append mode and it
> worked. Other examples did not.
>
> The question is if the f
reproduce even if they spent their time on it. A memory leak issue is not
really easy to reproduce, unless it leaks some objects without any
conditions.
- Jungtaek Lim (HeartSaVioR)
On Sun, Oct 20, 2019 at 7:18 PM Paul Wais wrote:
> Dear List,
>
> I've observed some sort of memory leak w
ng, Long)]
>
> inputData.toDF()
> .selectExpr("_1", "CAST(_2 / 1000 AS TIMESTAMP) AS timestamp")
> .select(col("*"), window(col("timestamp"), "10 seconds", "5
> seconds").as("window"))
> .select(col("_1
rce and add the jar via the "--jars" option until the artifact is
published.
I'd be happy to hear new ideas for improvements, and contributions are much
appreciated!
Enjoy!
Thanks,
Jungtaek Lim (HeartSaVioR)
Hi Alex,
you seem to have hit SPARK-26606 [1], which has been fixed in 2.4.1. Could you
try it out with the latest version?
Thanks,
Jungtaek Lim (HeartSaVioR)
1. https://issues.apache.org/jira/browse/SPARK-26606
On Tue, Aug 20, 2019 at 3:43 AM Alex Landa wrote:
> Hi,
>
> We are using Spark S
Great, thanks! Even better if you could share the slides as well (and, if
possible, the video too), since that would be helpful for other users to
understand the details.
Thanks again,
Jungtaek Lim (HeartSaVioR)
On Thu, Jun 27, 2019 at 7:33 PM Jacek Laskowski wrote:
> Hi,
>
> I've g
Glad to help, Jacek.
I'm happy you're doing a similar thing, which means it could be pretty useful
for others as well. It looks like it might be good enough to contribute the
state source and sink. I'll sort out my code and submit a PR.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Thu, Jun 27,
which enables reading and writing state in structured streaming,
achieving rescaling and schema evolution.
https://github.com/HeartSaVioR/spark-state-tools
(DISCLAIMER: I'm an author of this tool.)
Thanks,
Jungtaek Lim (HeartSaVioR)
On Thu, Jun 27, 2019 at 4:48 AM Rong, Jialei
wrote:
> Thank
Nice finding!
Given you already pointed out a previous issue which fixed a similar problem,
it should also be easy for you to craft a patch and verify whether the fix
resolves your issue. Looking forward to seeing your patch.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Wed, Jun 12, 2019 at 8:23 PM Gerard
puts of the
previous stateful operator become inputs of the next stateful operator,
they should have different watermarks), and one of the contributors proposed
an approach [1] which would fit Spark (unfortunately it hasn't been
reviewed by committers for a long time).
Thanks,
Jungtaek Lim (HeartSaVioR)
have a match
> - inner join with 1 left outer followed by aggregation only produces the
> messages with a match
>
> Of course, all are stream-stream joins
>
> CU, Joe
>
> On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim
> wrote:
> > I would suspect that rows are
know once you're dealing with
streaming-streaming join.
Thanks,
Jungtaek Lim (HeartSaVioR)
1. https://issues.apache.org/jira/browse/SPARK-26154
On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann wrote:
> Hi all
>
> sorry, tl;dr
>
> I'm on my first Python Spark structured str
pp.
I'd be happy to hear new ideas for improvements, and contributions are much
appreciated!
Enjoy!
Thanks,
Jungtaek Lim (HeartSaVioR)
sure it's feasible.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Mon, Mar 18, 2019 at 4:03 PM, Paolo Platter wrote:
> I can understand that if you involve columns with variable distribution in
> join operations, it may change your execution plan, but most of the time
> this is not going to happe
The query makes the state grow infinitely. Could you consider applying a
watermark to "receivedAt" to let the unnecessary part of the state be cleared
out? Other than a watermark, you could implement TTL-based eviction via
flatMapGroupsWithState, though you'll need to implement your own custom
"dropDuplicates".
2019년 3월 1
t wait and load information when one of them becomes the master. That would
require quite a lot of changes, though.
Hope this helps.
Thanks,
Jungtaek Lim (HeartSaVioR)
1. https://issues.apache.org/jira/browse/SPARK-15544
2. https://issues.apache.org/jira/browse/SPARK-23530
On Tue, Mar 5, 2019 at 10:02 PM, lokeshkumar
e.org/jira/browse/SPARK-26350
2. https://issues.apache.org/jira/browse/SPARK-26121
Thanks,
Jungtaek Lim (HeartSaVioR)
On Wed, Feb 13, 2019 at 6:36 PM, Gabor Somogyi wrote:
> Hi Thomas,
>
> The issue occurs when the user does not have the READ permission on the
> consumer groups.
>
>
ng Java 11.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Thu, Feb 7, 2019 at 9:18 PM, Gabor Somogyi wrote:
> Hi Hande,
>
> "Unsupported class file major version 55" means java incompatibility.
> This error means you're trying to load a Java "class" file that was
> compiled wi
+ INTERVAL 1 HOUR),
2.
Join on event-time windows (e.g. ...JOIN ON leftTimeWindow =
rightTimeWindow).
So yes, the join condition should directly deal with the timestamp column,
otherwise the state will grow infinitely.
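A minimal sketch of the first option (a time-range join condition), using the
ad impression/click example and column names from the Structured Streaming
guide; both inputs are assumed to be streaming DataFrames:

import org.apache.spark.sql.functions.expr

val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// The time-range condition bounds how long state must be kept for each side.
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """)
)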
Thanks,
Jungtaek Lim (HeartSaVioR)
On Tue, Dec 11, 2018 at 2:52 PM
"s"
and extra calls of "get_json_object".
- Jungtaek Lim (HeartSaVioR)
On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj wrote:
> Hello All,
>
> I am using Spark version 2.3 and I am trying to write a Spark Streaming
> join. It is a basic join and it is taking more time to join t
ke a look at it.
Btw, it might also help on batch queries; actually it sounds even more
helpful for batch queries.
-Jungtaek Lim (HeartSaVioR)
On Sun, Nov 18, 2018 at 9:53 AM, puneetloya wrote:
> I would like to request a feature for reading data from Kafka Source based
> on
> a timestamp. So that if
Could you explain what you're trying to do? There should be no batch when
there is no data in the stream, so it would end up as a no-op even if it were
possible.
- Jungtaek Lim (HeartSaVioR)
On Tue, Nov 6, 2018 at 8:29 AM, Arun Manivannan wrote:
> Hi,
>
> I would like to create a "zero" value fo
underlying
architecture.
-Jungtaek Lim
On Mon, Oct 29, 2018 at 12:06 AM, Adrienne Kole wrote:
> Thanks for bringing this issue to the mailing list.
> As an addition, I would also ask the same questions about DStreams and
> Structured Streaming APIs.
> Structured Streaming is high level and it make
119049
If we move the phase of filtering out late records, what you would like to
do may become the default behavior. This also means the output may change
for queries which use non-stateful operations as well, so it is not a
trivial change and may require consensus, such as the SPIP process.
Thanks,
Ju
Hi users, I'm Jungtaek Lim, one of the contributors on the streaming part.
Recently I proposed a new feature: native support of session windows [1].
While it also tackles the edge case map/flatMapGroupsWithState doesn't cover
for session windows, its major benefit is mostly better usability
: string ... 2
more fields]
Maybe we need to know the actual types of key, ticker, timeissued, and price
from your variables.
Jungtaek Lim (HeartSaVioR)
On Thu, Sep 6, 2018 at 5:57 PM, Mich Talebzadeh wrote:
> I am trying to understand why Spark cannot convert simple comma-separated
> columns into a DF.
-of-org-apache/m-p/29994/highlight/true#M973
And which Spark version do you use?
On Wed, Sep 5, 2018 at 5:32 PM, Jungtaek Lim wrote:
> Sorry I guess I pasted another method. the code is...
>
> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]):
> DatasetHolder[T] = {
>
Sorry, I guess I pasted another method. The code is:

implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(s))
}
On Wed, Sep 5, 2018 at 5:30 PM, Jungtaek Lim wrote:
> I guess you need to have encoder for the type of result
[T] = {
  DatasetHolder(_sqlContext.createDataset(rdd))
}
You can see lots of Encoder implementations in the Scala code. If your type
doesn't match any of them it may not work, and you'll need to provide a custom
Encoder.
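For instance, a minimal sketch of falling back to a Kryo-based encoder for a
class that has no built-in Encoder (MyCustomClass and the sample values are
hypothetical):

import org.apache.spark.sql.{Encoder, Encoders}

// Not a case class or Java bean, so no built-in Encoder exists for it.
class MyCustomClass(val name: String, val value: Long)

// Kryo stores the object as a single binary column: you lose the columnar
// schema, but you can still build a Dataset of this type.
implicit val myCustomEncoder: Encoder[MyCustomClass] = Encoders.kryo[MyCustomClass]

// "spark" is an existing SparkSession.
val ds = spark.createDataset(Seq(new MyCustomClass("a", 1L)))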
-Jungtaek Lim (HeartSaVioR)
On Wed, Sep 5, 2018 at 5:24 PM, Mich Talebzadeh wrote:
>
You may need to import the implicits from your SparkSession like below
(the code below is borrowed from
https://spark.apache.org/docs/latest/sql-programming-guide.html):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
$"mod", $"word")
.agg(max("value").as("max_value"), min("value").as("min_value"),
avg("value").as("avg_value"))
.coalesce(8)
val query = outDf.writeStream
.format("memory")
.optio
ss tasks.
We just can't apply coalesce to an individual operator in a narrow dependency.
-Jungtaek Lim (HeartSaVioR)
On Thu, Aug 9, 2018 at 3:07 PM, Koert Kuipers wrote:
> well an interesting side effect of this is that i can now control the
> number of partitions for every shuffle in a dataframe
". Not sure there's
any available trick to achieve it without calling repartition.
Thanks,
Jungtaek Lim (HeartSaVioR)
1.
https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937
On Thu, Aug 9, 2018 at 5:55 AM
Girish,
I think reading through the implementation of HDFSBackedStateStoreProvider, as
well as the relevant traits, should give you an idea of how to implement a
custom one. HDFSBackedStateStoreProvider is not that complicated to read
and understand. You just need to deal with your underlying storage engine
Could you please describe the version of Spark and how you ran your
app? If you don't mind sharing a minimal app which can reproduce this, it
would be really great.
- Jungtaek Lim (HeartSaVioR)
On Mon, 2 Jul 2018 at 7:56 PM kant kodali wrote:
> Hi All,
>
> I get the below error qu
and provide late input rows based on
this.
So I think this is valuable to address, and I'm planning to try to address
it, but it would be OK for someone to address it earlier.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Tue, Jul 3, 2018 at 3:39 AM, subramgr wrote:
> Hi all,
>
> Do we have som
6541967934
1529640103,1.0606060606060606
1529640113,0.9997000899730081
Could you add a streaming query listener and check the values of sources ->
numInputRows, inputRowsPerSecond, and processedRowsPerSecond? They should
provide some valid numbers.
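A minimal sketch of such a listener ("spark" is an existing SparkSession; the
println target is arbitrary):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Prints throughput numbers after every micro-batch.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batch=${p.batchId} numInputRows=${p.numInputRows} " +
      s"inputRowsPerSecond=${p.inputRowsPerSecond} " +
      s"processedRowsPerSecond=${p.processedRowsPerSecond}")
  }
})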
Thanks,
Jungtaek Lim (HeartSaVioR)
On Fri, Jun 22, 2018
It is not possible because the cardinality of the partitioning key is
non-deterministic, while the partition count should be fixed. There's a chance
that the cardinality exceeds the partition count, in which case the system
can't ensure the requirement.
Thanks,
Jungtaek Lim (HeartSaVioR)
On Jun 22, 2018
The issue looks to have been fixed in
https://issues.apache.org/jira/browse/SPARK-23670, and 2.3.1 will likely
include the fix.
-Jungtaek Lim (HeartSaVioR)
On Wed, May 23, 2018 at 7:12 PM, weand wrote:
> Thanks for the clarification. So it really seems to be a Spark UI OOM issue.
>
> After setting:
>
1. Could you share your Spark version?
2. Could you reduce "spark.sql.ui.retainedExecutions" and see whether it helps
(a sketch follows below)? This configuration is available in 2.3.0, and its
default value is 1000.
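For example, a sketch of lowering it when building the session (the value 100
and the app name are arbitrary):

import org.apache.spark.sql.SparkSession

// Fewer retained SQL executions means less memory held by the driver for the UI.
val spark = SparkSession.builder()
  .appName("my-app")
  .config("spark.sql.ui.retainedExecutions", "100")
  .getOrCreate()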
Thanks,
Jungtaek Lim (HeartSaVioR)
On Tue, May 22, 2018 at 4:29 PM, weand wrote:
> You can see it e
Another thing you may want to be aware of: if the operation is not idempotent,
your query result is also not idempotent. For fault tolerance, there's a
chance for a record (row) to be replayed (recomputed).
-Jungtaek Lim (HeartSaVioR)
On Tue, Apr 24, 2018 at 2:07 PM, Jörn Franke wrote:
> What is
")
  .agg(max(struct($"AMOUNT", $"*")).as("data"))
  .select($"data.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()
It still has a minor
columnName: String)org.apache.spark.sql.Column
(e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
cannot be applied to (org.apache.spark.sql.ColumnName,
org.apache.spark.sql.Column)
Could you check your code to see whether it works with Spark 2.3 (via
spark-shell or whatever)?
Thanks!
Ju
P")
.groupBy($"ID")
.agg(maxrow(col("AMOUNT"), col("MY_TIMESTAMP")).as("maxrow"))
.selectExpr("ID", "maxrow.st.AMOUNT", "maxrow.st.MY_TIMESTAMP")
.writeStream
.format("console")
.trigger(Trigger.ProcessingTim
even
putting all five records together into the socket (by nc), two micro-batches
handled the records and produced two results.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+------------+
| ID|AMOUNT|MY_TIMESTAMP|
+---+------+------------+
regation requires
Update/Complete mode, but join requires Append mode.
(The Structured Streaming guide page clearly explains this limitation:
"Cannot use streaming aggregation before joins.")
If you can achieve it with mapGroupsWithState, you may want to stick with that
(a sketch follows below).
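A minimal sketch of that direction, with hypothetical Event/MaxState types and
a running per-key maximum as the state (spark.implicits._ is assumed to be in
scope, and the streaming query would run in Update output mode):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(id: String, amount: Double)
case class MaxState(maxAmount: Double)

// events is assumed to be a streaming Dataset[Event].
val maxPerKey = events
  .groupByKey(_.id)
  .mapGroupsWithState[MaxState, (String, Double)](GroupStateTimeout.NoTimeout) {
    (id: String, rows: Iterator[Event], state: GroupState[MaxState]) =>
      val previous = state.getOption.map(_.maxAmount).getOrElse(Double.MinValue)
      val newMax = (rows.map(_.amount) ++ Iterator(previous)).max
      state.update(MaxState(newMax))
      (id, newMax)
  }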
Btw, when you deal with