Re: Bugs with joins and SQL in Structured Streaming

2024-09-30 Thread Jungtaek Lim
I figured out the issue that breaks the second test in SqlSyntaxTest.
Unfortunately, this is also a correctness issue.

Issue and the fix for OuterJoinTest:
https://issues.apache.org/jira/browse/SPARK-49829
Issue and the fix for SqlSyntaxTest:
https://issues.apache.org/jira/browse/SPARK-49836

Thanks again for reporting. I wish I hadn't missed this in Feb...


On Mon, Sep 30, 2024 at 7:13 AM Jungtaek Lim 
wrote:

> I just quickly looked into SqlSyntaxTest - the first broken test looks to
> be fixed via SPARK-46062
> <https://issues.apache.org/jira/browse/SPARK-46062>, which was released in
> Spark 3.5.1. The second broken test is a valid issue, and I don't yet know
> why it is happening. I'll file a JIRA ticket, and I (or folks on my team)
> will try to look into it. I'd be happy if a volunteer wants to look into
> this issue as well.
>
> On Sun, Sep 29, 2024 at 10:15 AM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Sorry, I totally missed this email. It sat forgotten for 6 months, but I'm
>> happy that we have smart users reporting such complex edge-case issues!
>>
>> I haven't had time to validate all of them, but OuterJoinTest is indeed a
>> valid correctness issue. Thanks for reporting it to us! I figured out the
>> root cause and have a fix ready; I will submit it soon.
>>
>> I also quickly looked into IntervalJoinTest, but its behavior looks to be
>> due to how SS works.
>>
>> In the second time interval join, you may expect that the lower bound of
>> et1 = et3 - 5 mins, and that the WM for et3 isn't delayed by the first
>> time interval join, hence the lower bound of et1 should be min(WM for et2
>> - 3 mins, WM for et3 - 5 mins).
>>
>> But in SS, we have simplified the watermark model - the input watermark is
>> calculated at the "operator" level. (Also, we still calculate a global
>> watermark across all watermark definitions and apply the same value to
>> each of them.) So, in the second time interval join, the WM for et3 is
>> also considered to be delayed by the first time interval join, because the
>> input watermark is the "min" of all output watermarks from upstream, even
>> though et3 does not participate in the first time interval join. As a
>> result, the lower bound of et1 = et3 - 5 mins ~ et3, which is, lower bound
>> of et1 = (wm - 3 mins) - 5 mins ~ (wm - 3 mins) = wm - 8 mins ~ wm - 3
>> mins. That's why moving the watermark to window.end + 5 mins does not
>> produce the output and fails the test.
>>
>> Please let me know if this does not make sense to you and we can discuss
>> more.
>>
>> I haven't had time to look into SqlSyntaxTest - we don't have enough
>> tests on the interop between DataFrame <-> SQL for streaming queries, so
>> we might have a non-trivial number of unknowns. I (or folks on my team)
>> will take a look sooner rather than later.
>>
>> Thanks again for the valuable report!
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>>
>>
>> On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera 
>> wrote:
>>
>>> Hi,
>>>
>>> Do you think there is any chance for this issue to get resolved? Should
>>> I create another bug report? As mentioned in my message, there is one open
>>> already: https://issues.apache.org/jira/browse/SPARK-45637 but it
>>> covers only one of the problems.
>>>
>>> Andrzej
>>>
>>> On Tue, Feb 27, 2024 at 09:58 Andrzej Zera 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, I tested all of them on spark 3.5.
>>>>
>>>> Regards,
>>>> Andrzej
>>>>
>>>>
>>>> On Mon, Feb 26, 2024 at 23:24 Mich Talebzadeh 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> These are all on spark 3.5, correct?
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Wernher Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".

Re: Bugs with joins and SQL in Structured Streaming

2024-09-29 Thread Jungtaek Lim
I just quickly looked into SqlSyntaxTest - the first broken test looks to
be fixed via SPARK-46062 <https://issues.apache.org/jira/browse/SPARK-46062>,
which was released in Spark 3.5.1. The second broken test is a valid issue,
and I don't yet know why it is happening. I'll file a JIRA ticket, and I (or
folks on my team) will try to look into it. I'd be happy if a volunteer
wants to look into this issue as well.

On Sun, Sep 29, 2024 at 10:15 AM Jungtaek Lim 
wrote:

> Sorry, I totally missed this email. It sat forgotten for 6 months, but I'm
> happy that we have smart users reporting such complex edge-case issues!
>
> I haven't had time to validate all of them, but OuterJoinTest is indeed a
> valid correctness issue. Thanks for reporting it to us! I figured out the
> root cause and have a fix ready; I will submit it soon.
>
> I also quickly looked into IntervalJoinTest, but its behavior looks to be
> due to how SS works.
>
> In the second time interval join, you may expect that the lower bound of
> et1 = et3 - 5 mins, and that the WM for et3 isn't delayed by the first time
> interval join, hence the lower bound of et1 should be min(WM for et2 - 3
> mins, WM for et3 - 5 mins).
>
> But in SS, we have simplified the watermark model - the input watermark is
> calculated at the "operator" level. (Also, we still calculate a global
> watermark across all watermark definitions and apply the same value to each
> of them.) So, in the second time interval join, the WM for et3 is also
> considered to be delayed by the first time interval join, because the input
> watermark is the "min" of all output watermarks from upstream, even though
> et3 does not participate in the first time interval join. As a result, the
> lower bound of et1 = et3 - 5 mins ~ et3, which is, lower bound of et1 =
> (wm - 3 mins) - 5 mins ~ (wm - 3 mins) = wm - 8 mins ~ wm - 3 mins. That's
> why moving the watermark to window.end + 5 mins does not produce the output
> and fails the test.
>
> Please let me know if this does not make sense to you and we can discuss
> more.
>
> I haven't had time to look into SqlSyntaxTest - we don't have enough tests
> on the interop between DataFrame <-> SQL for streaming queries, so we might
> have a non-trivial number of unknowns. I (or folks on my team) will take a
> look sooner rather than later.
>
> Thanks again for the valuable report!
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
>
> On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera 
> wrote:
>
>> Hi,
>>
>> Do you think there is any chance for this issue to get resolved? Should I
>> create another bug report? As mentioned in my message, there is one open
>> already: https://issues.apache.org/jira/browse/SPARK-45637 but it covers
>> only one of the problems.
>>
>> Andrzej
>>
>> On Tue, Feb 27, 2024 at 09:58 Andrzej Zera  wrote:
>>
>>> Hi,
>>>
>>> Yes, I tested all of them on spark 3.5.
>>>
>>> Regards,
>>> Andrzej
>>>
>>>
>>> On Mon, Feb 26, 2024 at 23:24 Mich Talebzadeh 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> These are all on spark 3.5, correct?
>>>>
>>>> Mich Talebzadeh,
>>>> Dad | Technologist | Solutions Architect | Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my
>>>> knowledge but of course cannot be guaranteed. It is essential to note
>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>> expert opinions (Wernher Von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>
>>>>
>>>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera 
>>>> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> I've been using Structured Streaming in production for almost a year
>>>>> already and I want to share the bugs I found during this time. I created
>>>>> a test for each of the issues and put them all here:
>>>>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>>>
>>>>> I split the issues into three groups: outer joins on event time,
>>>>> interval joins and Spark SQL.

Re: Bugs with joins and SQL in Structured Streaming

2024-09-28 Thread Jungtaek Lim
Sorry, I totally missed this email. It sat forgotten for 6 months, but I'm
happy that we have smart users reporting such complex edge-case issues!

I haven't had time to validate all of them, but OuterJoinTest is indeed a
valid correctness issue. Thanks for reporting it to us! I figured out the
root cause and have a fix ready; I will submit it soon.

I also quickly looked into IntervalJoinTest, but its behavior looks to be
due to how SS works.

In the second time interval join, you may expect that the lower bound of et1
= et3 - 5 mins, and that the WM for et3 isn't delayed by the first time
interval join, hence the lower bound of et1 should be min(WM for et2 - 3
mins, WM for et3 - 5 mins).

But in SS, we have simplified the watermark model - the input watermark is
calculated at the "operator" level. (Also, we still calculate a global
watermark across all watermark definitions and apply the same value to each
of them.) So, in the second time interval join, the WM for et3 is also
considered to be delayed by the first time interval join, because the input
watermark is the "min" of all output watermarks from upstream, even though
et3 does not participate in the first time interval join. As a result, the
lower bound of et1 = et3 - 5 mins ~ et3, which is, lower bound of et1 =
(wm - 3 mins) - 5 mins ~ (wm - 3 mins) = wm - 8 mins ~ wm - 3 mins. That's
why moving the watermark to window.end + 5 mins does not produce the output
and fails the test.

Please let me know if this does not make sense to you and we can discuss
more.

I haven't had time to look into SqlSyntaxTest - we don't have enough tests
on the interop between DataFrame <-> SQL for streaming queries, so we might
have a non-trivial number of unknowns. I (or folks on my team) will take a
look sooner rather than later.

Thanks again for the valuable report!

Thanks,
Jungtaek Lim (HeartSaVioR)



On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera  wrote:

> Hi,
>
> Do you think there is any chance for this issue to get resolved? Should I
> create another bug report? As mentioned in my message, there is one open
> already: https://issues.apache.org/jira/browse/SPARK-45637 but it covers
> only one of the problems.
>
> Andrzej
>
> On Tue, Feb 27, 2024 at 09:58 Andrzej Zera  wrote:
>
>> Hi,
>>
>> Yes, I tested all of them on spark 3.5.
>>
>> Regards,
>> Andrzej
>>
>>
>> On Mon, Feb 26, 2024 at 23:24 Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> These are all on spark 3.5, correct?
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Wernher Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera 
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I've been using Structured Streaming in production for almost a year
>>>> already and I want to share the bugs I found during this time. I created
>>>> a test for each of the issues and put them all here:
>>>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>>
>>>> I split the issues into three groups: outer joins on event time,
>>>> interval joins and Spark SQL.
>>>>
>>>> Issues related to outer joins:
>>>>
>>>>- When joining three or more input streams on event time, if two or
>>>>more streams don't contain an event for a join key (which is event
>>>>time), no row will be output even if other streams contain an event
>>>>for this join key. Tests that check for this:
>>>>
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>>>>and
>>>>
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>>>- When joining aggregated stream with raw events

Re: Issue with Materialized Views in Spark SQL

2024-05-02 Thread Jungtaek Lim
(removing dev@ as I don't think this is a dev@-related thread, but more of
a "question")

My understanding is that Apache Spark does not support Materialized Views,
full stop. IMHO it's not reasonable to expect that every operation in
Apache Hive will be supported in Apache Spark. They are different projects,
and Apache Spark does not aim to be 100% compatible with Apache Hive. There
was a time when the community tried to provide some sort of compatibility,
but both projects are 10+ years old and mature enough to have their own
roadmaps to drive.

In other words, this is not a bug or an issue. You can file a feature
request and ask the community to include it in the roadmap.

On Fri, May 3, 2024 at 12:01 PM Mich Talebzadeh 
wrote:

> Here is an issue I encountered while working with Materialized Views in
> Spark SQL. It appears that there is an inconsistency between the behavior
> of Materialized Views in Spark SQL and in Hive.
>
> When attempting to execute a statement like DROP MATERIALIZED VIEW IF
> EXISTS test.mv in Spark SQL, I encountered a syntax error indicating that
> the keyword MATERIALIZED is not recognized. However, the same statement
> executes successfully in Hive without any errors.
>
> pyspark.errors.exceptions.captured.ParseException:
> [PARSE_SYNTAX_ERROR] Syntax error at or near 'MATERIALIZED'.(line 1, pos 5)
>
> == SQL ==
> DROP MATERIALIZED VIEW IF EXISTS test.mv
> -^^^
>
> Here are the versions I am using:
>
> *Hive: 3.1.1*
> *Spark: 3.4*
>
> My Spark session:
>
> spark = SparkSession.builder \
>   .appName("test") \
>   .enableHiveSupport() \
>   .getOrCreate()
>
> Has anyone seen this behaviour or encountered a similar issue? Are there
> any insights into why this discrepancy exists between Spark SQL and
> Hive?
>
> Thanks
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von Braun)".
>


Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-03-05 Thread Jungtaek Lim
Yeah, the approach seems OK to me - please double-check that the doc
generation in the Spark repo won't fail after the move of the js file. Other
than that, it would probably just be a matter of updating the release
process.

On Tue, Mar 5, 2024 at 7:24 PM Pan,Bingkun  wrote:

> Okay, I see.
>
> Perhaps we can solve this confusion by sharing the same `version.json`
> file across `all versions` in the `Spark website repo`, so that each
> version of the docs displays the `same` data in the dropdown menu.
> ------
> *From:* Jungtaek Lim 
> *Sent:* March 5, 2024 17:09:07
> *To:* Pan,Bingkun
> *Cc:* Dongjoon Hyun; dev; user
> *Subject:* Re: [ANNOUNCE] Apache Spark 3.5.1 released
>
> Let me be more specific.
>
> We have two active release version lines, 3.4.x and 3.5.x. We just
> released Spark 3.5.1, whose dropdown lists 3.5.1 and 3.4.2, given that the
> latest version of 3.4.x is 3.4.2. Say that after a month we release Spark
> 3.4.3. In the dropdown of Spark 3.4.3, there will be 3.5.1 and 3.4.3. But
> if we stop there, 3.5.1 (still the latest) won't show 3.4.3 in its
> dropdown, giving the impression that 3.4.3 was never released.
>
> This is just about two active release version lines, keeping only the
> latest version of each line. If you expand this to EOLed version lines
> and to versions which aren't the latest in their line, the problem gets
> much more complicated.
>
> On Tue, Mar 5, 2024 at 6:01 PM Pan,Bingkun  wrote:
>
>> Based on my understanding, we should not update versions that have
>> already been released -
>>
>> such as the situation you mentioned: `But what about the dropdown of
>> version D? Should we add E in the dropdown?` We only need to record the
>> latest `version.json` file that has already been published at the time of
>> each new document release.
>>
>> Of course, if we need to keep every document up to date with the latest
>> versions, I think that's also possible -
>>
>> but only by sharing the same version.json file across every version.
>> --
>> *From:* Jungtaek Lim 
>> *Sent:* March 5, 2024 16:47:30
>> *To:* Pan,Bingkun
>> *Cc:* Dongjoon Hyun; dev; user
>> *Subject:* Re: [ANNOUNCE] Apache Spark 3.5.1 released
>>
>> But this does not answer my question about updating the dropdown in the
>> docs of "already released versions", right?
>>
>> Let's say we just released version D, and the dropdown has versions A, B,
>> C. We have another release tomorrow as version E, and it's probably easy
>> to add A, B, C, D to the dropdown of E. But what about the dropdown of
>> version D? Should we add E to its dropdown? And how do we maintain this
>> once we have 10 more releases afterwards?
>>
>> On Tue, Mar 5, 2024 at 5:27 PM Pan,Bingkun  wrote:
>>
>>> According to my understanding, the original intention of this feature is
>>> that when a user lands on the PySpark docs and finds that the version
>>> they are currently viewing is not the one they want, they can easily jump
>>> to the right version by clicking the drop-down box. Additionally, the PR
>>> that would have automated updating the version list was not merged:
>>>
>>> https://github.com/apache/spark/pull/42881
>>>
>>> So, we need to update this file manually. I can submit an update first
>>> to get this feature working.
>>> --
>>> *From:* Jungtaek Lim 
>>> *Sent:* March 4, 2024 6:34:42
>>> *To:* Dongjoon Hyun
>>> *Cc:* dev; user
>>> *Subject:* Re: [ANNOUNCE] Apache Spark 3.5.1 released
>>>
>>> Shall we revisit this functionality? The API doc is built per individual
>>> version, and for each individual version we would depend on other
>>> released versions. That does not seem right to me. Also, the
>>> functionality is only in the PySpark API doc, which does not seem
>>> consistent either.
>>>
>>> I don't think this is manageable with the current approach (listing
>>> versions in version-dependent docs). Let's say we release 3.4.3 after
>>> 3.5.1. Should we update the versions in 3.5.1 to add 3.4.3 to the version
>>> switcher? What about the point where we are going to release a new
>>> version after 10 more releases? What are the criteria for pruning
>>> versions?
>>>
>>> Unless we have a good answer to these questions, I think it's better to
>>> revert the functionality - it missed various considerations.

Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-03-05 Thread Jungtaek Lim
Let me be more specific.

We have two active release version lines, 3.4.x and 3.5.x. We just released
Spark 3.5.1, whose dropdown lists 3.5.1 and 3.4.2, given that the latest
version of 3.4.x is 3.4.2. Say that after a month we release Spark 3.4.3. In
the dropdown of Spark 3.4.3, there will be 3.5.1 and 3.4.3. But if we stop
there, 3.5.1 (still the latest) won't show 3.4.3 in its dropdown, giving the
impression that 3.4.3 was never released.

This is just about two active release version lines, keeping only the
latest version of each line. If you expand this to EOLed version lines and
to versions which aren't the latest in their line, the problem gets much
more complicated.

On Tue, Mar 5, 2024 at 6:01 PM Pan,Bingkun  wrote:

> Based on my understanding, we should not update versions that have already
> been released -
>
> such as the situation you mentioned: `But what about the dropdown of
> version D? Should we add E in the dropdown?` We only need to record the
> latest `version.json` file that has already been published at the time of
> each new document release.
>
> Of course, if we need to keep every document up to date with the latest
> versions, I think that's also possible -
>
> but only by sharing the same version.json file across every version.
> --
> *From:* Jungtaek Lim 
> *Sent:* March 5, 2024 16:47:30
> *To:* Pan,Bingkun
> *Cc:* Dongjoon Hyun; dev; user
> *Subject:* Re: [ANNOUNCE] Apache Spark 3.5.1 released
>
> But this does not answer my question about updating the dropdown in the
> docs of "already released versions", right?
>
> Let's say we just released version D, and the dropdown has versions A, B,
> C. We have another release tomorrow as version E, and it's probably easy to
> add A, B, C, D to the dropdown of E. But what about the dropdown of version
> D? Should we add E to its dropdown? And how do we maintain this once we
> have 10 more releases afterwards?
>
> On Tue, Mar 5, 2024 at 5:27 PM Pan,Bingkun  wrote:
>
>> According to my understanding, the original intention of this feature is
>> that when a user lands on the PySpark docs and finds that the version they
>> are currently viewing is not the one they want, they can easily jump to
>> the right version by clicking the drop-down box. Additionally, the PR that
>> would have automated updating the version list was not merged:
>>
>> https://github.com/apache/spark/pull/42881
>>
>> So, we need to update this file manually. I can submit an update first to
>> get this feature working.
>> --
>> *From:* Jungtaek Lim 
>> *Sent:* March 4, 2024 6:34:42
>> *To:* Dongjoon Hyun
>> *Cc:* dev; user
>> *Subject:* Re: [ANNOUNCE] Apache Spark 3.5.1 released
>>
>> Shall we revisit this functionality? The API doc is built per individual
>> version, and for each individual version we would depend on other released
>> versions. That does not seem right to me. Also, the functionality is only
>> in the PySpark API doc, which does not seem consistent either.
>>
>> I don't think this is manageable with the current approach (listing
>> versions in version-dependent docs). Let's say we release 3.4.3 after
>> 3.5.1. Should we update the versions in 3.5.1 to add 3.4.3 to the version
>> switcher? What about the point where we are going to release a new version
>> after 10 more releases? What are the criteria for pruning versions?
>>
>> Unless we have a good answer to these questions, I think it's better to
>> revert the functionality - it missed various considerations.
>>
>> On Fri, Mar 1, 2024 at 2:44 PM Jungtaek Lim 
>> wrote:
>>
>>> Thanks for reporting - this is odd - the dropdown did not exist in other
>>> recent releases.
>>>
>>> https://spark.apache.org/docs/3.5.0/api/python/index.html
>>> https://spark.apache.org/docs/3.4.2/api/python/index.html
>>> https://spark.apache.org/docs/3.3.4/api/python/index.html
>>>
>>> Looks like the dropdown feature was recently introduced but only
>>> partially finished. The dropdown itself was added, but the way to bump
>>> the version was never documented.

Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-03-05 Thread Jungtaek Lim
But this does not answer my question about updating the dropdown in the
docs of "already released versions", right?

Let's say we just released version D, and the dropdown has versions A, B, C.
We have another release tomorrow as version E, and it's probably easy to
add A, B, C, D to the dropdown of E. But what about the dropdown of version
D? Should we add E to its dropdown? And how do we maintain this once we have
10 more releases afterwards?

On Tue, Mar 5, 2024 at 5:27 PM Pan,Bingkun  wrote:

> According to my understanding, the original intention of this feature is
> that when a user lands on the PySpark docs and finds that the version they
> are currently viewing is not the one they want, they can easily jump to
> the right version by clicking the drop-down box. Additionally, the PR that
> would have automated updating the version list was not merged:
>
> https://github.com/apache/spark/pull/42881
>
> So, we need to update this file manually. I can submit an update first to
> get this feature working.
> --
> *From:* Jungtaek Lim 
> *Sent:* March 4, 2024 6:34:42
> *To:* Dongjoon Hyun
> *Cc:* dev; user
> *Subject:* Re: [ANNOUNCE] Apache Spark 3.5.1 released
>
> Shall we revisit this functionality? The API doc is built per individual
> version, and for each individual version we would depend on other released
> versions. That does not seem right to me. Also, the functionality is only
> in the PySpark API doc, which does not seem consistent either.
>
> I don't think this is manageable with the current approach (listing
> versions in version-dependent docs). Let's say we release 3.4.3 after
> 3.5.1. Should we update the versions in 3.5.1 to add 3.4.3 to the version
> switcher? What about the point where we are going to release a new version
> after 10 more releases? What are the criteria for pruning versions?
>
> Unless we have a good answer to these questions, I think it's better to
> revert the functionality - it missed various considerations.
>
> On Fri, Mar 1, 2024 at 2:44 PM Jungtaek Lim 
> wrote:
>
>> Thanks for reporting - this is odd - the dropdown did not exist in other
>> recent releases.
>>
>> https://spark.apache.org/docs/3.5.0/api/python/index.html
>> https://spark.apache.org/docs/3.4.2/api/python/index.html
>> https://spark.apache.org/docs/3.3.4/api/python/index.html
>>
>> Looks like the dropdown feature was recently introduced but only
>> partially finished. The dropdown itself was added, but the way to bump the
>> version was never documented.
>> A contributor proposed a way to update the version "automatically", but
>> the PR wasn't merged. As a result, we have neither instructions for
>> bumping the version manually nor an automatic bump.
>>
>> * PR for addition of dropdown: https://github.com/apache/spark/pull/42428
>> * PR for automatically bumping version:
>> https://github.com/apache/spark/pull/42881
>>
>> We will probably need to add an instruction to the release process to
>> update the version. (I don't have a good idea for automatic bumping.)
>> I'll look into it. Please expect some delay during the holiday weekend
>> in S. Korea.
>>
>> Thanks again.
>> Jungtaek Lim (HeartSaVioR)
>>
>>
>> On Fri, Mar 1, 2024 at 2:14 PM Dongjoon Hyun 
>> wrote:
>>
>>> BTW, Jungtaek.
>>>
>>> PySpark document seems to show a wrong branch. At this time, `master`.
>>>
>>> https://spark.apache.org/docs/3.5.1/api/python/index.html
>>>
>>> PySpark Overview
>>>
>>>Date: Feb 24, 2024 Version: master
>>>

Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-03-04 Thread Jungtaek Lim
Yes, it's relevant to that PR. I wonder whether, if we want to expose a
version switcher, it should live in the versionless docs (spark-website)
rather than in docs pinned to a specific version.

On Tue, Mar 5, 2024 at 11:18 AM Hyukjin Kwon  wrote:

> Is this related to https://github.com/apache/spark/pull/42428?
>
> cc @Yang,Jie(INF) 
>
> On Mon, 4 Mar 2024 at 22:21, Jungtaek Lim 
> wrote:
>
>> Shall we revisit this functionality? The API doc is built per individual
>> version, and for each individual version we would depend on other released
>> versions. That does not seem right to me. Also, the functionality is only
>> in the PySpark API doc, which does not seem consistent either.
>>
>> I don't think this is manageable with the current approach (listing
>> versions in version-dependent docs). Let's say we release 3.4.3 after
>> 3.5.1. Should we update the versions in 3.5.1 to add 3.4.3 to the version
>> switcher? What about the point where we are going to release a new version
>> after 10 more releases? What are the criteria for pruning versions?
>>
>> Unless we have a good answer to these questions, I think it's better to
>> revert the functionality - it missed various considerations.
>>
>> On Fri, Mar 1, 2024 at 2:44 PM Jungtaek Lim 
>> wrote:
>>
>>> Thanks for reporting - this is odd - the dropdown did not exist in other
>>> recent releases.
>>>
>>> https://spark.apache.org/docs/3.5.0/api/python/index.html
>>> https://spark.apache.org/docs/3.4.2/api/python/index.html
>>> https://spark.apache.org/docs/3.3.4/api/python/index.html
>>>
>>> Looks like the dropdown feature was recently introduced but only
>>> partially finished. The dropdown itself was added, but the way to bump
>>> the version was never documented.
>>> A contributor proposed a way to update the version "automatically", but
>>> the PR wasn't merged. As a result, we have neither instructions for
>>> bumping the version manually nor an automatic bump.
>>>
>>> * PR for addition of dropdown:
>>> https://github.com/apache/spark/pull/42428
>>> * PR for automatically bumping version:
>>> https://github.com/apache/spark/pull/42881
>>>
>>> We will probably need to add an instruction to the release process to
>>> update the version. (I don't have a good idea for automatic bumping.)
>>> I'll look into it. Please expect some delay during the holiday weekend
>>> in S. Korea.
>>>
>>> Thanks again.
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>>
>>> On Fri, Mar 1, 2024 at 2:14 PM Dongjoon Hyun 
>>> wrote:
>>>
>>>> BTW, Jungtaek.
>>>>
>>>> PySpark document seems to show a wrong branch. At this time, `master`.
>>>>
>>>> https://spark.apache.org/docs/3.5.1/api/python/index.html
>>>>
>>>> PySpark Overview
>>>> <https://spark.apache.org/docs/3.5.1/api/python/index.html#pyspark-overview>
>>>>
>>>>Date: Feb 24, 2024 Version: master
>>>>
>>>> [image: Screenshot 2024-02-29 at 21.12.24.png]
>>>>
>>>>
>>>> Could you do the follow-up, please?
>>>>
>>>> Thank you in advance.
>>>>
>>>> Dongjoon.
>>>>
>>>>
>>>> On Thu, Feb 29, 2024 at 2:48 PM John Zhuge  wrote:
>>>>
>>>>> Excellent work, congratulations!
>>>>>
>>>>> On Wed, Feb 28, 2024 at 10:12 PM Dongjoon Hyun <
>>>>> dongjoon.h...@gmail.com> wrote:
>>>>>
>>>>>> Congratulations!
>>>>>>
>>>>>> Bests,
>>>>>> Dongjoon.
>>>>>>
>>>>>> On Wed, Feb 28, 2024 at 11:43 AM beliefer  wrote:
>>>>>>
>>>>>>> Congratulations!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> At 2024-02-28 17:43:25, "Jungtaek Lim" 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> We are happy to announce the availability of Spark 3.5.1!
>>>>>>>
>>>>>>> Spark 3.5.1 is a maintenance release containing stability fixes. This
>>>>>>> release is based on the branch-3.5 maintenance branch of Spark. We
>>>>>>> strongly recommend that all 3.5 users upgrade to this stable release.
>>>>>>>
>>>>>>> To download Spark 3.5.1, head over to the download page:
>>>>>>> https://spark.apache.org/downloads.html
>>>>>>>
>>>>>>> To view the release notes:
>>>>>>> https://spark.apache.org/releases/spark-release-3-5-1.html
>>>>>>>
>>>>>>> We would like to acknowledge all community members for contributing
>>>>>>> to this
>>>>>>> release. This release would not have been possible without you.
>>>>>>>
>>>>>>> Jungtaek Lim
>>>>>>>
>>>>>>> ps. Yikun is helping us through releasing the official docker image
>>>>>>> for Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally
>>>>>>> available.
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> John Zhuge
>>>>>
>>>>


Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-03-03 Thread Jungtaek Lim
Shall we revisit this functionality? The API doc is built per individual
version, and for each individual version we would depend on other released
versions. That does not seem right to me. Also, the functionality is only in
the PySpark API doc, which does not seem consistent either.

I don't think this is manageable with the current approach (listing versions
in version-dependent docs). Let's say we release 3.4.3 after 3.5.1. Should
we update the versions in 3.5.1 to add 3.4.3 to the version switcher? What
about the point where we are going to release a new version after 10 more
releases? What are the criteria for pruning versions?

Unless we have a good answer to these questions, I think it's better to
revert the functionality - it missed various considerations.

On Fri, Mar 1, 2024 at 2:44 PM Jungtaek Lim 
wrote:

> Thanks for reporting - this is odd - the dropdown did not exist in other
> recent releases.
>
> https://spark.apache.org/docs/3.5.0/api/python/index.html
> https://spark.apache.org/docs/3.4.2/api/python/index.html
> https://spark.apache.org/docs/3.3.4/api/python/index.html
>
> Looks like the dropdown feature was recently introduced but only
> partially finished. The dropdown itself was added, but the way to bump the
> version was never documented.
> A contributor proposed a way to update the version "automatically", but
> the PR wasn't merged. As a result, we have neither instructions for
> bumping the version manually nor an automatic bump.
>
> * PR for addition of dropdown: https://github.com/apache/spark/pull/42428
> * PR for automatically bumping version:
> https://github.com/apache/spark/pull/42881
>
> We will probably need to add an instruction to the release process to
> update the version. (I don't have a good idea for automatic bumping.)
> I'll look into it. Please expect some delay during the holiday weekend
> in S. Korea.
>
> Thanks again.
> Jungtaek Lim (HeartSaVioR)
>
>
> On Fri, Mar 1, 2024 at 2:14 PM Dongjoon Hyun 
> wrote:
>
>> BTW, Jungtaek.
>>
>> PySpark document seems to show a wrong branch. At this time, `master`.
>>
>> https://spark.apache.org/docs/3.5.1/api/python/index.html
>>
>> PySpark Overview
>> <https://spark.apache.org/docs/3.5.1/api/python/index.html#pyspark-overview>
>>
>>Date: Feb 24, 2024 Version: master
>>
>> [image: Screenshot 2024-02-29 at 21.12.24.png]
>>
>>
>> Could you do the follow-up, please?
>>
>> Thank you in advance.
>>
>> Dongjoon.
>>
>>
>> On Thu, Feb 29, 2024 at 2:48 PM John Zhuge  wrote:
>>
>>> Excellent work, congratulations!
>>>
>>> On Wed, Feb 28, 2024 at 10:12 PM Dongjoon Hyun 
>>> wrote:
>>>
>>>> Congratulations!
>>>>
>>>> Bests,
>>>> Dongjoon.
>>>>
>>>> On Wed, Feb 28, 2024 at 11:43 AM beliefer  wrote:
>>>>
>>>>> Congratulations!
>>>>>
>>>>>
>>>>>
>>>>> At 2024-02-28 17:43:25, "Jungtaek Lim" 
>>>>> wrote:
>>>>>
>>>>> Hi everyone,
>>>>>
>>>>> We are happy to announce the availability of Spark 3.5.1!
>>>>>
>>>>> Spark 3.5.1 is a maintenance release containing stability fixes. This
>>>>> release is based on the branch-3.5 maintenance branch of Spark. We
>>>>> strongly recommend that all 3.5 users upgrade to this stable release.
>>>>>
>>>>> To download Spark 3.5.1, head over to the download page:
>>>>> https://spark.apache.org/downloads.html
>>>>>
>>>>> To view the release notes:
>>>>> https://spark.apache.org/releases/spark-release-3-5-1.html
>>>>>
>>>>> We would like to acknowledge all community members for contributing to
>>>>> this
>>>>> release. This release would not have been possible without you.
>>>>>
>>>>> Jungtaek Lim
>>>>>
>>>>> ps. Yikun is helping us through releasing the official docker image
>>>>> for Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally
>>>>> available.
>>>>>
>>>>>
>>>
>>> --
>>> John Zhuge
>>>
>>


Re: [ANNOUNCE] Apache Spark 3.5.1 released

2024-02-29 Thread Jungtaek Lim
Thanks for reporting - this is odd - the dropdown did not exist in other
recent releases.

https://spark.apache.org/docs/3.5.0/api/python/index.html
https://spark.apache.org/docs/3.4.2/api/python/index.html
https://spark.apache.org/docs/3.3.4/api/python/index.html

Looks like the dropdown feature was recently introduced but only partially
finished. The dropdown itself was added, but the way to bump the version was
never documented.
A contributor proposed a way to update the version "automatically", but the
PR wasn't merged. As a result, we have neither instructions for bumping the
version manually nor an automatic bump.

* PR for addition of dropdown: https://github.com/apache/spark/pull/42428
* PR for automatically bumping version:
https://github.com/apache/spark/pull/42881

We will probably need to add an instruction to the release process to
update the version. (I don't have a good idea for automatic bumping.)
I'll look into it. Please expect some delay during the holiday weekend
in S. Korea.

Thanks again.
Jungtaek Lim (HeartSaVioR)


On Fri, Mar 1, 2024 at 2:14 PM Dongjoon Hyun 
wrote:

> BTW, Jungtaek.
>
> PySpark document seems to show a wrong branch. At this time, `master`.
>
> https://spark.apache.org/docs/3.5.1/api/python/index.html
>
> PySpark Overview
> <https://spark.apache.org/docs/3.5.1/api/python/index.html#pyspark-overview>
>
>Date: Feb 24, 2024 Version: master
>
> [image: Screenshot 2024-02-29 at 21.12.24.png]
>
>
> Could you do the follow-up, please?
>
> Thank you in advance.
>
> Dongjoon.
>
>
> On Thu, Feb 29, 2024 at 2:48 PM John Zhuge  wrote:
>
>> Excellent work, congratulations!
>>
>> On Wed, Feb 28, 2024 at 10:12 PM Dongjoon Hyun 
>> wrote:
>>
>>> Congratulations!
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>> On Wed, Feb 28, 2024 at 11:43 AM beliefer  wrote:
>>>
>>>> Congratulations!
>>>>
>>>>
>>>>
>>>> At 2024-02-28 17:43:25, "Jungtaek Lim" 
>>>> wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> We are happy to announce the availability of Spark 3.5.1!
>>>>
>>>> Spark 3.5.1 is a maintenance release containing stability fixes. This
>>>> release is based on the branch-3.5 maintenance branch of Spark. We
>>>> strongly recommend that all 3.5 users upgrade to this stable release.
>>>>
>>>> To download Spark 3.5.1, head over to the download page:
>>>> https://spark.apache.org/downloads.html
>>>>
>>>> To view the release notes:
>>>> https://spark.apache.org/releases/spark-release-3-5-1.html
>>>>
>>>> We would like to acknowledge all community members for contributing to
>>>> this
>>>> release. This release would not have been possible without you.
>>>>
>>>> Jungtaek Lim
>>>>
>>>> ps. Yikun is helping us through releasing the official docker image for
>>>> Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally 
>>>> available.
>>>>
>>>>
>>
>> --
>> John Zhuge
>>
>


[ANNOUNCE] Apache Spark 3.5.1 released

2024-02-28 Thread Jungtaek Lim
Hi everyone,

We are happy to announce the availability of Spark 3.5.1!

Spark 3.5.1 is a maintenance release containing stability fixes. This
release is based on the branch-3.5 maintenance branch of Spark. We strongly
recommend that all 3.5 users upgrade to this stable release.

To download Spark 3.5.1, head over to the download page:
https://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-3-5-1.html

We would like to acknowledge all community members for contributing to this
release. This release would not have been possible without you.

Jungtaek Lim

ps. Yikun is helping us through releasing the official docker image for
Spark 3.5.1 (Thanks Yikun!) It may take some time to be generally available.


Re: Issue in Creating Temp_view in databricks and using spark.sql().

2024-01-31 Thread Jungtaek Lim
Hi,

A streaming query clones the Spark session - when you create a temp view
from a DataFrame inside foreachBatch, the temp view is created under the
cloned session. You will need to use micro_batch_df.sparkSession to access
the cloned session.
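
For example, a minimal sketch of the fix, reusing the function from the
snippet below:

def process_micro_batch(micro_batch_df, batch_id):
    # The temp view is registered in the cloned session bound to this batch.
    micro_batch_df.createOrReplaceTempView("temp_view")
    # Query through the cloned session, not the notebook-level `spark`.
    df = micro_batch_df.sparkSession.sql("select * from temp_view")
    # ... transform and write df ...
    return df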

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Jan 31, 2024 at 3:29 PM Karthick Nk  wrote:

> Hi Team,
>
> I am using structured streaming in PySpark on Azure Databricks, and I am
> creating a temp_view from a dataframe
> (df.createOrReplaceTempView('temp_view')) in order to perform a Spark SQL
> query transformation.
> I am facing an issue where the temp_view is not found, so as a workaround
> I have created a global temp_view to use.
> But when I try the same without streaming, I am able to query the
> temp_view.
>
>
> write_to_final_table =
>  
> (spark.readStream.format('delta').option('ignoreChanges',True).table(f"{delta_table_name}")).writeStream.queryName(f"{query_name}").format("org.elasticsearch.spark.sql").trigger(processingTime=f'1
> minutes').outputMode("append").foreachBatch(process_micro_batch).option("checkpointLocation",checkpointdirectory_path).option("mergeSchema",
> "true").option("failOnDataLoss", "false").start()
>
>
> def process_micro_batch(micro_batch_df, batchId) :
>     micro_batch_df.createOrReplaceTempView("temp_view")
>     df = spark.sql(f"select * from temp_view")
>     return df
>
> Here, I am getting a "temp_view not found" error while reading data from
> the temp_view.
>
>
> I need to create a temp_view (*not* a global temp_view) based on the
> dataframe, and perform the Spark SQL transformation in structured
> streaming.
>
> I have a few questions:
> 1. Do structured streaming and spark.sql have different Spark contexts
> within the same Databricks notebook?
> 2. If I want to create a temp_view based on the dataframe and perform
> Spark SQL operations on it, how can I create the temp view (not a global
> temp view, since a global temp view is available at the cluster level
> across all notebooks)?
>
> Thanks & Regards
>


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-11 Thread Jungtaek Lim
If you use the RocksDB state store provider, you can turn on changelog
checkpointing to upload a single changelog file per partition per batch.
With changelog checkpointing disabled, Spark uploads newly created SST files
and some log files, and if compaction has happened, most SST files have to
be re-uploaded. Changelog checkpointing uploads the snapshot far less
frequently, and actually gives better latency on commit.
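
For reference, a minimal configuration sketch (available since Spark 3.4;
set before starting the query):

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
# Upload a small per-partition changelog each batch instead of re-uploading
# RocksDB SST files.
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
    "true")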

On Sat, Jan 6, 2024 at 7:40 PM Andrzej Zera  wrote:

> Hey,
>
> I'm running a few Structured Streaming jobs (with Spark 3.5.0) that
> require near-real time accuracy with trigger intervals in the level of 5-10
> seconds. I usually run 3-6 streaming queries as part of the job and each
> query includes at least one stateful operation (and usually two or more).
> My checkpoint location is S3 bucket and I use RocksDB as a state store.
> Unfortunately, checkpointing costs are quite high. It's the main cost item
> of the system and it's roughly 4-5 times the cost of compute.
>
> To save on compute costs, the following things are usually recommended:
>
>- increase trigger interval (as mentioned, I don't have much space
>here)
>- decrease the number of shuffle partitions (I have 2x the number of
>workers)
>
> I'm looking for some other recommendations that I can use to save on
> checkpointing costs. I saw that most requests are LIST requests. Can we cut
> them down somehow? I'm using Databricks. If I replace S3 bucket with DBFS,
> will it help in any way?
>
> Thank you!
> Andrzej
>
>


Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-11 Thread Jungtaek Lim
Hi,

The time window is closed and evicted as soon as the "eviction watermark"
passes the end of the window. The late-events watermark only deals with
discarding late events from the "inputs". We did not introduce an additional
delay in the work of multiple stateful operators; we just allowed more late
events to be accepted.
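
For context, the kind of chained time-window aggregation discussed in this
thread can be written as below (a sketch following the chained aggregation
pattern from the programming guide; the `clicks` DataFrame and column names
are illustrative):

from pyspark.sql.functions import window, window_time

# clicks: a streaming DataFrame with an `eventTime` timestamp column.
clicks_1m = (clicks
    .withWatermark("eventTime", "0 seconds")
    .groupBy(window("eventTime", "1 minute"))
    .count())

# Chain a second aggregation on top of the first window's event time.
clicks_5m = (clicks_1m
    .groupBy(window(window_time("window"), "5 minutes"))
    .count())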

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jan 11, 2024 at 6:13 AM Andrzej Zera  wrote:

> I'm struggling with the following issue in Spark >=3.4, related to
> multiple stateful operations.
>
> When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
> keeps track of two types of watermarks: eventTimeWatermarkForEviction and
> eventTimeWatermarkForLateEvents. Introducing them allowed chaining
> multiple stateful operations but also introduced an additional delay for
> getting the output out of the streaming query.
>
> I'll show this on the example. Assume we have a stream of click events and
> we aggregate it first by 1-min window and then by 5-min window. If we have
> a trigger interval of 30s, then in most cases we'll get output 30s later
> compared to single stateful operations queries. To find out how, let's look
> at the following examples:
>
> Example 1. Single stateful operation (aggregation by 5-min window, assume
> watermark is 0 seconds)
>
> Wall clock (microbatch start) | Max event ts from Kafka | Global watermark | Output
> 14:10:00                      | 14:09:56                | 0                | -
> 14:10:30                      | 14:10:26                | 14:09:56         | -
> 14:11:00                      | 14:10:56                | 14:10:26         | window <14:05, 14:10)
>
> Example 2. Mutliple stateful operations (aggregation by 1-min window
> followed by aggregation by 5-min window, assume watermark is 0 seconds)
>
> Wall clock (microbatch start) | Max event ts from Kafka | Late events WM | Eviction WM | Output
> 14:10:00                      | 14:09:56                | 0              | 0           | -
> 14:10:30                      | 14:10:26                | 0              | 14:09:56    | -
> 14:11:00                      | 14:10:56                | 14:09:56       | 14:10:26    | -
> 14:11:30                      | 14:11:26                | 14:10:26       | 14:10:56    | window <14:05, 14:10)
>
> In Example 2, we need to wait until both watermarks cross the end of the
> window to get the output for that window, which happens one iteration later
> compared to Example 1.
>
> Now, in use cases that require near-real-time processing, this one
> iteration delay can be quite a significant difference.
>
> Do we have any option to make streaming queries with multiple stateful
> operations output data without waiting this extra iteration? One of my
> ideas was to force an empty microbatch to run and propagate late events
> watermark without any new data. While this conceptually works, I didn't
> find a way to trigger an empty microbatch while being connected to Kafka
> that constantly receives new data and while having a constant 30s trigger
> interval.
>
> Thanks,
> Andrzej
>


Re: Spark structured streaming tab is missing from spark web UI

2023-11-24 Thread Jungtaek Lim
The feature was added in Spark 3.0. Btw, you may want to check out the EOL
dates for Apache Spark releases - https://endoflife.date/apache-spark - 2.x
is already EOLed.


On Fri, Nov 24, 2023 at 11:13 PM mallesh j 
wrote:

> Hi Team,
>
> I am trying to test the performance of a Spark streaming application that
> I wrote which reads/writes data to Kafka. The code is working fine, but I
> cannot see the Streaming tab in the UI. I tried enabling it by adding the
> config below to the Spark conf, but still no luck. I have even checked
> Google/Stack Overflow on this but did not find an answer. Can you please
> check whether it is present or not, and let me know how I can enable it?
>
> Attached is the screenshot for the same
>
> Spark version 2.4
> Scala version 2.11
>
>
> Thanks & Regards
>  Mallesh Jogu,
> + 919493390341.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: How exactly does dropDuplicatesWithinWatermark work?

2023-11-21 Thread Jungtaek Lim
I'll probably reply the same to SO but posting here first.

This is mentioned in the JIRA ticket, the design doc, and also the API doc,
but to reiterate: the contract/guarantee of the new API is that it will
deduplicate events properly when the max distance between all of your
duplicate events is less than the watermark delay. The internal
implementation is slightly complicated and depends on the first-arrived
event per duplication, and the API does not promise any behavior beyond the
contract/guarantee. You cannot expect any stricter behavior beyond it.

The main use case of this new API is to cover writers which guarantee
"at-least-once" semantics and therefore carry a risk of duplication. E.g.,
writing data to a Kafka topic without a transaction could end up with
duplication. In most cases, duplicated writes of the same data happen within
a predictable time frame, and this new API will ensure that these duplicated
writes are deduplicated once users provide the max distance in time (max -
min) among duplicated events as the delay threshold of the watermark.
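
For example (a minimal sketch; the DataFrame and column names are
illustrative):

deduped = (events
    .withWatermark("eventTime", "10 minutes")  # max gap between duplicates of one event
    .dropDuplicatesWithinWatermark(["eventId"]))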

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Nov 20, 2023 at 10:18 AM Perfect Stranger 
wrote:

> Hello, I have trouble understanding how dropDuplicatesWithinWatermark
> works. And I posted this stackoverflow question:
>
> https://stackoverflow.com/questions/77512507/how-exactly-does-dropduplicateswithinwatermark-work
>
> Could somebody answer it please?
>
> Best Regards,
> Pavel.
>


Re: [Structured Streaming] Joins after aggregation don't work in streaming

2023-10-26 Thread Jungtaek Lim
Hi, we are aware of your ticket and plan to look into it. We can't give an
ETA, but we wanted to let you know that we are going to look into it. Thanks
for reporting!

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Oct 27, 2023 at 5:22 AM Andrzej Zera  wrote:

> Hey All,
>
> I'm trying to reproduce the following streaming operation: "Time window
> aggregation in separate streams followed by stream-stream join". According
> to documentation, this should be possible in Spark 3.5.0 but I had no
> success despite different tries.
>
> Here is a documentation snippet I'm trying to reproduce:
> https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995
>
> I created an issue with more details but no one responded yet:
> https://issues.apache.org/jira/browse/SPARK-45637
>
> Thank you!
> Andrzej
>


Re: Slack for PySpark users

2023-04-03 Thread Jungtaek Lim
Just to be clear, if there is no strong volunteer to keep the new community
channel active, I'd probably be OK with not forking the channel. You can see
a strong counterexample in the #spark channel in ASF Slack: it is a place
with only questions and promos but zero answers. I see volunteers here
asking for another channel, so I want us to go with whatever way these
volunteers prefer most.

The user mailing list is not in good shape. I hope we give it another try
with more recent technology to see whether we can gain traction - if we
fail, the user mailing list will still be there.

On Tue, Apr 4, 2023 at 7:04 AM Jungtaek Lim 
wrote:

> The number of subscribers doesn't carry any meaningful signal. Please look
> at the number of mails being sent to the list.
>
> https://lists.apache.org/list.html?user@spark.apache.org
> The last month in which more than 200 emails were sent was Feb 2022, more
> than a year ago. It was more than 1k per month in 2016, and more than 2k in
> 2015 and earlier.
> Let's face the fact: the user mailing list is dying, even before we start
> the discussion about alternative communication methods.
>
> Users never adopt a channel just because PMC members (or the ASF) prefer
> it. They go with whatever is convenient for them.
>
> The same applies here - if ASF Slack requires a restricted invitation
> mechanism, it won't work. It looks like there is a link for an invitation,
> but we are also talking about the cost:
> https://cwiki.apache.org/confluence/display/INFRA/Slack+Guest+Invites
> As long as we are being serious about the cost, I don't think we are going
> to land on the way that is convenient for "users".
>
> On Tue, Apr 4, 2023 at 4:59 AM Dongjoon Hyun 
> wrote:
>
>> As Mich Talebzadeh pointed out, Apache Spark has an official Slack
>> channel.
>>
>> > It's unavoidable if "users" prefer to use an alternative communication
>> mechanism rather than the user mailing list.
>>
>> The following is the number of people in the official channels.
>>
>> - user@spark.apache.org has 4519 subscribers.
>> - d...@spark.apache.org has 3149 subscribers.
>> - ASF Official Slack channel has 602 subscribers.
>>
>> May I ask if the users prefer to use the ASF Official Slack channel
>> than the user mailing list?
>>
>> Dongjoon.
>>
>>
>>
>> On Thu, Mar 30, 2023 at 9:10 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> I'm reading through the page "Briefing: The Apache Way", and in the
>>> section on "Open Communications", the restriction of communication to ASF
>>> infra (the mailing list) is more about code and decision-making.
>>>
>>> https://www.apache.org/theapacheway/#what-makes-the-apache-way-so-hard-to-define
>>>
>>> It's unavoidable if "users" prefer an alternative communication mechanism
>>> to the user mailing list. Before the Stack Overflow days, there was a
>>> meaningful number of questions on user@; it's simply impossible to make
>>> those users go back and post to the user mailing list.
>>>
>>> We just need to make sure the purpose of employing Slack is not to move
>>> all discussions about development, the direction of the project, etc.,
>>> which must happen on dev@/private@. The Slack thread here does not seem
>>> to aim at that purpose.
>>>
>>>
>>> On Fri, Mar 31, 2023 at 7:00 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Good discussions and proposals all around.
>>>>
>>>> I have used slack in anger on a customer site before. For small and
>>>> medium size groups it is good and affordable. Alternatives have been
>>>> suggested as well so those who like investigative search can agree and come
>>>> up with a freebie one.
>>>> I am inclined to agree with Bjorn that this slack has more social
>>>> dimensions than the mailing list. It is akin to a sports club using
>>>> WhatsApp groups for communication. Remember we were originally looking for
>>>> space for webinars, including Spark on Linkedin that Denney Lee suggested.
>>>> I think Slack and mailing groups can coexist happily. On a more serious
>>>> note, when I joined the user group back in 2015-2016, there was a lot of
>>>> traffic. Currently we hardly get any mails daily - fewer than 5. So
>>>> having a slack-type medium may improve members' participation.
>>>>
>>>> so +1 for me as well.

Re: Slack for PySpark users

2023-04-03 Thread Jungtaek Lim
The number of subscribers doesn't carry any meaningful signal. Please look
at the number of mails being sent to the list.

https://lists.apache.org/list.html?user@spark.apache.org
The last month in which more than 200 emails were sent was Feb 2022, more
than a year ago. It was more than 1k per month in 2016, and more than 2k in
2015 and earlier.
Let's face the fact: the user mailing list is dying, even before we start
the discussion about alternative communication methods.

Users never adopt a channel just because PMC members (or the ASF) prefer it.
They go with whatever is convenient for them.

The same applies here - if ASF Slack requires a restricted invitation
mechanism, it won't work. It looks like there is a link for an invitation,
but we are also talking about the cost:
https://cwiki.apache.org/confluence/display/INFRA/Slack+Guest+Invites
As long as we are being serious about the cost, I don't think we are going
to land on the way that is convenient for "users".

On Tue, Apr 4, 2023 at 4:59 AM Dongjoon Hyun 
wrote:

> As Mich Talebzadeh pointed out, Apache Spark has an official Slack channel.
>
> > It's unavoidable if "users" prefer to use an alternative communication
> mechanism rather than the user mailing list.
>
> The following is the number of people in the official channels.
>
> - user@spark.apache.org has 4519 subscribers.
> - d...@spark.apache.org has 3149 subscribers.
> - ASF Official Slack channel has 602 subscribers.
>
> May I ask if the users prefer to use the ASF Official Slack channel
> than the user mailing list?
>
> Dongjoon.
>
>
>
> On Thu, Mar 30, 2023 at 9:10 PM Jungtaek Lim 
> wrote:
>
>> I'm reading through the page "Briefing: The Apache Way", and in the
>> section on "Open Communications", the restriction of communication to ASF
>> infra (the mailing list) is more about code and decision-making.
>>
>> https://www.apache.org/theapacheway/#what-makes-the-apache-way-so-hard-to-define
>>
>> It's unavoidable if "users" prefer an alternative communication mechanism
>> to the user mailing list. Before the Stack Overflow days, there was a
>> meaningful number of questions on user@; it's simply impossible to make
>> those users go back and post to the user mailing list.
>>
>> We just need to make sure the purpose of employing Slack is not to move
>> all discussions about development, the direction of the project, etc.,
>> which must happen on dev@/private@. The Slack thread here does not seem
>> to aim at that purpose.
>>
>>
>> On Fri, Mar 31, 2023 at 7:00 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Good discussions and proposals all around.
>>>
>>> I have used Slack in anger on a customer site before. For small and
>>> medium-sized groups it is good and affordable. Alternatives have been
>>> suggested as well, so those who like investigative search can agree and
>>> come up with a free one.
>>> I am inclined to agree with Bjørn that Slack has more of a social
>>> dimension than the mailing list. It is akin to a sports club using
>>> WhatsApp groups for communication. Remember, we were originally looking
>>> for a space for webinars, including Spark on LinkedIn, which Denny Lee
>>> suggested. I think Slack and the mailing lists can coexist happily. On a
>>> more serious note, when I joined the user group back in 2015-2016, there
>>> was a lot of traffic. Currently we hardly get many mails daily - fewer
>>> than 5. So having a Slack-type medium may improve members' participation.
>>>
>>> so +1 for me as well.
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 30 Mar 2023 at 22:19, Denny Lee  wrote:
>>>
>>>> +1.
>>>>
>>>> To Shani’s point, there are multiple OSS projects that use the free Slack
>>>> version - top of mind include Delta, Presto, Flink, Trino, Datahub,
>>>> MLflow, etc.

Re: Slack for PySpark users

2023-03-30 Thread Jungtaek Lim
I'm reading through the page "Briefing: The Apache Way", and in the section
on "Open Communications", the restriction of communication to ASF infra
(the mailing lists) is more about code and decision-making.
https://www.apache.org/theapacheway/#what-makes-the-apache-way-so-hard-to-define

It's unavoidable if "users" prefer to use an alternative communication
mechanism rather than the user mailing list. Before the Stack Overflow days,
there was a meaningful number of questions on user@. It's just impossible to
make them go back and post to the user mailing list.

We just need to make sure the purpose of employing Slack is not to move all
discussions about development, the direction of the project, etc., which
must happen on dev@/private@. The Slack thread here does not seem to aim at
that purpose.


On Fri, Mar 31, 2023 at 7:00 AM Mich Talebzadeh 
wrote:

> Good discussions and proposals all around.
>
> I have used Slack in anger on a customer site before. For small and
> medium-sized groups it is good and affordable. Alternatives have been
> suggested as well, so those who like investigative search can agree and
> come up with a free one.
> I am inclined to agree with Bjørn that Slack has more of a social
> dimension than the mailing list. It is akin to a sports club using
> WhatsApp groups for communication. Remember, we were originally looking for
> a space for webinars, including Spark on LinkedIn, which Denny Lee
> suggested. I think Slack and the mailing lists can coexist happily. On a
> more serious note, when I joined the user group back in 2015-2016, there
> was a lot of traffic. Currently we hardly get many mails daily - fewer than
> 5. So having a Slack-type medium may improve members' participation.
>
> so +1 for me as well.
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 30 Mar 2023 at 22:19, Denny Lee  wrote:
>
>> +1.
>>
>> To Shani’s point, there are multiple OSS projects that use the free Slack
>> version - top of mind include Delta, Presto, Flink, Trino, Datahub, MLflow,
>> etc.
>>
>> On Thu, Mar 30, 2023 at 14:15  wrote:
>>
>>> Hey everyone,
>>>
>>> I think we should remain on the free Slack plan.
>>>
>>> In my opinion the free plan is more than enough; the only downside is
>>> that we can only see the last 90 days of messages.
>>>
>>> From what I know, the Airflow community (which has a strong, active
>>> community on Slack) also uses the free plan (you can tell by the 90-day
>>> limit notice in their workspace).
>>>
>>> You can find the pricing and feature comparison between the Slack
>>> plans here.
>>>
>>> Have a great day,
>>> Shani
>>>
>>> On 30 Mar 2023, at 23:38, Mridul Muralidharan  wrote:
>>>
>>> 
>>>
>>>
>>> Thanks for flagging the concern, Dongjoon. I was not aware of the
>>> discussion - but I can understand the concern.
>>> It would be great if you or Matei could update the thread on the result
>>> of the deliberations once they reach a logical consensus, before we set up
>>> an official policy around it.
>>>
>>> Regards,
>>> Mridul
>>>
>>>
>>> On Thu, Mar 30, 2023 at 4:23 PM Bjørn Jørgensen <
>>> bjornjorgen...@gmail.com> wrote:
>>>
 I like the idea of having a talk channel. It can make it easier for
 everyone to say hello, or to dare to ask about small or big matters that
 one would not have dared to ask about before on the mailing lists.
 But then there is the price, and what is best for an open source
 project.

 The price for using Slack is expensive. Right now, for those who have
 joined the Spark Slack: $8.75 USD per member per month x 72 members =
 $630 USD per month.

 https://app.slack.com/plans/T04URTRBZ1R/checkout/form?entry_point=hero_banner_upgrade_cta&s=2

 And Slack does not have an option for open source projects.

 There seem to be some alternatives for open source software, though I
 have not tried them, like
 https://www.rocket.chat/blog/slack-open-source-alternatives

 


 Rocket.Chat is open source: https://github.com/RocketChat/Rocket.Chat

 On Thu, 30 Mar 2023 at 18:54, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi Dongjoon,
>
> To your points, if I may:
>
> - Do you have any reference from other official ASF-related Slack
> channels?
> No, I don't have any reference from other official ASF-related
> Slack channels because I don't think th

Re: Create a Jira account

2022-12-01 Thread Jungtaek Lim
There is a guide on the page - send the request mail to
priv...@spark.apache.org.

On Thu, Dec 1, 2022 at 10:07 PM ideal  wrote:

> Hello,
> I need to open a Jira ticket for Spark about the thrift server operation
> log output being empty, but I do not have an ASF Jira account. Recently,
> Infra ended public signups for ASF Jira accounts; details are here:
> https://infra.apache.org/jira-guidelines.html#who
>
> Please help me create an ASF Jira account. Thank you.
>
> ideal
> idealsp...@foxmail.com
>
> 
>
>


Re: [ANNOUNCE] Apache Spark 3.2.3 released

2022-11-30 Thread Jungtaek Lim
Thanks Chao for driving the release!

On Wed, Nov 30, 2022 at 6:03 PM Wenchen Fan  wrote:

> Thanks, Chao!
>
> On Wed, Nov 30, 2022 at 1:33 AM Chao Sun  wrote:
>
>> We are happy to announce the availability of Apache Spark 3.2.3!
>>
>> Spark 3.2.3 is a maintenance release containing stability fixes. This
>> release is based on the branch-3.2 maintenance branch of Spark. We
>> strongly recommend that all 3.2 users upgrade to this stable release.
>>
>> To download Spark 3.2.3, head over to the download page:
>> https://spark.apache.org/downloads.html
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-3-2-3.html
>>
>> We would like to acknowledge all community members for contributing to
>> this
>> release. This release would not have been possible without you.
>>
>> Chao
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>


Re: Creating a Spark 3 Connector

2022-11-23 Thread Jungtaek Lim
Bjørn, that is the "Spark Connect" project, which dissociates the client and
server from the Spark driver. It is not related to data sources (which are
also known as connectors).

Mitch, if I understand correctly, we unfortunately don't have dedicated
documentation for implementing data sources/connectors. You are encouraged
to look at reference implementations like Kafka and to understand the
interfaces. Each interface has its own documentation, which will guide you
in implementing your own.

Please post any questions on the dev@ mailing list if you have doubts or get
stuck with the implementation.
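For what it's worth, once such a connector is implemented (in Scala/Java) and
put on the classpath, using it from a query is just a matter of naming its
provider. A minimal sketch in PySpark - the provider name and option below
are made up for illustration, not a real library:

    # Hypothetical usage of a custom connector; "com.example.mysource" and
    # the "url" option are placeholders for whatever your implementation
    # registers.
    df = (spark.read
          .format("com.example.mysource")
          .option("url", "https://example.invalid/data")
          .load())
    df.show()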

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Nov 24, 2022 at 7:06 AM Bjørn Jørgensen 
wrote:

> This is from the vote for Spark Connect. Is this what you are looking for?
>
> The goal of the SPIP is to introduce a Dataframe based client/server API
> for Spark
>
> Please also refer to:
>
> - Previous discussion in dev mailing list: [DISCUSS] SPIP: Spark Connect
> - A client and server interface for Apache Spark.
> <https://lists.apache.org/thread/3fd2n34hlyg872nr55rylbv5cg8m1556>
> - Design doc: Spark Connect - A client and server interface for Apache
> Spark.
> <https://docs.google.com/document/d/1Mnl6jmGszixLW4KcJU5j9IgpG9-UabS0dcM6PM2XGDc/edit?usp=sharing>
> - JIRA: SPARK-39375 <https://issues.apache.org/jira/browse/SPARK-39375>
>
> On Wed, 23 Nov 2022 at 17:36, Mitch Shepherd <
> mitch.sheph...@marklogic.com> wrote:
>
>> Hello,
>>
>>
>>
>> I’m wondering if anyone can point me in the right direction for a Spark
>> connector developer guide.
>>
>>
>>
>> I’m looking for information on writing a new connector for Spark to move
>> data between Apache Spark and other systems.
>>
>>
>>
>> Any information would be helpful. I found a similar thing for Kafka
>> <https://docs.confluent.io/platform/current/connect/devguide.html> but
>> haven’t been able to track down documentation for Spark.
>>
>>
>>
>> Best,
>>
>> Mitch
>>
>> This message and any attached documents contain information of MarkLogic
>> and/or its customers that may be confidential and/or privileged. If you are
>> not the intended recipient, you may not read, copy, distribute, or use this
>> information. If you have received this transmission in error, please notify
>> the sender immediately by reply e-mail and then delete this message. This
>> email may contain pricing or other suggested contract terms related to
>> MarkLogic software or services. Any such terms are not binding on MarkLogic
>> unless and until they are included in a definitive agreement executed by
>> MarkLogic.
>>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>


Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-19 Thread Jungtaek Lim
I have no context on the ML side, but your streaming query exposes a
possibility of memory issues.

>>> flattenedNER.registerTempTable("df")
>>>
>>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>>> finalDF = spark.sql(querySelect)
>>>
>>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()

Since this is a streaming query, the grouped aggregation incurs a state
store, and since you use the "complete" output mode, the state store will
grow over time and will eventually dominate the memory in the executors.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
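For illustration, a common way to keep the state bounded is to aggregate over
an event-time window with a watermark instead of a global aggregation in
complete mode. A minimal sketch, assuming the input carries an event-time
column named "timestamp" (that column and the thresholds are illustrative,
not from the original code):

    from pyspark.sql.functions import avg, count, window

    # The watermark lets Spark drop window state older than the threshold,
    # so the state store stops growing without bound.
    bounded = (flattenedNER
               .withWatermark("timestamp", "10 minutes")
               .groupBy(window("timestamp", "5 minutes"), "col")
               .agg(avg("sentiment").alias("sentiment"),
                    count("col").alias("count")))

    query = (bounded.writeStream
             .foreachBatch(processBatch)
             .outputMode("append")
             .start())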


On Tue, Apr 19, 2022 at 4:07 AM Bjørn Jørgensen 
wrote:

> When did spaCy get support for Spark?
>
> Try Spark NLP - it's made for Spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they publish user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> On Mon, 18 Apr 2022 at 16:17, Sean Owen  wrote:
>
>> It looks good - are you sure it even starts? The problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead. I'm not sure if that resolves it, but it would be a good
>> practice.
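>> A minimal sketch of this suggestion - broadcasting the loaded model once
>> and referencing it inside the UDF (assuming the spaCy model object is
>> picklable):
>>
>>     import spacy
>>
>>     nerModel_bc = spark.sparkContext.broadcast(spacy.load("en_core_web_sm"))
>>
>>     def obtain_ner_udf(words):
>>         # Each executor deserializes the broadcast once, instead of
>>         # receiving a copy of the model with every task.
>>         if not words:
>>             return None
>>         doc = nerModel_bc.value(words)
>>         return [ent.text + '_' + ent.label_ for ent in doc.ents]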
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>> Hi Team,
>>> 
>>>
>>> I'm developing a project that retrieves tweets on a 'host' app, streams
>>> them to Spark, and with different DataFrame operations obtains the
>>> sentiment of the tweets and their entities, applying a sentiment model and
>>> a NER model respectively.
>>>
>>> The problem I've come across is that when applying the NER model, the
>>> RAM consumption increases until the program stops with a memory error
>>> because there's no memory left to execute. In addition, on SparkUI I've
>>> seen that there's only one executor running, the executor driver, but using
>>> htop on the terminal I see that the 8 cores of the instance are executing
>>> at 100%.
>>>
>>> The SparkSession is only configured to receive the tweets from the
>>> socket that connects with the second program that sends the tweets. The
>>> DataFrame goes through some processing to obtain other properties of the
>>> tweet like its sentiment (which causes no error even with less than 8GB of
>>> RAM) and then the NER is applied.
>>>
>>> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>>> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>>> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>>
>>> # prior processing of the tweets
>>> sentDF = other_processing(tweets)
>>>
>>> # obtaining the column that contains the list of entities from a tweet
>>> nerDF = ner_classification(sentDF)
>>>
>>>
>>> This is the code of the functions related to obtaining the NER, the
>>> "main call" and the UDF function.
>>>
>>> nerModel = spacy.load("en_core_web_sm")
>>>
>>> # main call, applies the UDF function to every tweet from the "tweet" column
>>> def ner_classification(words):
>>>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>>     words = words.withColumn("nerlist", ner_list("tweet"))
>>>     return words
>>>
>>> # udf function
>>> def obtain_ner_udf(words):
>>>     # if the tweet is empty return None
>>>     if words == "":
>>>         return None
>>>     # else: applying the NER model (Spacy en_core_web_sm)
>>>     entities = nerModel(words)
>>>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>>     return [word.text + '_' + word.label_ for word in entities.ents]
>>>
>>>
>>>
>>> And lastly I map each entity with the sentiment from its tweet and
>>> obtain the average sentiment of the entity and the number of appearances.
>>>
>>> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>>> flattenedNER.registerTempTable("df")
>>>
>>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>>> finalDF = spark.sql(querySelect)
>>>
>>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>>
>>>
>>> The resulting DF is processed with a function that separates each column
>>> in a list and prints it.
>>>
>>> def processBatch(df, epoch_id):
>>>     entities = [str(t.entity) for t in df.select("entity").collect()]
>>>     sentiments = [float(t.sentiment) for t in df.select(

Re: DataStreamReader cleanSource option

2022-02-03 Thread Jungtaek Lim
Hi,

Could you please set the config
"spark.sql.streaming.fileSource.cleaner.numThreads"
to 0 and see whether it works? (Note: this will slow down your processing,
since the cleaning phase will then happen in the foreground. The default is
background cleaning with 1 thread; you can also try more threads than 1.)
If it doesn't help, please turn on the DEBUG log level for the package
"org.apache.spark.sql.execution.streaming"
and grep for the log messages from SourceFileArchiver & SourceFileRemover.
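For reference, a sketch of applying the suggested setting when the session is
created (the config key is the one named above; the app name is illustrative):

    from pyspark.sql import SparkSession

    # 0 threads = run the cleaner in the foreground, which makes its
    # behavior (and any failure) visible in the driver logs.
    spark = (SparkSession.builder
             .appName("clean-source-debug")
             .config("spark.sql.streaming.fileSource.cleaner.numThreads", "0")
             .getOrCreate())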

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jan 27, 2022 at 9:56 PM Gabriela Dvořáková
 wrote:

> Hi,
>
> I am writing to ask for advice regarding the cleanSource option of the
> DataStreamReader. I am using PySpark with Spark 3.1 via Azure Synapse. To
> my knowledge, the cleanSource option was introduced in Spark version 3.
> I've spent a significant amount of time trying to configure this option
> with both the "archive" and "delete" settings, but the streaming query
> seems to only process data from the source data lake storage account
> container and store it in the sink storage account data lake container.
> The archive folder is never created, nor are any of the processed source
> files removed. None of the forums or Stack Overflow posts have been of any
> help so far, so I am reaching out to you in case you have any tips on how
> to get it running. Here is my code:
>
> Reading:
> df = (spark
>     .readStream
>     .option("sourceArchiveDir", f'abfss://{TRANSIENT_DATA_LAKE_CONTAINER_NAME}@{DATA_LAKE_ACCOUNT_NAME}.dfs.core.windows.net/budget-app/budgetOutput/archived-v5')
>     .option("cleanSource", "archive")
>     .format('json')
>     .schema(schema)
>     .load(TRANSIENT_DATA_LAKE_PATH))
> --
>
> ...Processing...
>
> Writing:
> (
> df.writeStream
> .format("delta")
> .outputMode('append')
> .option("checkpointLocation", RAW_DATA_LAKE_CHECKPOINT_PATH)
> .trigger(once=True)
> .partitionBy("Year", "Month", "clientId")
> .start(RAW_DATA_LAKE_PATH)
> .awaitTermination()
> )
>
> Thank you very much for help,
> Gabriela
>
> _
>
> Med venlig hilsen / Best regards
>
> Gabriela Dvořáková
>
> Developer | monthio
>
> M: +421902480757
>
> E: gabri...@monthio.com
>
> W: www.monthio.com
>
> Monthio Aps, Ragnagade 7, 2100 Copenhagen
>
>
> Create personal wealth and healthy economy
>
> for people by changing the ways of banking"
>
>


Re: [ANNOUNCE] Apache Spark 3.2.0

2021-10-19 Thread Jungtaek Lim
Thanks to Gengliang for driving this huge release!

On Wed, Oct 20, 2021 at 1:50 AM Dongjoon Hyun 
wrote:

> Thank you so much, Gengliang and all!
>
> Dongjoon.
>
> On Tue, Oct 19, 2021 at 8:48 AM Xiao Li  wrote:
>
>> Thank you, Gengliang!
>>
>> Congrats to our community and all the contributors!
>>
>> Xiao
>>
>> On Tue, 19 Oct 2021 at 08:26, Henrik Peng  wrote:
>>
>>> Congrats and thanks!
>>>
>>>
>>> On Tue, 19 Oct 2021 at 22:16, Gengliang Wang wrote:
>>>
 Hi all,

 Apache Spark 3.2.0 is the third release of the 3.x line. With
 tremendous contribution from the open-source community, this release
 managed to resolve in excess of 1,700 Jira tickets.

 We'd like to thank our contributors and users for their contributions
 and early feedback to this release. This release would not have been
 possible without you.

 To download Spark 3.2.0, head over to the download page:
 https://spark.apache.org/downloads.html

 To view the release notes:
 https://spark.apache.org/releases/spark-release-3-2-0.html

>>>


Re: Appending a static dataframe to a stream create Parquet file fails

2021-09-06 Thread Jungtaek Lim
I'd recommend getting in touch with Delta Lake community (Google Groups)
https://groups.google.com/forum/#!forum/delta-users to get more feedback
from experts about Delta Lake specific issues.



On Mon, Sep 6, 2021 at 1:56 AM  wrote:

> Hi Jungtaek,
> Thanks for your reply. I was afraid that the problem is not only on my
> side but rather of a conceptual nature. I guess I have to rethink my
> approach. However, since you mentioned Delta Lake: I have the same
> problem, but the other way around, with Delta Lake. I cannot write with a
> stream to a Delta Lake table created from a static dataframe.
>
> Anyhow, best regards
>   Eugen
>
> On Fri, 2021-09-03 at 11:44 +0900, Jungtaek Lim wrote:
>
> Hi,
>
> The file stream sink maintains the metadata in the output directory. The
> metadata retains the list of files written by the streaming query, and
> Spark reads the metadata on listing the files to read.
>
> This is to guarantee end-to-end exactly once on writing files in the
> streaming query. There could be failure on the streaming query and some
> files may be partially written. Metadata will help to skip reading these
> files and only read files which are correctly written.
>
> This leads to a major restriction: you can't write to the output directory
> from multiple queries. For your case, Spark will only read the files which
> are written from the streaming query.
>
> There are 3rd party projects dealing with transactional writes from
> multiple writers, (alphabetically) Apache Iceberg, Delta Lake, and so on.
> You may want to check them out.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Sep 2, 2021 at 10:04 PM  wrote:
>
> Hi all,
>   I recently stumbled upon a rather strange problem with streaming
> sources in one of my tests. I am writing a Parquet file from a streaming
> source and subsequently try to append the same data, but this time from a
> static dataframe. Surprisingly, the number of rows in the Parquet file
> remains the same after the append operation.
> Here is the relevant code
>
>   "Appending data from static dataframe" must "produce twice as much data" in 
> {
>
> logLinesStream.writeStream
>
>   .format("parquet")
>
>   .option("path", path.toString)
>
>   .outputMode("append")
>
>   .start()
>
>   .processAllAvailable()
>
> spark.read.format("parquet").load(path.toString).count mustBe 1159
>
>
> logLinesDF.write.format("parquet").mode("append").save(path.toString)
>
> spark.read.format("parquet").load(path.toString).count mustBe 2*1159
>
>   }
>
>
> Does anyone have an idea what I am doing wrong here?
>
> thanks in advance
>  Eugen Wintersberger
>
>
>


Re: Appending a static dataframe to a stream create Parquet file fails

2021-09-02 Thread Jungtaek Lim
Hi,

The file stream sink maintains the metadata in the output directory. The
metadata retains the list of files written by the streaming query, and
Spark reads the metadata on listing the files to read.

This is to guarantee end-to-end exactly once on writing files in the
streaming query. There could be failure on the streaming query and some
files may be partially written. Metadata will help to skip reading these
files and only read files which are correctly written.

This leads to a major restriction: you can't write to the output directory
from multiple queries. For your case, Spark will only read the files which
are written from the streaming query.

There are 3rd party projects dealing with transactional writes from multiple
writers, (alphabetically) Apache Iceberg, Delta Lake, and so on. You may
want to check them out.
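For illustration, the metadata mentioned above lives in a "_spark_metadata"
directory under the sink path, which is a quick way to confirm this behavior
on your own output (a sketch with a hypothetical local path):

    import os

    path = "/tmp/parquet-out"  # hypothetical sink path of the streaming query

    # Files listed in this log are the ones the batch reader will trust;
    # rows appended by a separate batch write never show up here, which is
    # why they are invisible to spark.read.
    print(os.listdir(os.path.join(path, "_spark_metadata")))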

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Sep 2, 2021 at 10:04 PM  wrote:

> Hi all,
>   I recently stumbled upon a rather strange problem with streaming
> sources in one of my tests. I am writing a Parquet file from a streaming
> source and subsequently try to append the same data, but this time from a
> static dataframe. Surprisingly, the number of rows in the Parquet file
> remains the same after the append operation.
> Here is the relevant code
>
>   "Appending data from static dataframe" must "produce twice as much data" in 
> {
>
> logLinesStream.writeStream
>
>   .format("parquet")
>
>   .option("path", path.toString)
>
>   .outputMode("append")
>
>   .start()
>
>   .processAllAvailable()
>
> spark.read.format("parquet").load(path.toString).count mustBe 1159
>
>
> logLinesDF.write.format("parquet").mode("append").save(path.toString)
>
> spark.read.format("parquet").load(path.toString).count mustBe 2*1159
>
>   }
>
>
> Does anyone have an idea what I am doing wrong here?
>
> thanks in advance
>  Eugen Wintersberger
>


Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Jungtaek Lim
Theoretically, the composed value of batchId +
monotonically_increasing_id() would achieve the goal. The major downside is
that you'll need to deal with "deduplication" of the output based on batchId,
as monotonically_increasing_id() is nondeterministic. You need to ensure
there's NO overlap in the output across multiple reattempts of the same
batch ID.
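A minimal sketch of that composition inside foreachBatch (the sink path is
hypothetical, and note the caveat above - the low-order part of the id is
not stable across reattempts of the same batch):

    from pyspark.sql.functions import concat_ws, lit, monotonically_increasing_id

    def write_batch(df, batch_id):
        # batch_id is stable across retries; monotonically_increasing_id()
        # is not, so downstream dedup must key on batch_id, not on row_id.
        out = df.withColumn(
            "row_id",
            concat_ws("-",
                      lit(batch_id).cast("string"),
                      monotonically_increasing_id().cast("string")))
        out.write.mode("append").parquet("/tmp/out")

    query = (streaming_df.writeStream  # hypothetical streaming DataFrame
             .foreachBatch(write_batch)
             .start())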

Btw, even assuming you dealt with the auto-increasing ID on write, how do
you read the files and apply range pruning by that ID? Is the approach
scalable and efficient? You probably couldn't avoid reading unnecessary
files unless you build explicit metadata about the files (e.g. mapping file
name to ID range) and also craft a custom reader to leverage that
information.


On Wed, Jul 14, 2021 at 6:00 AM Sebastian Piu 
wrote:

> If you want them to survive across jobs you can use snowflake IDs or
> similar ideas depending on your use case
>
> On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, 
> wrote:
>
>> Meaning a monotonically incrementing ID, as in an Oracle sequence, for
>> each record read from Kafka, adding that to your dataframe?
>>
>> If you do Structured Streaming in microbatch mode, you will
>> get what is known as the batchId
>>
>>    result = streamingDataFrame.select( \
>>      col("parsed_value.rowkey").alias("rowkey") \
>>    , col("parsed_value.ticker").alias("ticker") \
>>    , col("parsed_value.timeissued").alias("timeissued") \
>>    , col("parsed_value.price").alias("price")). \
>>      writeStream. \
>>      outputMode('append'). \
>>      option("truncate", "false"). \
>>      foreachBatch(sendToSink). \
>>      trigger(processingTime='30 seconds'). \
>>      option('checkpointLocation', checkpoint_path). \
>>      queryName(config['MDVariables']['topic']). \
>>
>> That function sendToSink introduces two variables, df and batchId:
>>
>> def sendToSink(df, batchId):
>>     if len(df.take(1)) > 0:
>>         print(f"""md batchId is {batchId}""")
>>         df.show(100, False)
>>         df.persist()
>>         # write to BigQuery batch table
>>         s.writeTableToBQ(df, "append",
>>             config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>>         df.unpersist()
>>         print(f"""wrote to DB""")
>>     else:
>>         print("DataFrame md is empty")
>>
>> That batchId value can be used for each batch.
>>
>>
>> Otherwise you can do this:
>>
>> from pyspark.sql.functions import monotonically_increasing_id
>> startval = 1
>> df = df.withColumn('id', monotonically_increasing_id() + startval)
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
>> felixkizhakkelj...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using Spark Structured Streaming to sink data from Kafka to AWS S3.
>>> I am wondering if it's possible for me to introduce a uniquely
>>> incrementing identifier for each record, as we do in an RDBMS (an
>>> incrementing long id)? This would greatly benefit range pruning while
>>> reading based on this ID.
>>>
>>> Any thoughts? I have looked at monotonically_increasing_id, but it seems
>>> it's not deterministic and it won't ensure new records get the next id
>>> from the latest id that is already present in the storage (S3).
>>>
>>> Regards,
>>> Felix K Jose
>>>
>>


Re: ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception

2021-03-18 Thread Jungtaek Lim
We've fixed the single case for "onJobStart"; please check SPARK-34731 [1].
The patch will be available in Spark 3.1.2 / 3.2.0, but if someone reports
the same issue on lower version lines, I think we could backport it to those
lines as well.

1. https://issues.apache.org/jira/browse/SPARK-34731

On Fri, Mar 19, 2021 at 5:00 AM Mich Talebzadeh 
wrote:

>
> Recall this was the error
>
> 21/03/18 16:53:38 ERROR org.apache.spark.scheduler.AsyncEventQueue:
> Listener EventLoggingListener threw an exception
>
> java.util.ConcurrentModificationException
>
> at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
>
> I resolved this error message by setting:
>
> spark.conf.set("spark.eventLog.enabled", "false")
>
> This happens when Spark writes to a Google BigQuery table. The error is
> not a showstopper.
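> Worth noting that "spark.eventLog.enabled" is read when the SparkContext
> starts, so the reliable place to set it is at session build (or
> spark-submit) time; a sketch:
>
>     from pyspark.sql import SparkSession
>
>     spark = (SparkSession.builder
>              .appName("no-event-log")  # illustrative app name
>              .config("spark.eventLog.enabled", "false")
>              .getOrCreate())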
>
> There is also a recent reference to event logging listener exception in
> this jira
>
> Getting event logging listener exception · Issue #439 · delta-io/delta
> (github.com) 
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 18 Mar 2021 at 17:39, Mich Talebzadeh 
> wrote:
>
>> This is an intermittent error. The full error is this:
>>
>> 21/03/18 17:35:12 ERROR org.apache.spark.scheduler.AsyncEventQueue:
>> Listener EventLoggingListener threw an exception
>> java.util.ConcurrentModificationException
>> at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
>> at
>> scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
>> at
>> scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
>> at scala.collection.Iterator.foreach(Iterator.scala:943)
>> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>> at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
>> at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
>> at scala.collection.mutable.AbstractMap.toSeq(Map.scala:84)
>> at
>> org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
>> at
>> org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
>> at
>> org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
>> at
>> org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
>> at
>> org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>> at
>> org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
>> at
>> org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
>> at
>> org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
>> at
>> org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
>> at
>> org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
>> at
>> scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
>> at org.apache.spark.scheduler.AsyncEventQueue.org
>> $apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
>> at
>> org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
>> at
>> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1381)
>> at
>> org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
>>
>> There is a reference to this error here.
>>
>>
>> [SPARK-32027] EventLoggingListener threw
>> java.util.ConcurrentModificationException - ASF JIRA (apache.org)
>> 
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.

Re: Submitting insert query from beeline failing on executor server with java 11

2021-03-16 Thread Jungtaek Lim
Hmm... I read the page again, and it looks like we are in a gray area.

The Hadoop community supports JDK 11 starting from Hadoop 3.3, while we
haven't yet added Hadoop 3.3 as a dependency. It may not be a real issue at
runtime with Hadoop 3.x, as Spark uses only part of Hadoop (the client
layer), but it's worth knowing that this is not officially supported by the
Hadoop community.

On Wed, Mar 17, 2021 at 6:54 AM Jungtaek Lim 
wrote:

> Hadoop 2.x doesn't support JDK 11. See Hadoop Java version compatibility
> with JDK:
>
> https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Java+Versions
>
> That said, you'll need to use Spark 3.x with the Hadoop 3.1 profile to make
> Spark work with JDK 11.
>
> On Tue, Mar 16, 2021 at 10:06 PM Sean Owen  wrote:
>
>> That looks like you didn't compile with Java 11 actually. How did you try
>> to do so?
>>
>> On Tue, Mar 16, 2021, 7:50 AM kaki mahesh raja <
>> kaki.mahesh_r...@nokia.com> wrote:
>>
>>> Hi all,
>>>
>>> We have compiled Spark with Java 11 ("11.0.9.1"), and when testing the
>>> thrift server we are seeing that an insert query from an operator using
>>> beeline fails with the below error.
>>>
>>> {"type":"log", "level":"ERROR", "time":"2021-03-15T05:06:09.559Z",
>>> "timezone":"UTC", "log":"Uncaught exception in thread
>>> blk_1077144750_3404529@[DatanodeInfoWithStorage[10.75.47.159:1044
>>> ,DS-1678921c-3fe6-4015-9849-bd1223c23369,DISK],
>>> DatanodeInfoWithStorage[10.75.47.158:1044
>>> ,DS-0b440eb7-fa7e-4ad8-bb5a-cdc50f3e7660,DISK]]"}
>>> java.lang.NoSuchMethodError: 'sun.misc.Cleaner
>>> sun.nio.ch.DirectBuffer.cleaner()'
>>> at
>>>
>>> org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
>>> ~[hadoop-common-2.10.1.jar:?]
>>> at
>>>
>>> org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:780)
>>> ~[hadoop-common-2.10.1.jar:?]
>>> at
>>>
>>> org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:322)
>>> ~[hadoop-common-2.10.1.jar:?]
>>> at java.io.FilterInputStream.close(FilterInputStream.java:180)
>>> ~[?:?]
>>> at
>>> org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:1003)
>>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>>> at
>>> org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:845)
>>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>>> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:840)
>>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.570Z",
>>> "timezone":"UTC", "log":"unwrapping token of length:54"}
>>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.599Z",
>>> "timezone":"UTC", "log":"IPC Client (1437736861) connection to
>>> vm-10-75-47-157/10.75.47.157:8020 from cspk got value #4"}
>>>
>>> Any inputs on how to fix this issue would be helpful for us.
>>>
>>> Thanks and Regards,
>>> kaki mahesh raja
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: Submitting insert query from beeline failing on executor server with java 11

2021-03-16 Thread Jungtaek Lim
Hadoop 2.x doesn't support JDK 11. See Hadoop Java version compatibility
with JDK:

https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Java+Versions

That said, you'll need to use Spark 3.x with the Hadoop 3.1 profile to make
Spark work with JDK 11.

On Tue, Mar 16, 2021 at 10:06 PM Sean Owen  wrote:

> That looks like you didn't compile with Java 11 actually. How did you try
> to do so?
>
> On Tue, Mar 16, 2021, 7:50 AM kaki mahesh raja 
> wrote:
>
>> Hi all,
>>
>> We have compiled Spark with Java 11 ("11.0.9.1"), and when testing the
>> thrift server we are seeing that an insert query from an operator using
>> beeline fails with the below error.
>>
>> {"type":"log", "level":"ERROR", "time":"2021-03-15T05:06:09.559Z",
>> "timezone":"UTC", "log":"Uncaught exception in thread
>> blk_1077144750_3404529@[DatanodeInfoWithStorage[10.75.47.159:1044
>> ,DS-1678921c-3fe6-4015-9849-bd1223c23369,DISK],
>> DatanodeInfoWithStorage[10.75.47.158:1044
>> ,DS-0b440eb7-fa7e-4ad8-bb5a-cdc50f3e7660,DISK]]"}
>> java.lang.NoSuchMethodError: 'sun.misc.Cleaner
>> sun.nio.ch.DirectBuffer.cleaner()'
>> at
>>
>> org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
>> ~[hadoop-common-2.10.1.jar:?]
>> at
>>
>> org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:780)
>> ~[hadoop-common-2.10.1.jar:?]
>> at
>>
>> org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:322)
>> ~[hadoop-common-2.10.1.jar:?]
>> at java.io.FilterInputStream.close(FilterInputStream.java:180)
>> ~[?:?]
>> at
>> org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:1003)
>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>> at
>> org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:845)
>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:840)
>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.570Z",
>> "timezone":"UTC", "log":"unwrapping token of length:54"}
>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.599Z",
>> "timezone":"UTC", "log":"IPC Client (1437736861) connection to
>> vm-10-75-47-157/10.75.47.157:8020 from cspk got value #4"}
>>
>> Any inputs on how to fix this issue would be helpful for us.
>>
>> Thanks and Regards,
>> kaki mahesh raja
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark Structured Streaming and Kafka message schema evolution

2021-03-15 Thread Jungtaek Lim
If I understand correctly, SQL semantics are strict about the column schema.
Reading via the Kafka data source doesn't require you to specify the schema,
as it provides the key and value as binary, but once you deserialize them,
unless you keep the type primitive (e.g. String), you'll need to specify the
schema, as from_json requires you to.

This doesn't change even if you leverage a Schema Registry - you'll need to
provide a schema that is compatible with all schemas the records are
associated with. I guess that's guaranteed if you use the latest version of
the schema and you've only changed the schema in backward-compatible ways. I
admit I haven't dealt with a Schema Registry in Structured Streaming, but if
you integrate the schema into the query plan, a running query is unlikely to
pick up the latest schema. It still wouldn't matter, though, as your query
should only leverage the part of the schema you've integrated, and the
latest schema is backward compatible with the integrated one.

Hope this helps.

Thanks
Jungtaek Lim (HeartSaVioR)

On Mon, Mar 15, 2021 at 9:25 PM Mich Talebzadeh 
wrote:

> This is just a query.
>
> In general, Kafka Connect requires a means to register the schema such
> that producers and consumers understand it. It also allows schema
> evolution, i.e. changes to the metadata that identifies the structure of
> the data sent via a topic.
>
> When we stream a Kafka topic into Spark Structured Streaming (SSS), the
> assumption is that by the time Spark processes the data, its structure
> can be established. With foreachBatch, we create a dataframe on top of the
> incoming batches of JSON messages, and the dataframe can be interrogated.
> However, the processing may fail if another column is added to the topic
> and the consumer (in this case SSS) is not aware of it. How can this change
> of schema be verified?
>
> Thanks
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Using Spark as a fail-over platform for Java app

2021-03-12 Thread Jungtaek Lim
That's what resource managers provide. So you could code against a resource
manager directly, but I assume you're looking for ways to not deal with
resource managers and let Spark do it instead.

I admit I have no experience with this (I did something similar with Apache
Storm on a standalone setup 5+ years ago), but the question can simply be
restated as "making the driver fault-tolerant", as your app logic can run in
the driver even if you don't do any computation with Spark. And there seem
to be lots of answers on Google for the restated question, including this
old one:
https://stackoverflow.com/questions/26618464/what-happens-if-the-driver-program-crashes


On Sat, Mar 13, 2021 at 5:21 AM Lalwani, Jayesh 
wrote:

> Can I cut a steak with a hammer? Sure you can, but the steak would taste
> awful
>
>
>
> Do you have organizational/bureaucratic issues with using a Load Balancer?
> Because that’s what you really need. Run your application on multiple nodes
> with a load balancer in front. When a node crashes, the load balancer will
> shift the traffic to the healthy node until the crashed node recovers.
>
>
>
> *From: *Sergey Oboguev 
> *Date: *Friday, March 12, 2021 at 2:53 PM
> *To: *User 
> *Subject: *[EXTERNAL] Using Spark as a fail-over platform for Java app
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> I have an existing plain-Java (non-Spark) application that needs to run in
> a fault-tolerant way, i.e. if the node crashes then the application is
> restarted on another node, and if the application crashes because of an
> internal fault, the application is restarted too.
>
> Normally I would run it in Kubernetes, but in this specific case
> Kubernetes is unavailable because of organizational/bureaucratic issues,
> and the only execution platform available in the domain is Spark.
>
> Is it possible to wrap the application into a Spark-based launcher that
> will take care of executing the application and restarts?
>
> Execution must be in a separate JVM, apart from other apps.
>
> And for optimum performance, the application also needs to be assigned
> guaranteed resources, i.e. the number of cores and amount of RAM required
> for it, so it would be great if the launcher could take care of this too.
>
> Thanks for advice.
>


Re: Detecting latecomer events in Spark structured streaming

2021-03-11 Thread Jungtaek Lim
Hi,

If I remember correctly, Spark doesn't expose the watermark value for the
current batch through the public API. That said, if you're dealing with
"event time" (and I guess you are, as you worry about late events), unless
you employ a new logical/physical plan to expose watermarks to user-level
functions, it's not possible to do what you plan to do.

I've tried a similar thing to count the number of late events by making
changes to the Spark codebase (see https://github.com/apache/spark/pull/24936)
- my initial goal was providing a side-output of late events to let end users
deal with these events outside of the query, but I soon realized it's
non-trivial, and I just took the simplest approach at that time.
(There are still possible ideas to pursue, e.g. sending late events to the
driver via RPC, assuming they are a "minority", but nothing has come to the
conclusion that it's worth the effort. If your business logic requires it,
you could be a hacker and try to deal with this, and share if you succeed in
making it work.)
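One out-of-band approximation that is available through the public API: the
progress report of a running query does include the watermark used for each
completed micro-batch, so you can at least observe it after the fact. A
sketch (the polling interval is arbitrary):

    import time

    # "query" is the handle returned by writeStream...start().
    while query.isActive:
        progress = query.lastProgress  # None until the first batch completes
        if progress and "watermark" in progress.get("eventTime", {}):
            print("watermark after last batch:",
                  progress["eventTime"]["watermark"])
        time.sleep(10)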

I'd skip answering questions as I explained you'd be stuck even before
raising these questions.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Tue, Mar 9, 2021 at 6:49 AM Sergey Oboguev  wrote:

> I have a Spark structured streaming based application that performs
> window(...) construct followed by aggregation.
>
> This construct discards latecomer events that arrive past the watermark. I
> need to be able to detect these late events to handle them out-of-band.
> The application maintains a raw store of all received events and can
> re-aggregate a particular time interval for a particular device in a
> secondary batch mode, as long as it knows that this interval has to be
> re-aggregated, i.e. contains latecomer data that was discarded by
> structured streaming due to watermark.
>
> I am trying to come up with a way to perform such a detection.
>
> One approach would perhaps be to insert an additional stage before
> window(...) -- a stage that would monitor all events received by the
> stream, look at their timestamps, and predict which events will be
> discarded by window(...) due to watermark. Such events can then be handled
> outside of Spark structured streaming. The stage can be based on
> Dataset.foreach, Dataset.filter or Dataset.map that will pass all events
> through, but also monitor the events and if a latecomer condition is
> detected, then issue a side channel notification that will cause data for
> the specified device and interval be re-aggregated later from raw event
> storage, out of stream.
>
> I have a couple of questions related to the feasibility of such a
> construct.
>
> Q1:
>
> Can data behind the window(...) be shared by multiple executors or nodes,
> or is it owned by one executor at any given time? If it is shared, it would
> appear that local monitoring of passing timestamps would be insufficient,
> since it lacks global context.
>
> Q2:
>
> To monitor the stream, the stage needs to maintain a context. The context
> can be checkpointed periodically in an external store, but I do not want to
> persist/readback the context for every microbatch (or, in the foreach case,
> for every individual event). I want to checkpoint the context infrequently,
> and maintain it across microbatches just in memory.
>
> Which brings a question... The handler function inside the stage (called
> by foreach, map, or filter) needs to refer to the context object, yet it is
> unclear how to make such a reference.
>
> I could attach a context to the stream via some global map object
> (translating stream->context), but handler functions for Dataset.foreach,
> Dataset.map, or Dataset.filter do not receive a stream handle, and thus
> have no key to use for translation back to context object.
>
> The association cannot be done via a TLS (per-thread) variable too, since
> Spark can internally create threads for stream processing and they won't
> inherit the parent TLS (and also may not even have the thread that started
> the stream as their parent thread).
>
> This appears to leave Java static variable as the only option for the
> context pointer, limiting the model to one active stream per executor. But
> is it guaranteed by Spark specification that different executors will run
> in different JVM instances?
>
> Thanks for advice.
>


Re: FlatMapGroupsWithStateFunction is called thrice - Production use case.

2021-03-11 Thread Jungtaek Lim
Hi,

Could you please provide the Spark version?

It would also be very helpful if you could provide a simple reproducer -
e.g. placing a reproducer that can simply be built (with mvn, gradle, or
sbt) in your GitHub repository, plus the set of input data needed to see the
behavior. It's worth knowing that others aren't interested in your own code
even if they are interested in the problematic behavior itself, so it'd be
nice if you can minimize the hurdle to debugging.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin  wrote:

> Hello,
>
> I have a use case where I need to read (non-correlated) events from a
> source Kafka topic, then correlate them and push them forward to another
> target topic.
>
> I use Spark Structured Streaming with a FlatMapGroupsWithStateFunction
> along with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I
> apply some correlation logic on the grouped events and push the correlated
> events forward to another topic (via foreachBatch). Non-correlated events
> are stored in the state until they are correlated with a future set of
> events.
>
> With this scenario, when I push a single event to the source topic, I see
> it arrive three times in the FlatMapGroupsWithStateFunction (at separate
> timestamps) but only once in the foreachBatch processor (which is good).
>
> The same event coming thrice to the FlatMapGroupsWithStateFunction is a
> problem, as it causes issues with my correlation logic.
>
> Can someone help me understand why this is seen thrice in the
> FlatMapGroupsWithStateFunction?
>
> Code snippets are shown below. Please let me know what is missing and how
> I can solve this.
>
> thanks,
> Robin Kuttaiah
>
> *StreamQuery:*
>
> Dataset<MilestoneEvent> sessionUpdates = null;
> FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent>
>     idstateUpdateFunction =
>         new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
> try {
>   sessionUpdates = idFedKafkaEvents
>       .groupByKey(
>           new MapFunction<Row, String>() {
>             private static final long serialVersionUID = -797571731893988577L;
>             @Override
>             public String call(Row event) {
>               return event.getAs("EVENT_MODEL_ID_COL");
>             }
>           },
>           Encoders.STRING())
>       .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>           Encoders.bean(IdentifierConnector.class),
>           Encoders.bean(MilestoneEvent.class),
>           GroupStateTimeout.ProcessingTimeTimeout());
> } catch (Exception oException) {
>   // log and throw back exception
> }
>
> ForeachBatchProcessor oForeachBatch = new ForeachBatchProcessor(
>     m_InsightDeployment, m_InsightEvent, m_strQueryName);
>
> DataStreamWriter<MilestoneEvent> events = sessionUpdates.writeStream()
>     .queryName(queryName)
>     .outputMode("append")
>     .trigger(Trigger.ProcessingTime("5 seconds"))
>     .option("checkpointLocation", checkpointLocation)
>     .foreachBatch(oForeachBatch);
>
>
> *FlatMapGroupsWithStateFunction:*
>
> public class FlatMapIdFedGroupFunction implements
>     FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {
>
>   public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
>   }
>
>   @Override
>   public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
>       GroupState<IdentifierConnector> state) throws Exception {
>     List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
>     IdentifierConnector session = null;
>     IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + " "
>         + System.currentTimeMillis()); // Called thrice
>     if (!state.exists()) {
>       session = new IdentifierConnector();
>     } else {
>       session = state.get();
>     }
>     while (events.hasNext()) {
>       Row event = events.next();
>       MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
>       outputEvents.add(mEventCurr);
>       IdFederationUtil.write(".." + mEventCurr.getMilestoneId()); // Called thrice
>       break;
>     }
>     return outputEvents.iterator();
>   }
> }
>
>
> *ForEachBatchFunction:*
>
> public class ForeachBatchProcessor implements
>     VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {
>
>   private static final long serialVersionUID = 1L;
>
>   public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
>       InsightEvent in_oInsightEvent, String in_strQueryName) {
>   }
>
>   public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
>       throws Exception {
>     if (in_Rows.count() == 0L) {
>       return;
>     }
>     IdFederationUtil.write("Processing batch " + in_lBatchID + "  " + in_Rows.count());
>     List<MilestoneEvent> events = in_Rows.collectAsList();
>     for (MilestoneEvent m : events) {
>       IdFederationUtil.write("..BATCH " + m.getMilestoneId());
>     }
>   }
>
> }
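>
> One observation about this batch function (an assumption, not a confirmed
> diagnosis): count() and collectAsList() are separate actions, and each
> action can re-execute the upstream plan - including the stateful function -
> unless the micro-batch is persisted first. A sketch of caching the batch
> once, expressed in PySpark for brevity:
>
>     def process_batch(df, batch_id):
>         # Persist so that count() and collect() reuse one evaluation of
>         # the upstream (stateful) plan instead of recomputing it.
>         df.persist()
>         try:
>             if df.count() == 0:
>                 return
>             rows = df.collect()
>             # ... process rows ...
>         finally:
>             df.unpersist()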
>
>
>


Re: Spark job crashing - Spark Structured Streaming with Kafka

2021-03-03 Thread Jungtaek Lim
01)
> at
> okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
> at
> okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
> at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
> at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 21/03/02 07:00:35 WARN WatchConnectionManager: Exec Failure
> java.io.EOFException
> at okio.RealBufferedSource.require(RealBufferedSource.java:61)
> at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
> at
> okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
> at
> okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
> at
> okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
> at
> okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
> at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
> at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 21/03/02 07:05:36 WARN WatchConnectionManager: Exec Failure
> java.io.EOFException
> at okio.RealBufferedSource.require(RealBufferedSource.java:61)
> at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
> at
> okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
> at
> okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
> at
> okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
> at
> okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
> at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
> at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 21/03/02 07:10:37 WARN WatchConnectionManager: Exec Failure
> java.io.EOFException
> at okio.RealBufferedSource.require(RealBufferedSource.java:61)
> at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
> at
> okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
> at
> okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
> at
> okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
> at
> okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
> at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
> at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
>  [2021-03-02 07:25:35,740] -  - [ERROR] - StreamingQueryException
> Exception while calling run_data Traceback (most recent call last):
>   File "/opt/app/file.py", line 69, in process_streams
> query.awaitTermination()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line
> 103, in awaitTermination
> return self._jsq.awaitTermination()
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__
> return_value = get_return_value(
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134,
> in deco
> raise_from(converted)
>   File "", line 3, in raise_from
> pyspark.sql.utils.StreamingQueryException: Writing job aborted.
> === Streaming Query ===
> Identifier: [id = 4ee71252-c480-4d00-866b-0fbd88e9520e, runId =
> 8f1f1756-da8d-4983-9f76-dc1af626ad84]
> Current Committed Offsets: {}
> Current Available Offsets: {KafkaV2[Subscribe[test-topic]]:
> {"test-topic":{"0":4628}}}
> Current State: ACTIVE
> Thread State: RUNNABLE
> Logical Plan:
> WriteToMicroBatchDataSource
> org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$anon$1$

Re: [ANNOUNCE] Announcing Apache Spark 3.1.1

2021-03-03 Thread Jungtaek Lim
Thanks Hyukjin for driving the huge release, and thanks everyone for
contributing the release!

On Wed, Mar 3, 2021 at 6:54 PM angers zhu  wrote:

> Great work, Hyukjin !
>
> Bests,
> Angers
>
> Wenchen Fan  wrote on Wed, Mar 3, 2021 at 5:02 PM:
>
>> Great work and congrats!
>>
>> On Wed, Mar 3, 2021 at 3:51 PM Kent Yao  wrote:
>>
>>> Congrats, all!
>>>
>>> Bests,
>>> *Kent Yao *
>>> @ Data Science Center, Hangzhou Research Institute, NetEase Corp.
>>> *a spark enthusiast*
>>> *kyuubi is a
>>> unified multi-tenant JDBC interface for large-scale data processing and
>>> analytics, built on top of Apache Spark.*
>>> *spark-authorizer: A Spark
>>> SQL extension which provides SQL Standard Authorization for Apache
>>> Spark.*
>>> *spark-postgres: A library
>>> for reading data from and transferring data to Postgres / Greenplum with
>>> Spark SQL and DataFrames, 10~100x faster.*
>>> *spark-func-extras: A
>>> library that brings excellent and useful functions from various modern
>>> database management systems to Apache Spark.*
>>>
>>>
>>>
>>> On 03/3/2021 15:11, Takeshi Yamamuro
>>>  wrote:
>>>
>>> Great work and Congrats, all!
>>>
>>> Bests,
>>> Takeshi
>>>
>>> On Wed, Mar 3, 2021 at 2:18 PM Mridul Muralidharan 
>>> wrote:
>>>

 Thanks Hyukjin and congratulations everyone on the release !

 Regards,
 Mridul

 On Tue, Mar 2, 2021 at 8:54 PM Yuming Wang  wrote:

> Great work, Hyukjin!
>
> On Wed, Mar 3, 2021 at 9:50 AM Hyukjin Kwon 
> wrote:
>
>> We are excited to announce Spark 3.1.1 today.
>>
>> Apache Spark 3.1.1 is the second release of the 3.x line. This
>> release adds
>> Python type annotations and Python dependency management support as
>> part of Project Zen.
>> Other major updates include improved ANSI SQL compliance support,
>> history server support
>> in structured streaming, the general availability (GA) of Kubernetes
>> and node decommissioning
>> in Kubernetes and Standalone. In addition, this release continues to
>> focus on usability, stability,
>> and polish while resolving around 1500 tickets.
>>
>> We'd like to thank our contributors and users for their contributions
>> and early feedback to
>> this release. This release would not have been possible without you.
>>
>> To download Spark 3.1.1, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-3-1-1.html
>>
>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>>


Re: Spark job crashing - Spark Structured Streaming with Kafka

2021-03-02 Thread Jungtaek Lim
This lacks quite a bit of information. Full stack traces from the
driver/executors are essential, at a minimum, to determine what was happening.

On Tue, Mar 2, 2021 at 5:26 PM Sachit Murarka 
wrote:

> Hi All,
>
> My Spark job (Structured Streaming) is crashing. Can anyone help, please? I
> am using Spark 3.0.1 with Kubernetes.
>
> [ERROR] - StreamingQueryException Exception in  query.awaitTermination()
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line
> 103, in awaitTermination
> return self._jsq.awaitTermination()
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__
> return_value = get_return_value(
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134,
> in deco
> raise_from(converted)
>
> Kind Regards,
> Sachit Murarka
>


Re: Spark 2.3 Stream-Stream Join with left outer join lost left stream value

2021-02-27 Thread Jungtaek Lim
We figured out an edge case in stream-stream left/right outer joins in Spark
2.x and fixed it in Spark 3.0.0. Please refer to SPARK-26154
(https://issues.apache.org/jira/browse/SPARK-26154) for more details.
The fix brought another regression, which was fixed in 3.0.1, so you may
want to move to Spark 3.0.1+ to resolve the issue.

The state format was changed, so the fix is applied only when you start from
scratch (no restore from checkpoint). Unfortunately, there's no way to
migrate the old state format to the new one.

Hope this helps.

On Sat, Feb 27, 2021 at 10:24 PM Xu Yan  wrote:

> I'm trying to implement a stream-stream join toy with Spark 2.3.0
>
> The stream join works fine when the condition matches, but it loses the left
> stream value when the condition does not match, even when using leftOuterJoin.
>
> Thanks in advance
>
> Here are my source code and data. Basically, I'm creating two sockets: one
> (its port was lost in the archive; presumably 9999) as the right stream source and 9998 as the left stream source.
>
> val spark = SparkSession
>   .builder
>   .appName("StreamStream")
>   .master("local")
>   .getOrCreate()
>
> import spark.implicits._
>
> spark.sparkContext.setLogLevel("ERROR")
>
> val s: DataFrame = spark
>   .readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> val sDataset: Dataset[S] = s
>   .map(line => {
> val strings = line.get(0).toString.split(",")
> val id = strings(0).toInt
> val time = Timestamp.valueOf(strings(1))
> S(id, time)
>   })
>   .withWatermark("timestamp99", "30 seconds")
>
> val s9998Dataset: Dataset[S9998] = spark
>   .readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", 9998)
>   .load()
>   .map(line => {
> val strings = line.get(0).toString.split(",")
> val id = strings(0).toInt
> val time = Timestamp.valueOf(strings(1))
> S9998(id, time)
>   })
>
> val resultDataset = s9998Dataset
>   .join(sDataset,
> joinExprs = expr(
>   """
> id99 = id98 AND
> timestamp98 >= timestamp99 AND
> timestamp98 <= timestamp99 + interval 6 seconds
> """),
> joinType = "leftOuter")
>
> val streamingQuery = resultDataset
>   .writeStream
>   .outputMode("append")
>   .format("console")
>   .start()
>
> streamingQuery.awaitTermination()
>   }
>
>   case class S(id99: Int, timestamp99: Timestamp)
>
>   case class S9998(id98: Int, timestamp98: Timestamp)
>
>
>
> Sample Data:
> 1,2011-10-02 18:50:20.123
> 2,2011-10-02 18:50:25.123
> 3,2011-10-02 18:50:30.123
> 4,2011-10-02 18:50:35.123
> 5,2011-10-02 18:50:40.123
> 6,2011-10-02 18:50:45.123
> 7,2011-10-02 18:50:50.123
> 8,2011-10-02 18:50:55.123
> 9,2011-10-02 18:51:00.123
> 10,2011-10-02 18:51:05.123
> 11,2011-10-02 18:51:10.123
> 12,2011-10-02 18:51:15.123
> 13,2011-10-02 18:51:20.123
> 14,2011-10-02 18:51:25.123
> 15,2011-10-02 18:51:30.123
> 16,2011-10-02 18:52:30.123
>
>
>


Re: Structured streaming, Writing Kafka topic to BigQuery table, throws error

2021-02-23 Thread Jungtaek Lim
If your code doesn't require "end to end exactly-once" then you could
leverage foreachBatch, which enables you to use a batch sink.

If your code requires "end to end exactly-once", then well, that's a
different story. I'm not familiar with BigQuery and have no idea how the
sink is implemented, but quick googling tells me a transaction with
multiple DML statements isn't supported, so end to end exactly-once cannot
be implemented in any way.

If you ensure the write in the query is idempotent, then it doesn't matter
at all.
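For illustration, here is a minimal Scala sketch of the foreachBatch
approach, reusing the "ds2" dataframe from the quoted code below
(writeTableToBQ stands in for the poster's own batch writer and is NOT a
real Spark or BigQuery API; the paths/names are placeholders):

import org.apache.spark.sql.DataFrame

// foreachBatch hands each micro-batch to ordinary batch code, so any batch
// sink can be reused from a streaming query.
val writeBatch: (DataFrame, Long) => Unit = (batchDf, batchId) => {
  // Plain batch write, invoked once per micro-batch.
  writeTableToBQ(batchDf, "append", "targetDataset", "targetTable")
}

ds2.writeStream
  .foreachBatch(writeBatch)
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoint") // placeholder path
  .start()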

On Tue, Feb 23, 2021 at 10:35 PM Mich Talebzadeh 
wrote:

> With the old Spark Streaming (example in Scala), this would have been
> easier through RDDs. You could read data
>
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](streamingContext, kafkaParams, topicsValue)
>
> dstream.foreachRDD
>
> { pricesRDD =>
>
>   if (!pricesRDD.isEmpty)  // data exists in RDD
>
>   {
>
>  write to DB
>   }
>
>
> Now with Structured Streaming in Python, you read data into a dataframe
> with readStream and load
>
>
>schema = StructType().add("rowkey", StringType()).add("ticker",
> StringType()).add("timeissued", TimestampType()).add("price", FloatType())
>
> ds = self.spark \
>
> .readStream \
>
> .format("kafka") \
>
>  ...
>
>   .load() \
>
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
>ds2 = ds \
>
> .select( \
>
>  col("parsed_value.rowkey").alias("rowkey") \
>
>, col("parsed_value.ticker").alias("ticker") \
>
>, col("parsed_value.timeissued").alias("timeissued") \
>
>, col("parsed_value.price").alias("price")). \
>
>  withColumn("currency",
> lit(config['MDVariables']['currency'])). \
>
>  withColumn("op_type",
> lit(config['MDVariables']['op_type'])). \
>
>  withColumn("op_time", current_timestamp())
>
> # write to console
>
>   query = ds2. \
> writeStream. \
> outputMode("append"). \
> format("console"). \
> start()
> ds2.printSchema()
>
>
> But writing to BigQuery through BigQuery API does not work
>
>
>  s.writeTableToBQ(ds2, "overwrite",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>
>
>  query.awaitTermination()
>
>
> So this is the run result and the error
>
>
> root
>
>  |-- rowkey: string (nullable = true)
>
>  |-- ticker: string (nullable = true)
>
>  |-- timeissued: timestamp (nullable = true)
>
>  |-- price: float (nullable = true)
>
>  |-- currency: string (nullable = false)
>
>  |-- op_type: string (nullable = false)
>
>  |-- op_time: timestamp (nullable = false)
>
>
> *'write' can not be called on streaming Dataset/DataFrame;, quitting*
>
> I gather I need to create an RDD from the dataframe, or maybe there is another
> way to write streaming data to the DB directly from the dataframe?
>
> Thanks
>
>
>
>
>


Re: Controlling Spark StateStore retention

2021-02-20 Thread Jungtaek Lim
Looks like you're trying to chain two stateful operations - this triggers
the limitation of the global watermark and can lead the output to be
incorrect.
We've documented the limitations in the SS guide doc starting from Spark
3.0, so please take the time to read the doc to learn what the possible
issues are and what workarounds are available.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#limitation-of-global-watermark

Starting from Spark 3.0, Spark provides a warning log message when the
pattern is detected. In the upcoming Spark 3.1, a query having such a
pattern is disallowed unless end users explicitly set the config to force it
to run, as sketched below.
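For anyone who does need to force-run such a query, the escape hatch in
Spark 3.1 looks like the following sketch (the config name is taken from the
Spark 3.1 configuration docs; verify it against the version you actually run):

// Spark 3.1+ fails such queries fast unless the correctness check is
// explicitly disabled. Disabling it means accepting possibly-incorrect
// output, as described above.
spark.conf.set(
  "spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")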

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Sun, Feb 21, 2021 at 8:49 AM Sergey Oboguev  wrote:

> I am trying to write a Spark Structured Streaming application consisting
> of GroupState construct followed by aggregation.
>
> Events arriving from event sources are bucketized by deviceId and
> quantized timestamp, composed together into group state key idTime.
>
> Logical plan consists of stages (in the order of data flow):
>
> FlatMapGroupsWithState
> Aggregate
>
>
> and translates to physical plan (in the same order)
>
> FlatMapGroupsWithState
> SerializeFromObject
> Project (idTime, timestamp)
> HashAggregate(keys=[idTime] ... partial aggregation)
> Exchange hashpartitioning(idTime)
> HashAggregate(keys=[idTime] ... merge aggregates)
> StateStoreRestore [idTime], state info)
> HashAggregate(keys=[idTime] ... merge aggregates)
> StateStoreSave [idTime], state info)
> HashAggregate(keys=[idTime], functions= ...)
>
>
> This all works, but it appears that partial aggregate state does not ever
> get released.
>
> If I send 10 events for some value of idTime, the stream produces an
> output batch with count = 10.
>
> If some significant time later (after the group state expires) I send 10 more
> events for the same value of idTime, the stream produces another output
> batch with count = 20. Other aggregates also reflect that both old and new
> events were included in this subsequent aggregation output batch.
>
> Thus, it appears state information is not cleared from the state store.
>
> This is nice from the standpoint of handling latecomer events, but also
> poses a problem: if partial aggregate information per every idTime value is
> never cleared from the state store, the state store eventually is going to
> run out of space.
>
> Is there a way to control this retention and trigger the release of state
> store data for old values of idTime that are no longer needed?
>
> Thanks for advice.
>


Re: [Spark SQL] - Not able to consume Kafka topics

2021-02-18 Thread Jungtaek Lim
(Dropping the Kafka user mailing list, as this is more likely a Spark issue)

Do you have a full stack trace for the log message? It would help to make
clear where the issue lies.

On Thu, Feb 18, 2021 at 8:01 PM Rathore, Yashasvini
 wrote:

> Hello,
>
> Issues :
>
>   *   My team and I are trying to consume some Kafka topics based on
> timestamps using the startingOffsetsByTimestamp option, and the code works
> fine when we run it via a Databricks notebook.
>   *   There is a need to set up the whole process in a local system
> (IntelliJ), but the same code doesn’t work there. We are referring to the
> official documentation page, and using the exact same syntax and the same
> versions as mentioned, but somehow the code fails on the
> startingOffsetsByTimestamp line.
>   *   The following versions are being used:
>
>   *   Scala : 2.12.12
>   *   Spark-sql : 3.0.1
>   *   Spark-sql-kafka-0-10
>
>   *   The code snippet is as follows; please suggest any changes or
> details that we can use to fix this:
>
>  val spark =
> SparkSession.builder().appName("Automation").master("local[*]").getOrCreate()
>
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrap)
>   .option("subscribe", topic)
>   .option("kafka.security.protocol", "SSL")
>   .option("kafka.ssl.truststore.location", truststore_location)
>   .option("kafka.ssl.truststore.password", truststore_pwd)
>   .option("kafka.ssl.keystore.location", keystore_location)
>   .option("kafka.ssl.keystore.password", keystore_pwd)
>   .option("kafka.ssl.key.password", key_pwd)
>   .option("kafka.schema.registry.url", url)
>   .option("kafka.request.timeout.ms", "6")
>   .option("kafka.session.timeout.ms", "6")
>   .option("maxOffsetsPerTrigger", 300)
>   .option("failOnDataLoss", "false")
>   .option("dropMalformed", "true")
>   .option("startingOffsetsByTimestamp", """{​​​"topic":
> {​​​"0": ​1000}​​​}​​​""")
>   .load()
>
>   df.show()
>
> Error produced :
> Exception in thread "main" java.lang.IllegalArgumentException: Expected
> e.g. {"topicA": {"0": 123456789, "1": 123456789},
> "topicB": {"0": 123456789, "1": 123456789}}
>
>
> Expectations:
>
>   *   I believe someone from the Kafka/Spark team could help me resolve this
> issue, so that I can proceed further with my work.
>
>
> --
> Thanks & Regards,
> Yashasvini Rathore
>


Re: KafkaUtils module not found on spark 3 pyspark

2021-02-17 Thread Jungtaek Lim
I got a similar question recently, so I had to dig up some history I'd missed.
If I understand correctly, the class was "intentionally" removed in Spark 3,
because the class refers to the "kafka 0.8" module, which isn't guaranteed to
work with recent Kafka versions. And it looks like there was another decision
to not add pyspark support for the "kafka 0.10" module.

Nowadays, you're encouraged to use Structured Streaming instead of DStream
whenever possible, because the community's main focus is on SQL, which is what
Structured Streaming is based on. (Few contributors have the willingness to
maintain DStream; honestly, contributions to DStream have been quite rare.)
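For reference, the Structured Streaming replacement for the removed
KafkaUtils path is just a readStream with the "kafka" format. A minimal Scala
sketch (broker and topic are placeholders; run with
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1):

import org.apache.spark.sql.SparkSession

object KafkaStructured {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("KafkaStructured").getOrCreate()

    // The Kafka source exposes binary key/value columns plus
    // topic/partition/offset metadata; cast to string for text payloads.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "my-topic")                     // placeholder
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    df.writeStream.format("console").start().awaitTermination()
  }
}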

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Feb 17, 2021 at 4:19 PM aupres  wrote:

> I use hadoop 3.3.0 and spark 3.0.1-bin-hadoop3.2, and my Python IDE is
> Eclipse version 2020-12. I am trying to develop a Python application with
> the KafkaUtils pyspark module. My configuration reference for pyspark and
> Eclipse is this site:
> https://enahwe.wordpress.com/2015/11/25/how-to-configure-eclipse-for-developing-with-python-and-spark-on-hadoop/
> Simple code like the snippet below works well without exception.
>
>
> from pyspark import SparkContext, SparkConf
>
> conf = SparkConf().setAppName("Kafka2RDD").setMaster("local[*]")
> sc = SparkContext(conf = conf)
> data = [1, 2, 3, 4, 5, 6]
> distData = sc.parallelize(data)
>
> print(distData.count())
>
>
> But I found the Spark 3 pyspark module does not contain KafkaUtils at all.
> The code below cannot import KafkaUtils.
>
>
> from pyspark.streaming.kafka import KafkaUtils
> from pyspark.streaming.kafka import OffsetRange
>
>
> So I downgraded Spark from 3.0.1-bin-hadoop3.2 to 2.4.7-bin-hadoop2.7. Then
> I could successfully import KafkaUtils in the Eclipse IDE. But this time,
> exceptions related to the Spark version were thrown continuously.
>
>
> Traceback (most recent call last):
>   File
>
> "/home/jhwang/eclipse-workspace/BigData_Etl_Python/com/aaa/etl/kafka_spark_rdd.py",
> line 36, in 
> print(distData.count())
>   File "/usr/local/spark/python/pyspark/rdd.py", line 1055, in count
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/usr/local/spark/python/pyspark/rdd.py", line 1046, in sum
> return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>   File "/usr/local/spark/python/pyspark/rdd.py", line 917, in fold
> vals = self.mapPartitions(func).collect()
>   File "/usr/local/spark/python/pyspark/rdd.py", line 816, in collect
> sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>   File
> "/usr/python/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1305, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "/usr/python/anaconda3/lib/python3.7/site-packages/py4j/protocol.py",
> line 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : java.lang.IllegalArgumentException: Unsupported class file major version
> 55
> at org.apache.xbean.asm6.ClassReader.(ClassReader.java:166)
> at org.apache.xbean.asm6.ClassReader.(ClassReader.java:148)
> at org.apache.xbean.asm6.ClassReader.(ClassReader.java:136)
>
>
> How on earth can I import KafkaUtils and related modules on Spark 3.0.1?
> Where is the KafkaUtils module in pyspark for Spark 3.0.1, or how can the
> pyspark module be installed? Any reply will be welcome. Best regards.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to handle spark state which is growing too big even with timeout set.

2021-02-14 Thread Jungtaek Lim
For now, you may want to consider using a 3rd party implementation of a
RocksDB state store (either an open source implementation, or a commercial
one if you use either Databricks or Qubole) if the state doesn't fit in
executor memory - see the configuration sketch after the links:

   - https://github.com/chermenin/spark-states
   - https://github.com/qubole/spark-state-store
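A sketch of how such a 3rd party provider is typically plugged in (the class
name below is taken from the chermenin/spark-states project and should be
verified against its README and version):

import org.apache.spark.sql.SparkSession

// The provider class must be set before the streaming query first runs;
// Spark instantiates it for each stateful partition.
val spark = SparkSession.builder()
  .config("spark.sql.streaming.stateStore.providerClass",
    "ru.chermenin.spark.sql.execution.streaming.state.RocksDbStateStoreProvider")
  .getOrCreate()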

Fortunately, the Spark community had a discussion on providing a RocksDB state
store out of the box, and the discussion went positively. Worth noting is that
even in the happy case the community would introduce it in 3.2.0, which is
months away (the release phase for 3.1 is still ongoing, so probably add 6+
months from now). So if you're encountering the problem at production level,
or waiting for Spark 3.2 is not an option, you still have no option but to
try out the 3rd party implementations.

In the meantime, I'm planning to look into "state migration", which lets
users migrate their state from state store provider A to B. The hope is to
support arbitrary providers on both ends.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Feb 11, 2021 at 5:01 PM Kuttaiah Robin  wrote:

> Hello,
>
> I have a use case where I need to read events(non correlated) from a kafka
> topic, then correlate and push forward to another topic.
>
> I use Spark Structured Streaming with FlatMapGroupsWithStateFunction along
> with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I do
> some correlation on the group events and push the correlated events forward
> to another topic. Non-correlated events are stored in the state until they
> are correlated by a future set of events.
>
> There are times when there are more non-correlated events, and the size of
> the non-correlated events in the state grows too big.
>
> Does anyone know how to handle this use case, or will Spark take care of
> handling the state when it grows big?
>
> Thanks in advance.
>
> regards,
> Robin Kuttaiah
>


Re: Structured Streaming Spark 3.0.1

2021-01-21 Thread Jungtaek Lim
Looks like that's a driver-side error log; I think the executor logs would
have many more warning/error messages, probably with stack traces.

I'd also suggest excluding external dependencies wherever possible while
experimenting/investigating. If you suspect Apache Spark, I'd say you'll
want to stick with writing to Kafka during the investigation rather than
changing to Delta Lake, which adds an external dependency and makes it
harder to find the root cause.

Your dependencies are a bit odd. Could you please remove the dependencies for
spark-sql-kafka and try out "--packages
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1" in
spark-submit/spark-shell instead?


On Fri, Jan 22, 2021 at 5:03 AM gshen  wrote:

> I am now testing streaming into a Delta table. Interestingly, I have
> gotten it working within the community version of Databricks, which leads me
> to think it might have something to do with my dependencies. I am
> checkpointing to ADLS Gen2 and adding the following dependencies:
>
> delta-core_2.12-0.7.0.jar
> hadoop-azure-3.2.1.jar
> hadoop-azure-datalake-3.2.1.jar
> wildfly-openssl-java-1.1.3.Final.jar
> spark-sql-kafka-0-10_2.12-3.0.1.jar
> spark-streaming-kafka-0-10-assembly_2.12-3.0.1.jar
> commons-pool2-2.8.0.jar
> kafka-clients-0.10.2.2.jar
>
> Here's a more detailed the stack trace:
>
> {"LOG_LEVEL":"WARN", "LOG_MESSAGE":"2021-01-21 18:58:23,867
> [org.apache.spark.scheduler.TaskSetManager]
>  Lost task 0.0 in stage 1.0 (TID 3, 10.1.88.2, executor 1):
> org.apache.spark.util.TaskCompletionListene
> rException: Self-suppression not permitted
> at
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
> at
>
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
> at org.apache.spark.scheduler.Task.run(Task.scala:143)
> at
>
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> "}
>
> {"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,283
> [org.apache.spark.scheduler.TaskSetManager
> ] Task 0 in stage 1.0 failed 4 times; aborting job"}
>
> {"LOG_LEVEL":"ERROR", "LOG_MESSAGE":"2021-01-21 18:58:26,373
> [org.apache.spark.sql.execution.datasource
> s.FileFormatWriter] Aborting job 6115425c-9740-4e47-b2a1-e646c131e763."}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 1.0 failed 4 times,
> most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.1.28.7,
> executor
> 2): org.apache.spark.util.
> TaskCompletionListenerException: Self-suppression not permitted
> at
> org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
> at
>
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
> at org.apache.spark.scheduler.Task.run(Task.scala:143)
> at
>
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at
>
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
> at
>
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
> at
>
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:200
> 7)
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
> at
>
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:97
> 3)
> at
>
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.
> scala:973)
> at scala.Option.foreach(Option.scala:407)
> at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEvent

Re: Only one Active task in Spark Structured Streaming application

2021-01-21 Thread Jungtaek Lim
I'm not sure how many people could even guess at possible reasons - I'd say
there's not enough information: no driver/executor logs, no
job/stage/executor information, no code.

On Thu, Jan 21, 2021 at 8:25 PM Jacek Laskowski  wrote:

> Hi,
>
> I'd look at stages and jobs as it's possible that the only task running is
> the missing one in a stage of a job. Just guessing...
>
> Regards,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Thu, Jan 21, 2021 at 12:19 PM Eric Beabes 
> wrote:
>
>> Hello,
>>
>> My Spark Structured Streaming application was performing well for quite
>> some time, but all of a sudden today it has slowed down. I noticed in
>> the Spark UI that the 'No. of Active Tasks' is 1 even though 64 cores are
>> available. (Please see the attached image.)
>>
>> I don't believe there's any data skew issue related to partitioning of
>> data. What could be the reason for this? Please advise. Thanks.
>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Structured Streaming Spark 3.0.1

2021-01-20 Thread Jungtaek Lim
I quickly looked into the log attached to the SO post, and the problem doesn't
seem to be related to Kafka. The error stack trace is from checkpointing to
GCS, and the OutputStream implementation for GCS seems to be provided
by Google.

Could you please share the full stack trace, or upload the log with
secure text redacted?

On Thu, Jan 21, 2021 at 2:38 PM German Schiavon 
wrote:

> Hi,
>
> I couldn't reproduce this error :/ I wonder if there is something else
> underneath causing it...
>
> *Input*
> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server
> localhost:9092 --topic test1
> {"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
>
> *Output*
> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server
> localhost:9092 --topic sink
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}
>
>
> val rawDF = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test1")
>   .load
>   .selectExpr("CAST(value AS STRING)")
>
>
> val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
> val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")
>
> kafka_stream_output
>   .writeStream
>   .format("kafka")
>   .outputMode("update")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "sink")
>   .option("checkpointLocation", "/tmp/check")
>   .start()
>
> spark.streams.awaitAnyTermination()
>
>
> On Wed, 20 Jan 2021 at 23:22, gshen  wrote:
>
>> This SO post is pretty much the exact same issue:
>>
>>
>> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>>
>> The user mentions it's an issue with
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Jungtaek Lim
I see no issue from running this code in local dev. (changed the scope of
Spark artifacts to "compile" of course)

Could you please make sure you're not using "3.0.0-preview"? In
3.0.0-preview update mode was restricted (as the error message says) and it
was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
.m2 cache may work.

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim 
wrote:

> Please also include some test data. I quickly looked through the code,
> and it may require a specific record format.
>
> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon 
> wrote:
>
>> Hi,
>>
>> This is the JIRA <https://issues.apache.org/jira/projects/SPARK/summary>; and
>> regarding the repo, I believe you can just commit it to your personal repo
>> and that should be it.
>>
>> Regards
>>
>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes 
>> wrote:
>>
>>> Sorry. Can you please tell me where to create the JIRA? Also is there
>>> any specific Github repository I need to commit code into - OR - just in
>>> our own? Please let me know. Thanks.
>>>
>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi 
>>> wrote:
>>>
>>>> Thank you. As we've asked, could you please create a JIRA and commit
>>>> the code to GitHub?
>>>> It would speed things up a lot.
>>>>
>>>> G
>>>>
>>>>
>>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes 
>>>> wrote:
>>>>
>>>>> Here's a very simple reproducer app. I've attached 3 files:
>>>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>>>> email as well:
>>>>>
>>>>> package com.myorg
>>>>>
>>>>> import org.apache.hadoop.conf.Configuration
>>>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>>>> import org.apache.hadoop.security.UserGroupInformation
>>>>> import org.apache.kafka.clients.producer.ProducerConfig
>>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>>>
>>>>> import scala.util.{Failure, Success, Try}
>>>>>
>>>>> object Spark3Test {
>>>>>
>>>>>   val isLocal = false
>>>>>
>>>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>>>
>>>>>   val START_DATE_INDEX = 21
>>>>>   val END_DATE_INDEX = 40
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>
>>>>> val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", 
>>>>> isLocal)
>>>>> spark.sparkContext.setLogLevel("WARN")
>>>>>
>>>>> readKafkaStream(spark)
>>>>>   .groupByKey(row => {
>>>>> row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>>>   })
>>>>>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> updateAcrossEvents
>>>>>   )
>>>>>   .filter(row => !row.inProgress)
>>>>>   .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>>>   .writeStream
>>>>>   .format("kafka")
>>>>>   .option(
>>>>> s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>>>> "10.29.42.141:9092"
>>>>> //"localhost:9092"
>>>>>   )
>>>>>   .option("topic", "spark3test")
>>>>>   .option("checkpointLocation", "/tmp/checkpoint_5")
>>>>>   .outputMode("update")
>>>>>   .start()
>>>>> manageStreamingQueries(spark)
>>>>>   }
>>>>>
>>>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>>>
>>>>> val stream = sparkSession.readStream
>>>>>   .format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>>>   .option("subscribe", "inputTop

Re: Data source v2 streaming sinks does not support Update mode

2021-01-18 Thread Jungtaek Lim
>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>>>   <modelVersion>4.0.0</modelVersion>
>>>>   <groupId>com.myorg</groupId>
>>>>   <artifactId>spark-3-conversion</artifactId>
>>>>   <packaging>jar</packaging>
>>>>   <version>1.0-SNAPSHOT</version>
>>>>   <name>spark-3-conversion</name>
>>>>   <url>http://maven.apache.org</url>
>>>>
>>>>   <properties>
>>>>     <spark.version>3.0.0</spark.version>
>>>>     <scala.binary.version>2.12</scala.binary.version>
>>>>     <scala.version>2.12.10</scala.version>
>>>>     <!-- the archive stripped the XML tags from this pom; two property
>>>>          tags (for the values "1.4.0-RC1" and "true") could not be
>>>>          recovered, and the compiler/encoding tag names below are
>>>>          conventional guesses -->
>>>>     <maven.compiler.source>1.5</maven.compiler.source>
>>>>     <maven.compiler.target>1.5</maven.compiler.target>
>>>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>   </properties>
>>>>
>>>>   <dependencies>
>>>>     <dependency>
>>>>       <groupId>org.scala-lang</groupId>
>>>>       <artifactId>scala-library</artifactId>
>>>>       <version>${scala.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.slf4j</groupId>
>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>       <version>1.7.7</version>
>>>>       <scope>runtime</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>log4j</groupId>
>>>>       <artifactId>log4j</artifactId>
>>>>       <version>1.2.17</version>
>>>>       <scope>compile</scope>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.scalatest</groupId>
>>>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>>>       <version>3.0.7</version>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>junit</groupId>
>>>>       <artifactId>junit</artifactId>
>>>>       <version>3.8.1</version>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>   </dependencies>
>>>>
>>>>   <build>
>>>>     <plugins>
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>         <version>3.0.0</version>
>>>>         <executions>
>>>>           <execution>
>>>>             <phase>install</phase>
>>>>             <goals>
>>>>               <goal>shade</goal>
>>>>             </goals>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>
>>>>       <plugin>
>>>>         <groupId>net.alchim31.maven</groupId>
>>>>         <artifactId>scala-maven-plugin</artifactId>
>>>>         <version>3.2.2</version>
>>>>         <executions>
>>>>           <execution>
>>>>             <goals>
>>>>               <goal>compile</goal>
>>>>               <goal>testCompile</goal>
>>>>             </goals>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>
>>>>       <plugin>
>>>>         <groupId>org.codehaus.mojo</groupId>
>>>>         <artifactId>build-helper-maven-plugin</artifactId>
>>>>         <version>1.7</version>
>>>>         <executions>
>>>>           <execution>
>>>>             <id>add-source</id>
>>>>             <phase>generate-sources</phase>
>>>>             <goals>
>>>>               <goal>add-source</goal>
>>>>             </goals>
>>>>             <configuration>
>>>>               <sources>
>>>>                 <source>src/main/scala</source>
>>>>               </sources>
>>>>             </configuration>
>>>>           </execution>
>>>>           <execution>
>>>>             <id>add-test-source</id>
>>>>             <phase>generate-test-sources</phase>
>>>>             <goals>
>>>>               <goal>add-test-source</goal>
>>>>             </goals>
>>>>             <configuration>
>>>>               <sources>
>>>>                 <source>src/test/scala</source>
>>>>               </sources>
>>>>             </configuration>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>     </plugins>
>>>>   </build>
>>>> </project>

Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Jungtaek Lim
Would you mind if I ask for a simple reproducer? It would be nice if you could
create a repository on GitHub and push the code, including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes 
wrote:

> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>
> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Which exact Spark version did you use? Did you make sure the version for
>> Spark and the version for spark-sql-kafka artifact are the same? (I asked
>> this because you've said you've used Spark 3.0 but spark-sql-kafka
>> dependency pointed to 3.1.0.)
>>
>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
>> wrote:
>>
>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
>>> streaming sinks does not support Update mode. === Streaming Query ===
>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
>>> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current
>>> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at
>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>> at
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
>>> sinks does not support Update mode. at
>>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>> at
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>> at
>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>> at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>> ... 1 more
>>>
>>>
>>> *Please see the attached image for more information.*
>>>
>>>
>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski  wrote:
>>>
>>>> Hi,
>>>>
>>>> Can you post the whole message? I'm trying to find what might be
>>>> causing it. A small reproducible example would be of help too. Thank you.
>>>>
>>>> Regards,
>>>> Jacek Laskowski
>>>> 
>>>> https://about.me/JacekLaskowski
>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>
>>>>
>>>>
>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
>>>> wrote:
>>>>
>>>>> Trying to port my Spark 2.4 based (Structured) streaming application
>>>>> to Spark 3.0. I compiled it using the dependency given below:
>>>>>
>>>>> <dependency>
>>>>>   <groupId>org.apache.spark</groupId>
>>>>>   <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>   <version>3.1.0</version>
>>>>> </dependency>
>>>>>
>>>>>
>>>>> Every time I run it under Spark 3.0, I get this message: *Data source
>>>>> v2 streaming sinks does not support Update mode*
>>>>>
>>>>> I am using '*mapGroupsWithState*' so as per this link (
>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>> the only supported Output mode is "*Update*".
>>>>>
>>>>> My Sink is a Kafka topic so I am using this:
>>>>>
>>>>> .writeStream
>>>>> .format("kafka")
>>>>>
>>>>>
>>>>> What am I missing?
>>>>>
>>>>>
>>>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Data source v2 streaming sinks does not support Update mode

2021-01-12 Thread Jungtaek Lim
Which exact Spark version did you use? Did you make sure the version for
Spark and the version for spark-sql-kafka artifact are the same? (I asked
this because you've said you've used Spark 3.0 but spark-sql-kafka
dependency pointed to 3.1.0.)

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
wrote:

> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
> streaming sinks does not support Update mode. === Streaming Query ===
> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current
> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
> sinks does not support Update mode. at
> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
> ... 1 more
>
>
> *Please see the attached image for more information.*
>
>
> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Can you post the whole message? I'm trying to find what might be causing
>> it. A small reproducible example would be of help too. Thank you.
>>
>> Regards,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
>> wrote:
>>
>>> Trying to port my Spark 2.4 based (Structured) streaming application to
>>> Spark 3.0. I compiled it using the dependency given below:
>>>
>>> <dependency>
>>>   <groupId>org.apache.spark</groupId>
>>>   <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>   <version>3.1.0</version>
>>> </dependency>
>>>
>>>
>>> Every time I run it under Spark 3.0, I get this message: *Data source
>>> v2 streaming sinks does not support Update mode*
>>>
>>> I am using '*mapGroupsWithState*' so as per this link (
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>> the only supported Output mode is "*Update*".
>>>
>>> My Sink is a Kafka topic so I am using this:
>>>
>>> .writeStream
>>> .format("kafka")
>>>
>>>
>>> What am I missing?
>>>
>>>
>>>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread Jungtaek Lim
Please refer to my previous answer:
https://lists.apache.org/thread.html/r7dfc9e47cd9651fb974f97dde756013fd0b90e49d4f6382d7a3d68f7%40%3Cuser.spark.apache.org%3E
We probably want to add this to the SS guide doc. We didn't need it before, as
checkpointing just didn't work with the eventually consistent model; now it
works, but it is very inefficient.


On Thu, Dec 24, 2020 at 6:16 AM David Morin 
wrote:

> Does it work with the standard AWS S3 solution and its new
> consistency model?
>
> On Wed, Dec 23, 2020 at 6:48 PM, David Morin  wrote:
>
>> Thanks.
>> My Spark applications run on nodes based on docker images, but this is
>> standalone mode (1 driver, n workers).
>> Can we use S3 directly with a consistency add-on like S3Guard (s3a) or AWS
>> Consistent View?
>>
>> On Wed, Dec 23, 2020 at 5:48 PM, Lalwani, Jayesh  wrote:
>>
>>> Yes. It is necessary to have a distributed file system because all the
>>> workers need to read/write to the checkpoint. The distributed file system
>>> has to be immediately consistent: When one node writes to it, the other
>>> nodes should be able to read it immediately
>>>
>>> The solutions/workarounds depend on where you are hosting your Spark
>>> application.
>>>
>>>
>>>
>>> *From: *David Morin 
>>> *Date: *Wednesday, December 23, 2020 at 11:08 AM
>>> *To: *"user@spark.apache.org" 
>>> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints
>>> fail
>>>
>>>
>>>
>>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> I have an issue with my Pyspark job related to checkpoint.
>>>
>>>
>>>
>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
>>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
>>> java.lang.IllegalStateException: Error reading delta file
>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
>>> HDFSStateStoreProvider[id = (op=0,part=3),dir =
>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
>>> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
>>> does not exist*
>>>
>>>
>>>
>>> This job is based on Spark 3.0.1 and Structured Streaming
>>>
>>> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
>>> don't want to manage an hdfs cluster if possible.
>>>
>>> Is it necessary to have a distributed filesystem? What are the
>>> different solutions/workarounds?
>>>
>>>
>>>
>>> Thanks in advance
>>>
>>> David
>>>
>>


Re: Structured Streaming Checkpoint Error

2020-12-02 Thread Jungtaek Lim
In theory it would work, but checkpointing becomes very inefficient. If I
understand correctly, Spark writes the content to a temp file on S3, then
"renames" it, which actually reads the temp file back from S3 and writes its
content to the final path on S3. Compared to checkpointing on HDFS, that's 1
unnecessary write and 1 unnecessary read. It probably warrants a custom
implementation of the checkpoint manager for S3, as sketched below.

Also, atomic rename still doesn't exist for S3, and S3 doesn't support write
with overwrite=false. That said, there's no barrier if concurrent streaming
queries access the same checkpoint and mess it up. With a checkpoint on HDFS,
the rename is atomic: only one writer succeeds even in parallel, and the
other query that lost the race to write the checkpoint file simply fails.
That's a caveat you may want to keep in mind.
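For reference, Spark resolves the checkpoint file manager through a config,
so an S3-optimized implementation could be plugged in without forking Spark
(a sketch; the config is an internal one from Spark's source, and the class
below is a made-up placeholder):

import org.apache.spark.sql.SparkSession

// Points Spark at a custom implementation of
// org.apache.spark.sql.execution.streaming.CheckpointFileManager.
// "com.example.S3CheckpointFileManager" is a hypothetical class name.
val spark = SparkSession.builder()
  .config("spark.sql.streaming.checkpointFileManagerClass",
    "com.example.S3CheckpointFileManager")
  .getOrCreate()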

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon 
wrote:

> Hello!
>
> @Gabor Somogyi   I wonder that now that s3 is 
> *strongly
> consistent* , would work fine.
>
>
> Regards!
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>
> On Thu, 17 Sep 2020 at 11:55, German Schiavon 
> wrote:
>
>> Hi Gabor,
>>
>> Makes sense, thanks a lot!
>>
>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
>> wrote:
>>
>>> Hi,
>>>
>>> Structured Streaming simply does not work when the checkpoint location is
>>> on S3, due to S3's consistency semantics.
>>> Please choose an HDFS-compliant filesystem and it will work like a charm.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
>>> gschiavonsp...@gmail.com> wrote:
>>>
 Hi!

 I have a Structured Streaming application that reads from Kafka,
 performs some aggregations, and writes to S3 in Parquet format.

 Everything seems to work great, except that from time to time I get a
 checkpoint error. At the beginning I thought it was a random error, but it
 has happened more than 3 times already in a few days.

 Caused by: java.io.FileNotFoundException: No such file or directory:
 s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp


 Does this happen to anyone else?

 Thanks in advance.

 *This is the full error :*

 ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error

 java.io.FileNotFoundException: No such file or directory:
 s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp

 at
 org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)

 at
 org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)

 at
 org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)

 at
 org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)

 at
 org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)

 at
 org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)

 at
 org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)

 at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)

 at
 org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)

 at
 org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)

 at
 org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)

 at
 org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)

 at
 scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
 at scala.Option.getOrElse(Option.scala:189)

>>>


Re: Excessive disk IO with Spark structured streaming

2020-11-05 Thread Jungtaek Lim
FYI, SPARK-30294 is merged and will be available in Spark 3.1.0. It
reduces the number of temp files for the state store by half when you use
streaming aggregation.

1. https://issues.apache.org/jira/browse/SPARK-30294

On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim 
wrote:

> I can't spend too much time explaining these one by one. Since you want to
> know the "details", I strongly encourage you to do a deep dive instead of
> just looking around - that's how open source works.
>
> I'll go through a general explanation instead of replying inline; I'd
> probably write a blog post if there's no existing doc (I guess there should
> be one) rather than putting too much time in here.
>
> In short, the reason Spark has to create these files "per micro-batch" is
> to ensure fault-tolerance. For example, if the query fails at batch 5 and
> you rerun the query, it should rerun batch 5. How?
>
> Spark should be aware of the offsets the query has read for batch 4, and
> preferably the offsets the query read for batch 5. That's what
> offsets/commits are. State is for storing accumulated values in stateful
> operations. Same here - Spark should be able to read the state for batch 4
> so that it can calculate the new accumulated values for batch 5. In
> addition, the partition count is the max parallelism (partitions aren't
> aware of each other and they shouldn't be), hence the state for each
> partition must be stored individually.
>
> Storing 4 files per partition per micro-batch (in the end we'll only have
> "2" files, but here I count temp files and crc files, as we are talking
> about the performance aspect) is the thing I already explained - I agree
> it's not ideal, e.g. I submitted the PR for SPARK-30294 [1] which reduces
> the number of files by half. We could probably propose that Hadoop skip
> creating CRC files (I'm not sure it can simply be done as of now), but
> Spark is conservative about upgrading dependency versions, so it might not
> be available soon even if we addressed it right away.
>
> As you've found here, it's super important to find the right value for
> shuffle partitions. The state is partitioned by a hash function, so it
> strongly depends on the group key. If the cardinality of the group key is
> low, the right value for shuffle partitions should probably be fairly
> small. Unfortunately, once the query runs you can't change the number of
> shuffle partitions, as Spark has no state-migration feature for when the
> number of partitions changes. Either you need to predict the overall
> cardinality at a specific time and set the right value, or try a 3rd party
> state tool. [2] (DISCLAIMER: I'm the author.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30294
> 2. https://github.com/HeartSaVioR/spark-state-tools
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev  wrote:
>
>> Hi Jungtaek,
>>
>> *> I meant the subdirectory inside the directory you're providing as
>> "checkpointLocation", as there're several directories in that directory...*
>>
>> There are two:
>>
>> *my-spark-checkpoint-dir/MainApp*
>> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir for the app>)
>> contains only empty subdir with GUID name
>>
>> *my-spark-checkpoint-dir/writer*
>> created by ds.writeStream().option("checkpointLocation", <checkpoint dir for writer>)
>> contains all the files
>>
>> Within the latter ("writer") there are four subdirectories: commits,
>> metadata, offsets, state.
>>
>> Breakdown of file creations within them, per 69 microbatches (when
>> shuffle partition count = 200) is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 56232
>>
>> (Creation is identified by strace record for "openat" system call with
>> O_CREAT flag and file path in the corresponding directory.)
>>
>> When shuffle partition count is 10, breakdown of file creations within
>> them, per 69 microbatches, is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 2760
>>
>> *> The size of the delta file heavily depends on your stateful operation
>> and data in each micro-batch. delta file only captures the "changes" of
>> state in specific micro-batch, so there're cases you'll have very tiny
>> delta files, e.g. cardinality of grouped key is small (hence cardinality of
>> KVs is also small), small amount of inputs are provided per micro-batch,
>> the overall size of aggregated row is small, there's skew on grouped key
>> (hence some partitions get no input o

Re: Best way to emit custom metrics to Prometheus in spark structured streaming

2020-11-02 Thread Jungtaek Lim
You can try out "Dataset.observe" added in Spark 3, which enables arbitrary
metrics to be logged and exposed to streaming query listeners.
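A minimal Scala sketch of that approach, assuming the (id, key, value) frame
and the 'droppedRecords' KPI from the question (the metric name "kpi" is
arbitrary):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Attach a named, per-micro-batch metric to the streaming Dataset. This does
// not add a second streaming query; the metric rides along the existing one.
val observed = df.observe(
  "kpi",
  sum(when(col("key") === "droppedRecords", col("value").cast("long"))
    .otherwise(0L)).as("droppedRecords"))

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map[String, Row]
    val row = event.progress.observedMetrics.get("kpi")
    if (row != null) {
      println(s"droppedRecords=${row.getAs[Long]("droppedRecords")}")
    }
  }
})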

On Tue, Nov 3, 2020 at 3:25 AM meetwes  wrote:

> Hi, I am looking for the right approach to emit custom metrics for a Spark
> Structured Streaming job. *Actual scenario:*
> I have an aggregated dataframe, let's say with (id, key, value) columns.
> One of the KPIs could be 'droppedRecords', and the corresponding value
> column has the number of dropped records. I need to filter all the KPIs
> with 'droppedRecords' and compute the sum over its value column.
>
> *Challenges:*
> 1) Need to use only one streaming query so the metrics will be accurate (1
> readStream and 1 writeStream). If the metrics are emitted in a separate
> query, then it can cause inconsistencies due to varying watermark time
> between the query that does the aggregation and the one that gets only the
> metrics.
>
> *I evaluated some of the approaches:*
> 1) *foreachBatch sink:* This works for emitting metrics, but there are
> other bugs. E.g.: the numOutputRows emitted in the logs is always -1.
>
> 2) *Using accumulators:*
>
> val dropCounts: LongAccumulator = new LongAccumulator
>
> spark.sparkContext.register(dropCounts, "Drop Counts Accumulator")
> df.as[].map(row => {
>
> val value = row.value
>
> dropCounts.add(value.toLong)
>
> })
>
> This approach seems to hit a bug in Spark. The executor does add the
> value correctly, but the driver's count is always 0.
>
> 3) *Using mapGroupsWithState.* This requires an action on the aggregated
> dataframe to retrieve metrics, therefore creates another streaming query.
>
> I am using spark 3.0.1. What's would be the best way to implement custom
> metrics?
> --
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Cannot perform operation after producer has been closed

2020-11-02 Thread Jungtaek Lim
Which Spark version do you use? There's a known issue in the Kafka producer
pool in Spark 2.x which was fixed in Spark 3.0, so you may want to check
whether your case is bound to the known issue or not.

https://issues.apache.org/jira/browse/SPARK-21869


On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes  wrote:

> I know this is related to Kafka, but it happens during the Spark Structured
> Streaming job; that's why I am asking on this mailing list.
>
> How would you debug this or get around this in Spark Structured Streaming?
> Any tips would be appreciated. Thanks.
>
>
> java.lang.IllegalStateException: Cannot perform operation after producer
> has been closed at
> org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
> at
> org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
> at
> org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
>


Re: States get dropped in Structured Streaming

2020-10-23 Thread Jungtaek Lim
Unfortunately, your information doesn't provide any hint as to whether rows in
the state are evicted correctly on watermark advance, or whether there's an
unknown bug where some of the rows in the state are silently dropped. I
haven't heard of a case of the latter - you'll probably want to double-check
by focusing on watermark advance (a quick check is sketched below). If the
case turns out to be the latter, you'll probably need to go into the Spark
code to inject debug logging.
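A low-effort starting point is to watch the watermark and state size in the
query progress (field names are from StreamingQueryProgress in Spark 2.4;
"query" stands for your running StreamingQuery handle):

// Check whether the watermark is advancing past the keys you expected to
// keep, and whether numRowsTotal drops when it shouldn't.
val p = query.lastProgress
if (p != null) {
  println(s"watermark = ${p.eventTime.get("watermark")}")
  p.stateOperators.foreach { op =>
    println(s"state rows: total=${op.numRowsTotal}, updated=${op.numRowsUpdated}")
  }
}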

On Fri, Oct 23, 2020 at 3:12 PM Eric Beabes 
wrote:

> We're using stateful Structured Streaming in Spark 2.4. We are noticing
> that when the load on the system is heavy and LOTS of messages are coming
> in, some of the states disappear with no error message. Any suggestions on
> how we can debug this? Any tips for fixing this?
>
> Thanks in advance.
>


Re: Excessive disk IO with Spark structured streaming

2020-10-07 Thread Jungtaek Lim
I can't spend too much time explaining these one by one. Since you want to
know the "details", I strongly encourage you to do a deep-dive instead of just
looking around - that's how open source works.

I'll give a general explanation instead of replying inline; I'd probably write
a blog post if there's no existing doc (I guess there should be one) rather
than putting too much time in here.

In short, the reason Spark has to create these files "per micro-batch" is to
ensure fault-tolerance. For example, if the query fails at batch 5 and you
rerun the query, it should rerun batch 5. How?

Spark should be aware of the offsets the query read for batch 4, and
preferably the offsets the query read for batch 5. That's what offsets/commits
are for. State is for storing accumulated values of stateful operations. Same
here - Spark should be able to read the state for batch 4 so that it can
calculate the new accumulated values for batch 5. In addition, the number of
partitions means max parallelism (partitions aren't aware of each other, and
they shouldn't be), hence the state for each partition has to be stored
individually.
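
For reference, a sketch of the checkpoint directory layout this describes
(batch ids, operator/partition ids, and versions are illustrative):

    <checkpointLocation>/
      metadata                        <- query id, written once
      offsets/<batchId>               <- offsets planned for the batch
      commits/<batchId>               <- marker that the batch completed
      state/<operatorId>/<partitionId>/
        <version>.delta               <- state changes for one batch
        <version>.snapshot            <- periodic consolidated snapshot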

Storing 4 files per partition per micro-batch (in the end we'll only have "2"
files, but here I count temp files and crc files, as we are talking about the
performance aspect) is the thing I already explained - I agree it's not ideal,
e.g. I submitted the PR for SPARK-30294 [1] which halves the number of files.
Probably we could propose that Hadoop skip creating CRC files (I'm not sure it
can be done simply as of now), but Spark is conservative about upgrading
dependency versions, so the fix might not be available soon even if we
addressed it right away.

As you've found here, it's super important to find the right value for shuffle
partitions. State is partitioned by a hash function, so it strongly depends on
the grouping key. If the cardinality of the grouping key is low, the right
value for shuffle partitions is probably fairly small. Unfortunately, once the
query has run you can't change the number of shuffle partitions, as Spark has
no feature for migrating state when the number of partitions changes. Either
you need to predict the overall cardinality at a specific time and set the
right value up front, or try a 3rd-party state tool. [2] (DISCLAIMER: I'm the
author.)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://github.com/HeartSaVioR/spark-state-tools
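
As a minimal sketch of the point above (the checkpoint path is a placeholder):
the shuffle partition count must be fixed before the query first starts,
because it is then baked into the checkpoint and the state layout.

    // set before the first run of the streaming query; it cannot be
    // changed afterwards without a state migration tool
    spark.conf.set("spark.sql.shuffle.partitions", "10")

    val query = ds.writeStream
      .option("checkpointLocation", "/path/to/checkpoint")
      .format("console")
      .start()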


On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev  wrote:

> Hi Jungtaek,
>
> *> I meant the subdirectory inside the directory you're providing as
> "checkpointLocation", as there're several directories in that directory...*
>
> There are two:
>
> *my-spark-checkpoint-dir/MainApp*
> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir for the app>)
> contains only an empty subdir with a GUID name
>
> *my-spark-checkpoint-dir/writer*
> created by ds.writeStream().option("checkpointLocation", <checkpoint dir for writer>)
> contains all the files
>
> Within the latter ("writer") there are four subdirectories: commits,
> metadata, offsets, state.
>
> Breakdown of file creations within them, per 69 microbatches (when shuffle
> partition count = 200) is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 56232
>
> (Creation is identified by strace record for "openat" system call with
> O_CREAT flag and file path in the corresponding directory.)
>
> When shuffle partition count is 10, breakdown of file creations within
> them, per 69 microbatches, is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 2760
>
> *> The size of the delta file heavily depends on your stateful operation
> and data in each micro-batch. delta file only captures the "changes" of
> state in specific micro-batch, so there're cases you'll have very tiny
> delta files, e.g. cardinality of grouped key is small (hence cardinality of
> KVs is also small), small amount of inputs are provided per micro-batch,
> the overall size of aggregated row is small, there's skew on grouped key
> (hence some partitions get no input or small inputs), etc.*
>
>
> In my case there is no key in the Row object (unless the bucketized
> "timestamp" for the 2-min window buckets becomes a key), and the microbatch
> is large enough: the whole problem is that Spark does not want to save the
> microbatch as a single file. Even after I reduce the number of shuffle
> partitions (see below), the number of files per microbatch remains
> significantly larger than the number of shuffle partitions.
>
> ..
>
> When the number of shuffle partitions is 200, Spark creates 816 files (per
> microbatch) in checkpoint store and 202 files in Spark local-dir.
>
> Of checkpoint files: 24 per microbatch are snapshot files, and 788 are
> delta files.
> The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
> Of local-dir files: 200 temp_shuffle files per microbatch (as expected)
> and 2 other files (shuffle.data+shuffle.index).
>
> If I reduce the number of shuffle partitions, two things happen:
> - Throughput of a single pipeline i

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
Sure. My point was that Delta Lake is also one of the 3rd party libraries and
there's no way for Apache Spark to do that. Delta Lake has its own group, and
the request is better directed there.

On Mon, Oct 5, 2020 at 9:54 PM Enrico Minack  wrote:

> Though spark.read.<...> refers to "built-in" data sources, there is
> nothing that prevents 3rd party libraries from "extending" spark.read in
> Scala or Python. As users know the Spark way to read built-in data sources,
> it feels natural to hook 3rd party data sources under the same scheme, to
> give users a holistic and integrated feel.
>
> One Scala example (
> https://github.com/G-Research/spark-dgraph-connector#spark-dgraph-connector
> ):
>
> import uk.co.gresearch.spark.dgraph.connector._
> val triples = spark.read.dgraph.triples("localhost:9080")
>
> and in Python:
>
> from gresearch.spark.dgraph.connector import *
> triples = spark.read.dgraph.triples("localhost:9080")
>
> I agree that 3rd parties should also support the official
> spark.read.format() and the new catalog approaches.
>
> Enrico
>
> Am 05.10.20 um 14:03 schrieb Jungtaek Lim:
>
> Hi,
>
> "spark.read." is a "shorthand" for "built-in" data sources, not
> for external data sources. spark.read.format() is still an official way to
> use it. Delta Lake is not included in Apache Spark so that is indeed not
> possible for Spark to refer to.
>
> Starting from Spark 3.0, the concept of "catalog" is introduced, with which
> you can simply refer to a table from the catalog (if the external data
> source provides a catalog implementation) without specifying the format
> explicitly (as the catalog knows about it).
>
> This session explains the catalog and how Cassandra connector leverages
> it. I see some external data sources starting to support catalog, and in
> Spark itself there's some effort to support catalog for JDBC.
>
> https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael <
> michael.mo...@siemens-healthineers.com> wrote:
>
>> Hi there,
>>
>>
>>
>> I’m just wondering if there is any incentive to implement read/write
>> methods in the DataFrameReader/DataFrameWriter for delta similar to e.g.
>> parquet?
>>
>>
>>
>> For example, using PySpark, “spark.read.parquet” is available, but
>> “spark.read.delta” is not (same for write).
>>
>> In my opinion, “spark.read.delta” feels more clean and pythonic compared
>> to “spark.read.format(‘delta’).load()”, especially if more options are
>> called, like “mode”.
>>
>>
>>
>> Can anyone explain the reasoning behind this, is this due to the Java
>> nature of Spark?
>>
>> From a pythonic point of view, I could also imagine a single read/write
>> method, with the format as an arg and kwargs related to the different file
>> format options.
>>
>>
>>
>> Best,
>>
>> Michael
>>
>>
>>
>>
>>
>


Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
d users can tolerate.

Probably micro-batch mode could also decouple the micro-batch interval from
the checkpoint interval to provide flexibility - say, I can tolerate
reprocessing up to 10 mins of data when a failure occurs, but due to the
output latency I need a micro-batch interval of 30 seconds. (In other words,
do a checkpoint roughly every 20 micro-batches.) That is actually a bit tricky
to implement, and I haven't seen any request for it, so it's just a sketched
idea.

And the same also for the files in Spark local directory?
>
> * * *
>
> The numbers for checkpoint directory are, of course, captured when it was
> set to a local drive (or Lustre/NFS.).
>
> For HDFS there are obviously no local file system calls for the checkpoint
> store, as HDFS does not present itself as an OS-level file system.
> Nevertheless the name of checkpoint directory was transmitted over HDFS
> connection socket 1,675 times per microbatch, so the number of high-level
> HDFS file operations must have been at least that high.
>
> * * *
>
> On a related note, for 920,000 events Spark made 700,000 attempts to
> execute chmod or readlink program, i.e. to launch an external subprocess
> with an executable in order to perform a file operation. Those 900,000
> attempts actually represent 150,000 cycles, and in each cycle Spark tried
> to launch the program from 6 different locations (/usr/local/sbin ->
> /usr/local/bin -> /usr/sbin -> /usr/bin -> /sbin -> /bin),  until it
> finally finds it in the last. But then on the next cycle Spark/Hadoop does
> not re-use the knowledge of a previously found utility location, and
> repeats the search from the very start causing useless file system search
> operations over and over again.
>
> This may or may not matter when HDFS is used for checkpoint store
> (depending on how HDFS server implements the calls), but it does matter
> when a file system like Lustre or NFS is used for checkpoint storage.
> (Not to mention spawning readlink and chmod does not seem like a bright
> idea in the first place, although perhaps there might be a reason why
> Hadoop layer does it this way).
>
> Thanks,
> Sergey
>
> On Mon, Oct 5, 2020 at 5:45 AM Jungtaek Lim 
> wrote:
>
>> First of all, you'd want to divide these numbers by the number of
>> micro-batches, as file creations in checkpoint directory would occur
>> similarly per micro-batch.
>> Second, you'd want to dive inside the checkpoint directory and have
>> separate numbers per top-subdirectory.
>>
>> After that we can see whether the value would make sense or not.
>>
>> Regarding file I/O issues on SS, two issues I know about are:
>> 1) If you use streaming aggregation, it unnecessarily creates a temporary
>> file for both read and write on the state store, while the file is only
>> needed for writing. That makes the number of file creations 2x. A patch
>> is proposed under SPARK-30294 [1].
>>
>> 2) Spark leverages an HDFS API which is configured to create a crc file per
>> file by default. (So you'll have 2x the files you'd expect.) There's a bug
>> in the HDFS API (HADOOP-16255 [2]) which missed handling crc files during
>> rename (in short, given how checkpointing works in SS, a temp file is
>> atomically renamed to become the final file), and as a workaround
>> (SPARK-28025 [3]) Spark tries to delete the crc file, which means two
>> additional operations (exists -> delete) may occur per crc file.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30294
>> 2. https://issues.apache.org/jira/browse/HADOOP-16255
>> 3. https://issues.apache.org/jira/browse/SPARK-28025
>>
>>
>> On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev  wrote:
>>
>>> I am trying to run a Spark structured streaming program simulating basic
>>> scenario of ingesting events and calculating aggregates on a window with
>>> watermark, and I am observing an inordinate amount of disk IO Spark
>>> performs.
>>>
>>> The basic structure of the program is like this:
>>>
>>> sparkSession = SparkSession.builder()
>>>.appName(...)
>>>.master("local[*]")
>>>.config("spark.executor.memory", "8g")
>>>.config("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>.config("spark.kryoserializer.buffer", "8m")
>>> 

Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in checkpoint directory would occur
similarly per micro-batch.
Second, you'd want to dive inside the checkpoint directory and have
separate numbers per top-subdirectory.

After that we can see whether the value would make sense or not.

Regarding file I/O issues on SS, two issues I know about are:
1) If you use streaming aggregation, it unnecessarily creates a temporary
file for both read and write on the state store, while the file is only
needed for writing. That makes the number of file creations 2x. A patch is
proposed under SPARK-30294 [1].

2) Spark leverages an HDFS API which is configured to create a crc file per
file by default. (So you'll have 2x the files you'd expect.) There's a bug in
the HDFS API (HADOOP-16255 [2]) which missed handling crc files during rename
(in short, given how checkpointing works in SS, a temp file is atomically
renamed to become the final file), and as a workaround (SPARK-28025 [3])
Spark tries to delete the crc file, which means two additional operations
(exists -> delete) may occur per crc file.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://issues.apache.org/jira/browse/HADOOP-16255
3. https://issues.apache.org/jira/browse/SPARK-28025


On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev  wrote:

> I am trying to run a Spark structured streaming program simulating basic
> scenario of ingesting events and calculating aggregates on a window with
> watermark, and I am observing an inordinate amount of disk IO Spark
> performs.
>
> The basic structure of the program is like this:
>
> sparkSession = SparkSession.builder()
>.appName(...)
>.master("local[*]")
>.config("spark.executor.memory", "8g")
>.config("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>.config("spark.kryoserializer.buffer", "8m")
>.config("spark.local.dir", ...local
> directory...)
>.getOrCreate();
>
> sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the
> app ...);
>
> dataset = sparkSession.readStream()
>   .option("checkpointLocation", ... checkpoint dir for
> source ...)
>   .format(MockStreamingSource.class.getName())
>   .load();
>
> Dataset<Row> ds = dataset
>   .withWatermark("timestamp", "10 minutes")
>   .groupBy(
>   functions.window(functions.col("timestamp"),
> "2 minutes"),
>   functions.col("source"))
>   .agg(
>   functions.avg("D0").as("AVG_D0"),
>   functions.avg("I0").as("AVG_I0"));
>
> DataStreamWriter<Row> dsw = ds.writeStream()
>   // .trigger(Trigger.ProcessingTime("1
> minute"))
>   .option("checkpointLocation", .. checkpoint
> dir for writer ... );
>
> dsw.outputMode(OutputMode.Append())
>.format("console")
>.option("truncate", "false")
>.option("numRows", Integer.MAX_VALUE)
>.start()
>.awaitTermination();
>
>
> MockStreamingSource is just that -- a source intended to provide a
> simulated input. It generates microbatches of mock events and sends them to
> the app. In the testing scenario, the source simulates 20,000 devices each
> sending an event every 15 seconds for 11.5 minutes of logical time (just
> under 12 minutes of window size + watermark), for a total number of 920,000
> events.
>
> I initially started with a microbatch sized at 500 events, and processing
> performance was totally dismal because of disk IO. I then increased the
> microbatch size and performance got better, but still very poor. Microbatch
> size now is 13,334 events per batch, which corresponds to an ingestion
> interval of 10 seconds. Smaller batches resulted in worse performance.
>
> But even with a microbatch sized at 13,334 events, performance is poor
> because of excessive disk IO generated by Spark.
> Just ingesting data generated intra-app takes the program physical time
> equal to 40% of window size + watermark.
>
> Using strace, I measured that checkpoint directory for the stream writer
> receives the following number of Linux system calls:
>
> create/open file = 60,500 calls

Re: Arbitrary stateful aggregation: updating state without setting timeout

2020-10-05 Thread Jungtaek Lim
Hi,

That's not explained in the SS guide doc, but it is explained in the Scala API doc:
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/GroupState.html

The statement being quoted from the scala API doc answers your question.

The timeout is reset every time the function is called on a group, that is,
> when the group has new data, or the group has timed out. So the user has to
> set the timeout duration every time the function is called, otherwise there
> will not be any timeout set.


Simply put, you want to always set the timeout unless you remove the state for
the group (key).
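
To make the contract concrete, here is a minimal Scala sketch (not from the
original thread; the Event/MyState classes and the 10-minute duration are
hypothetical) showing the pattern: the timeout must be re-armed on every
invocation, since Spark clears it each time the function is called for a group.

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    case class Event(key: String, value: Long)
    case class MyState(total: Long)

    def updateState(key: String,
                    events: Iterator[Event],
                    state: GroupState[MyState]): Iterator[(String, Long)] = {
      if (state.hasTimedOut) {
        val expired = state.get
        state.remove()                          // state removed: no timeout needed
        Iterator((key, expired.total))
      } else {
        val current = state.getOption.getOrElse(MyState(0L))
        state.update(MyState(current.total + events.map(_.value).sum))
        state.setTimeoutDuration("10 minutes")  // must be set again on EVERY call
        Iterator.empty
      }
    }

    // usage:
    // ds.groupByKey(_.key)
    //   .flatMapGroupsWithState(OutputMode.Append(),
    //     GroupStateTimeout.ProcessingTimeTimeout)(updateState)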

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

‪On Mon, Oct 5, 2020 at 6:16 PM ‫Yuri Oleynikov (יורי אולייניקוב‬‎ <
yur...@gmail.com> wrote:‬

> Hi all, I have following question:
>
> What happens to the state (in terms of expiration) if I’m updating the
> state without setting timeout?
>
>
> E.g. in FlatMapGroupsWithStateFunction
>
>    1. first batch:
>
> state.update(myObj)
>
> state.setTimeoutDuration(timeout)
>
>    2. second batch:
>
> state.update(myObj)
>
>    3. third batch (no data for a long time):
>       - has the state timed out after the initial timeout expired, or not?
>
>


Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
Hi,

"spark.read." is a "shorthand" for "built-in" data sources, not for
external data sources. spark.read.format() is still an official way to use
it. Delta Lake is not included in Apache Spark so that is indeed not
possible for Spark to refer to.

Starting from Spark 3.0, the concept of "catalog" is introduced, with which
you can simply refer to a table from the catalog (if the external data source
provides a catalog implementation) without specifying the format explicitly
(as the catalog knows about it).
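
As an illustration (the catalog name and implementation class here are
hypothetical), registering a DataSource V2 catalog is just configuration,
after which tables can be referenced by name:

    // assumes com.example.MyCatalog implements Spark's CatalogPlugin
    spark.conf.set("spark.sql.catalog.mycat", "com.example.MyCatalog")
    spark.sql("SELECT * FROM mycat.db.some_table").show()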

This session explains the catalog and how Cassandra connector leverages it.
I see some external data sources starting to support catalog, and in Spark
itself there's some effort to support catalog for JDBC.
https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael <
michael.mo...@siemens-healthineers.com> wrote:

> Hi there,
>
>
>
> I’m just wondering if there is any incentive to implement read/write
> methods in the DataFrameReader/DataFrameWriter for delta similar to e.g.
> parquet?
>
>
>
> For example, using PySpark, “spark.read.parquet” is available, but
> “spark.read.delta” is not (same for write).
>
> In my opinion, “spark.read.delta” feels more clean and pythonic compared
> to “spark.read.format(‘delta’).load()”, especially if more options are
> called, like “mode”.
>
>
>
> Can anyone explain the reasoning behind this, is this due to the Java
> nature of Spark?
>
> From a pythonic point of view, I could also imagine a single read/write
> method, with the format as an arg and kwargs related to the different file
> format options.
>
>
>
> Best,
>
> Michael
>
>
>
>
>


Re: Query around Spark Checkpoints

2020-09-29 Thread Jungtaek Lim
Sorry, I have no idea about Delta Lake. You may get a better answer from the
Delta Lake mailing list.

One thing is clear that stateful processing is simply an essential feature
on almost every streaming framework. If you're struggling with something
around the state feature and trying to find a workaround then probably
something is going wrong. Please feel free to share it.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Sep 30, 2020 at 1:14 AM Bryan Jeffrey wrote:

> Jungtaek,
>
> How would you contrast stateful streaming with checkpoint vs. the idea of
> writing updates to a Delta Lake table, and then using the Delta Lake table
> as a streaming source for our state stream?
>
> Thank you,
>
> Bryan
>
> On Mon, Sep 28, 2020 at 9:50 AM Debabrata Ghosh 
> wrote:
>
>> Thank You Jungtaek and Amit ! This is very helpful indeed !
>>
>> Cheers,
>>
>> Debu
>>
>> On Mon, Sep 28, 2020 at 5:33 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>>
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
>>>
>>> You would need to implement CheckpointFileManager by yourself, which is
>>> tightly integrated with HDFS (parameters and return types of methods are
>>> mostly from HDFS). That doesn't mean it's impossible to implement
>>> CheckpointFileManager against a non-filesystem, but it'd be non-trivial
>>> to override all of the functionality and make it work seamlessly.
>>>
>>> The required consistency is documented in the javadoc of
>>> CheckpointFileManager - please read through it and evaluate whether your
>>> target storage can fulfill the requirements.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Mon, Sep 28, 2020 at 3:04 AM Amit Joshi 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> As far as I know, it depends on whether you are using Spark Streaming
>>>> or Structured Streaming.
>>>> In Spark Streaming you can write your own code to checkpoint.
>>>> But in the case of Structured Streaming it has to be a file location.
>>>> But the main question is why you want to checkpoint to
>>>> NoSQL, as it's eventually consistent.
>>>>
>>>>
>>>> Regards
>>>> Amit
>>>>
>>>> On Sunday, September 27, 2020, Debabrata Ghosh 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I had a query around Spark checkpoints - Can I store the
>>>>> checkpoints in NoSQL or Kafka instead of Filesystem ?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Debu
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>


Re: Query around Spark Checkpoints

2020-09-27 Thread Jungtaek Lim
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

You would need to implement CheckpointFileManager by yourself, which is
tightly integrated with HDFS (parameters and return types of methods are
mostly from HDFS). That doesn't mean it's impossible to implement
CheckpointFileManager against a non-filesystem, but it'd be non-trivial to
override all of the functionality and make it work seamlessly.

The required consistency is documented in the javadoc of CheckpointFileManager -
please read through it and evaluate whether your target storage can fulfill
the requirements.
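
If you do implement one, plugging it in is - as far as I'm aware - a matter of
setting an internal, undocumented configuration; a sketch, with a hypothetical
class name:

    spark.conf.set(
      "spark.sql.streaming.checkpointFileManagerClass",
      "com.example.MyCheckpointFileManager")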

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Sep 28, 2020 at 3:04 AM Amit Joshi 
wrote:

> Hi,
>
> As far as I know, it depends on whether you are using Spark Streaming or
> Structured Streaming.
> In Spark Streaming you can write your own code to checkpoint.
> But in the case of Structured Streaming it has to be a file location.
> But the main question is why you want to checkpoint to
> NoSQL, as it's eventually consistent.
>
>
> Regards
> Amit
>
> On Sunday, September 27, 2020, Debabrata Ghosh 
> wrote:
>
>> Hi,
>> I had a query around Spark checkpoints - Can I store the checkpoints
>> in NoSQL or Kafka instead of Filesystem ?
>>
>> Regards,
>>
>> Debu
>>
>


Re: Elastic Search sink showing -1 for numOutputRows

2020-09-07 Thread Jungtaek Lim
I don't know about the ES sink. The availability of "numOutputRows" depends on
the API version the sink is implementing (DSv1 vs DSv2), so you'd be better
off asking the author of the ES sink to confirm the case.

On Tue, Sep 8, 2020 at 5:15 AM jainshasha  wrote:

> Hi,
>
> I am using Spark Structured Streaming and sinking the data into
> Elasticsearch. In the stats emitted for each batch, "numOutputRows" always
> shows -1 for the Elasticsearch sink, whereas for other sinks like Kafka it
> shows either 0 or some value when data is emitted.
>
> What could be the reason for showing -1 for ElasticSearch ?
>
>
>
>


Re: Keeping track of how long something has been in a queue

2020-09-06 Thread Jungtaek Lim
You may want to google around "session window" and "duration", and check
whether the concept fits your requirements. Probably adding some custom
logic on top of the session window would work for you, which requires you
to implement a custom function for flatMapGroupsWithState.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Sep 4, 2020 at 11:21 PM Hamish Whittal 
wrote:

> Sorry, I moved a paragraph,
>
> (2) If Ms green.th was first seen at 13:04:04, then at 13:04:05 and
>> finally at 13:04:17, she's been in the queue for 13 seconds (ignoring the
>> ms).
>>
>


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Jungtaek Lim
Hi Amit,

if I remember correctly, you don't need to restart the query to pick up a
newly added topic or partition, as long as your subscription covers the topic
(e.g. a subscribe pattern). Please try it out.
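
A minimal sketch (the broker address and topic pattern are hypothetical) of
such a subscription:

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      // new topics matching the pattern - and new partitions of matched
      // topics - are discovered by the running query
      .option("subscribePattern", "events-.*")
      .load()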

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
wrote:

> Any pointers will be appreciated.
>
> On Thursday, August 27, 2020, Amit Joshi 
> wrote:
>
>> Hi All,
>>
>> I am trying to understand the effect of adding topics and partitions to a
>> topic in kafka, which is being consumed by spark structured streaming
>> applications.
>>
>> Do we have to restart the spark structured streaming application to read
>> from the newly added topic?
>> Do we have to restart the spark structured streaming application to read
>> from the newly added partition to a topic?
>>
>> Kafka consumers have a metadata refresh property that works without
>> restarting.
>>
>> Thanks in advance.
>>
>> Regards
>> Amit Joshi
>>
>


Re: Structured Streaming metric for count of delayed/late data

2020-08-22 Thread Jungtaek Lim
I proposed another approach which provides an accurate count, though the
number doesn't always mean the rows were dropped. (See
https://github.com/apache/spark/pull/24936 for details.)

Btw, the limitation only applies to streaming aggregation, so you can
implement the aggregation by yourself via (flat)MapGroupsWithState - note
that the local aggregation is an "optimization", so you may need to account
for the performance impact.

On Sat, Aug 22, 2020 at 1:29 PM GOEL Rajat 
wrote:

> Thanks for pointing me to the Spark ticket and its limitations. Will try
> these changes.
>
> Is there any workaround for this limitation of inaccurate count, maybe by
> adding some additional streaming operation in SS job without impacting perf
> too much ?
>
>
>
> Regards,
>
> Rajat
>
>
>
> *From: *Jungtaek Lim 
> *Date: *Friday, 21 August 2020 at 12:07 PM
> *To: *Yuanjian Li 
> *Cc: *GOEL Rajat , "user@spark.apache.org" <
> user@spark.apache.org>
> *Subject: *Re: Structured Streaming metric for count of delayed/late data
>
>
>
> One more thing to say: unfortunately, the number is not accurate compared
> to the input rows on streaming aggregation, because Spark does local
> aggregation and counts dropped inputs based on "pre-locally-aggregated"
> rows. You may want to treat the number as an indicator of whether dropping
> inputs is happening or not.
>
>
>
> On Fri, Aug 21, 2020 at 3:31 PM Yuanjian Li 
> wrote:
>
> The metrics have been added in
> https://issues.apache.org/jira/browse/SPARK-24634, but the target version
> is 3.1.
>
> Maybe you can backport for testing since it's not a big change.
>
>
>
> Best,
>
> Yuanjian
>
>
>
> On Thu, Aug 20, 2020 at 9:14 PM GOEL Rajat  wrote:
>
> Hi All,
>
>
>
> I have a query if someone can please help. Is there any metric or
> mechanism of printing count of input records dropped due to watermarking
> (late data count) in a stream, during a window based aggregation, in
> Structured Streaming ? I am using Spark 3.0.
>
>
>
> Thanks & Regards,
>
> Rajat
>
>


Re: Structured Streaming metric for count of delayed/late data

2020-08-20 Thread Jungtaek Lim
One more thing to say: unfortunately, the number is not accurate compared
to the input rows on streaming aggregation, because Spark does local
aggregation and counts dropped inputs based on "pre-locally-aggregated"
rows. You may want to treat the number as an indicator of whether dropping
inputs is happening or not.

On Fri, Aug 21, 2020 at 3:31 PM Yuanjian Li  wrote:

> The metrics have been added in
> https://issues.apache.org/jira/browse/SPARK-24634, but the target version
> is 3.1.
> Maybe you can backport for testing since it's not a big change.
>
> Best,
> Yuanjian
>
> On Thu, Aug 20, 2020 at 9:14 PM GOEL Rajat  wrote:
>
>> Hi All,
>>
>>
>>
>> I have a query if someone can please help. Is there any metric or
>> mechanism of printing count of input records dropped due to watermarking
>> (late data count) in a stream, during a window based aggregation, in
>> Structured Streaming ? I am using Spark 3.0.
>>
>>
>>
>> Thanks & Regards,
>>
>> Rajat
>>
>


Re: [SPARK-STRUCTURED-STREAMING] IllegalStateException: Race while writing batch 4

2020-08-12 Thread Jungtaek Lim
The file stream sink doesn't support this functionality. There are several
approaches to work around it:

1) two queries write to Kafka (or any intermediate storage which allows
concurrent writes), and let next Spark application read and write to the
final path
2) two queries write to two different directories, and let next Spark
application read and write to the final path
3) use alternative data sources which enable concurrent writes on writing
files (you may want to check Delta Lake, Apache Hudi, Apache Iceberg for
such functionalities - though you'd probably need to learn many other
things to maintain the table in good shape)

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sat, Aug 8, 2020 at 4:19 AM Amit Joshi  wrote:

> Hi,
>
> I have 2 Spark Structured Streaming queries writing to the same output path
> in object storage.
> Once in a while I am getting the "IllegalStateException: Race while
> writing batch 4".
> I found that this error is because there are two writers writing to the
> output path. The file streaming sink doesn't support multiple writers.
> It assumes there is only one writer writing to the path. Each query needs
> to use its own output directory.
>
> Is there a way to write the output to the same path from both queries, as I
> need the output at the same path?
>
> Regards
> Amit Joshi
>


Re: Lazy Spark Structured Streaming

2020-08-02 Thread Jungtaek Lim
SPARK-24156 runs the no-data batch to apply the updated watermark, but the
updated watermark may not be sufficient to evict all state rows (e.g.
depending on the window and the lateness of the watermark).
You'll still need to provide a dummy input record to advance the watermark, so
that all expected state rows can be evicted.
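
A minimal test sketch of the "dummy record" trick (MemoryStream is an internal
class commonly used in Spark's own tests; the timestamps are arbitrary):

    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.functions.window

    implicit val sqlCtx = spark.sqlContext
    import spark.implicits._

    val input = MemoryStream[Long]   // event time as epoch seconds
    val counts = input.toDS().toDF("seconds")
      .selectExpr("CAST(seconds AS TIMESTAMP) AS timestamp")
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "2 minutes"))
      .count()

    // start counts.writeStream in OutputMode.Append, then:
    input.addData(1000L, 1030L, 1060L)   // real data
    // a dummy event far enough ahead to push the watermark past
    // window end + lateness, so the windows above finally get emitted
    input.addData(10000L)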

On Sun, Aug 2, 2020 at 5:44 PM Phillip Henry 
wrote:

> Thanks, Jungtaek. Very useful information.
>
> Could I please trouble you with one further question - what you said makes
> perfect sense but to what exactly does SPARK-24156
> <https://issues.apache.org/jira/browse/SPARK-24156> refer if not fixing
> the "need to add a dummy record to move watermark forward"?
>
> Kind regards,
>
> Phillip
>
>
>
>
> On Mon, Jul 27, 2020 at 11:41 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I'm not sure what exactly your problem is, but given you've mentioned
>> window and OutputMode.Append, you may want to remember that append mode
>> doesn't produce the output of an aggregation until the watermark "passes by"
>> the window. It's expected behavior if you're seeing lazy outputs with
>> OutputMode.Append compared to OutputMode.Update.
>>
>> Unfortunately there's no mechanism in SSS to move the watermark forward
>> without actual input, so if you want to test some behavior with
>> OutputMode.Append you would need to add a dummy record to move the
>> watermark forward.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Mon, Jul 27, 2020 at 8:10 PM Phillip Henry 
>> wrote:
>>
>>> Sorry, should have mentioned that Spark only seems reluctant to take the
>>> last windowed, groupBy batch from Kafka when using OutputMode.Append.
>>>
>>> I've asked on StackOverflow:
>>>
>>> https://stackoverflow.com/questions/62915922/spark-structured-streaming-wont-pull-the-final-batch-from-kafka
>>> but am still struggling. Can anybody please help?
>>>
>>> How do people test their SSS code if you have to put a message on Kafka
>>> to get Spark to consume a batch?
>>>
>>> Kind regards,
>>>
>>> Phillip
>>>
>>>
>>> On Sun, Jul 12, 2020 at 4:55 PM Phillip Henry 
>>> wrote:
>>>
>>>> Hi, folks.
>>>>
>>>> I noticed that SSS won't process a waiting batch if there are no
>>>> batches after that. To put it another way, Spark must always leave one
>>>> batch on Kafka waiting to be consumed.
>>>>
>>>> There is a JIRA for this at:
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-24156
>>>>
>>>> that says it's resolved in 2.4.0 but my code
>>>> <https://github.com/PhillHenry/SSSPlayground/blob/Spark2/src/test/scala/uk/co/odinconsultants/sssplayground/windows/TimestampedStreamingSpec.scala>
>>>> is using 2.4.2 yet I still see Spark reluctant to consume another batch
>>>> from Kafka if it means there is nothing else waiting to be processed in the
>>>> topic.
>>>>
>>>> Do I have to do something special to exploit the behaviour that
>>>> SPARK-24156 says it has addressed?
>>>>
>>>> Regards,
>>>>
>>>> Phillip
>>>>
>>>>
>>>>
>>>>


Re: Pyspark: Issue using sql in foreachBatch sink

2020-07-31 Thread Jungtaek Lim
Python doesn't allow omitting the parentheses of a no-arg method call, whereas
Scala does. Since `outdf` here is a py4j-wrapped Java DataFrame, use
`write()`, not `write`.

On Wed, Jul 29, 2020 at 9:09 AM muru  wrote:

> In a pyspark SS job, trying to use sql instead of sql functions in
> foreachBatch sink
> throws AttributeError: 'JavaMember' object has no attribute 'format'
> exception.
> However, the same thing works in Scala API.
>
> Please note, I tested in spark 2.4.5/2.4.6 and 3.0.0 and got the same
> exception.
> Is it a bug or known issue with Pyspark implementation? I noticed that I
> could perform other operations except the write method.
>
> Please, let me know how to fix this issue.
>
> See below code examples
> # Spark Scala method
> def processData(batchDF: DataFrame, batchId: Long) {
>batchDF.createOrReplaceTempView("tbl")
>val outdf=batchDF.sparkSession.sql("select action, count(*) as count
> from tbl where date='2020-06-20' group by 1")
>outdf.printSchema()
>outdf.show
>outdf.coalesce(1).write.format("csv").save("/tmp/agg")
> }
>
> ## pyspark python method
> def process_data(bdf, bid):
>   lspark = bdf._jdf.sparkSession()
>   bdf.createOrReplaceTempView("tbl")
>   outdf=lspark.sql("select action, count(*) as count from tbl where
> date='2020-06-20' group by 1")
>   outdf.printSchema()
>   # it works
>   outdf.show()
>   # throws AttributeError: 'JavaMember' object has no attribute 'format'
> exception
>   outdf.coalesce(1).write.format("csv").save("/tmp/agg1")
>
> Here is the full exception
> 20/07/24 16:31:24 ERROR streaming.MicroBatchExecution: Query [id =
> 854a39d0-b944-4b52-bf05-cacf998e2cbd, runId =
> e3d4dc7d-80e1-4164-8310-805d7713fc96] terminated with error
> py4j.Py4JException: An exception was raised by the Python Proxy. Return
> Message: Traceback (most recent call last):
>   File
> "/Users/muru/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 2381, in _call_proxy
> return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/Users/muru/spark/python/pyspark/sql/utils.py", line 191, in call
> raise e
> AttributeError: 'JavaMember' object has no attribute 'format'
> at py4j.Protocol.getReturnValue(Protocol.java:473)
> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
> at com.sun.proxy.$Proxy20.call(Unknown Source)
> at
> org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
> at
> org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
> at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)

Re: Lazy Spark Structured Streaming

2020-07-27 Thread Jungtaek Lim
I'm not sure what exactly your problem is, but given you've mentioned window
and OutputMode.Append, you may want to remember that append mode doesn't
produce the output of an aggregation until the watermark "passes by" the
window. It's expected behavior if you're seeing lazy outputs with
OutputMode.Append compared to OutputMode.Update.

Unfortunately there's no mechanism in SSS to move the watermark forward
without actual input, so if you want to test some behavior with
OutputMode.Append you would need to add a dummy record to move the watermark
forward.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Jul 27, 2020 at 8:10 PM Phillip Henry 
wrote:

> Sorry, should have mentioned that Spark only seems reluctant to take the
> last windowed, groupBy batch from Kafka when using OutputMode.Append.
>
> I've asked on StackOverflow:
>
> https://stackoverflow.com/questions/62915922/spark-structured-streaming-wont-pull-the-final-batch-from-kafka
> but am still struggling. Can anybody please help?
>
> How do people test their SSS code if you have to put a message on Kafka to
> get Spark to consume a batch?
>
> Kind regards,
>
> Phillip
>
>
> On Sun, Jul 12, 2020 at 4:55 PM Phillip Henry 
> wrote:
>
>> Hi, folks.
>>
>> I noticed that SSS won't process a waiting batch if there are no batches
>> after that. To put it another way, Spark must always leave one batch on
>> Kafka waiting to be consumed.
>>
>> There is a JIRA for this at:
>>
>> https://issues.apache.org/jira/browse/SPARK-24156
>>
>> that says it's resolved in 2.4.0 but my code
>> <https://github.com/PhillHenry/SSSPlayground/blob/Spark2/src/test/scala/uk/co/odinconsultants/sssplayground/windows/TimestampedStreamingSpec.scala>
>> is using 2.4.2 yet I still see Spark reluctant to consume another batch
>> from Kafka if it means there is nothing else waiting to be processed in the
>> topic.
>>
>> Do I have to do something special to exploit the behaviour that
>> SPARK-24156 says it has addressed?
>>
>> Regards,
>>
>> Phillip
>>
>>
>>
>>


Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Jungtaek Lim
Please provide the logs and a dump file for the OOM case - otherwise no one
can say what the cause is.

Add JVM options to driver/executor => -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath="...dir..."
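
For example, wired in via spark-submit (the dump paths are placeholders and
the directories must exist on the respective hosts):

    spark-submit \
      --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps" \
      --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps" \
      ...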

On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava
 wrote:

> *Issue:* I am trying to process 5000+ files of gzipped json file
> periodically from S3 using Structured Streaming code.
>
> *Here are the key steps:*
>
>    1. Read the json schema and broadcast it to executors
>
>    2. Read the stream:
>
>       Dataset<Row> inputDS = sparkSession.readStream()
>           .format("text")
>           .option("inferSchema", "true")
>           .option("header", "true")
>           .option("multiLine", true)
>           .schema(jsonSchema)
>           .option("mode", "PERMISSIVE")
>           .json(inputPath + "/*");
>
>    3. Process each file in a map:
>
>       Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
>    4. Write output to S3:
>
>       StreamingQuery query = ds.coalesce(1)
>           .writeStream()
>           .outputMode("append")
>           .format("csv")
>           ...
>           .start();
>
> *maxFilesPerTrigger* is set to 500, so I was hoping the stream would pick
> only that many files to process per trigger. Why are we getting OOM? If we
> have more than 3500 files then the system crashes with OOM.
>
>


Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-06 Thread Jungtaek Lim
In SS, checkpointing is now a part of running a micro-batch and it's supported
natively. (To be clear, my library doesn't deal with the native behavior of
checkpointing.)

In other words, it can't be customized like you have been doing with your
database. You probably don't need to do it with SS, but it still depends on
what you did with the offsets in the database.

On Tue, Jul 7, 2020 at 1:40 AM KhajaAsmath Mohammed 
wrote:

> Thanks Lim, this is really helpful. I have a few questions.
>
> Our earlier approach used a low-level consumer to read offsets from a
> database and used that information to read via Spark Streaming with
> DStreams, saving the offsets back once the process finished. This way we
> never lost data.
>
> With your library, will it automatically resume from the last offset it
> processed if the application was stopped or killed for some time?
>
> Thanks,
> Asmath
>
> On Sun, Jul 5, 2020 at 6:22 PM Jungtaek Lim 
> wrote:
>
>> There're sections in SS programming guide which exactly answer these
>> questions:
>>
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>>
>> Also, for Kafka data source, there's a 3rd party project (DISCLAIMER: I'm
>> the author) to help you commit the offset to Kafka with the specific group
>> ID.
>>
>> https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer
>>
>> After that, you can also leverage the Kafka ecosystem to monitor the
>> progress from Kafka's point of view, especially the gap between the
>> highest offset and the committed offset.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>>
>> On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi 
>> wrote:
>>
>>> In 3.0 the community just added it.
>>>
>>> On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are trying to move our existing code from spark dstreams to
>>>> structured streaming for one of the old application which we built few
>>>> years ago.
>>>>
>>>> A Structured Streaming job doesn't have a streaming tab in the Spark UI.
>>>> Is there a way to monitor the job we submitted in Structured Streaming?
>>>> Since the job runs on every trigger, how can we kill the job and restart
>>>> it if needed?
>>>>
>>>> Any suggestions on this please
>>>>
>>>> Thanks,
>>>> Asmath
>>>>
>>>>
>>>>
>>>>
>>>>


Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread Jungtaek Lim
There're sections in SS programming guide which exactly answer these
questions:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

Also, for Kafka data source, there's a 3rd party project (DISCLAIMER: I'm
the author) to help you commit the offset to Kafka with the specific group
ID.

https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer

After that, you can also leverage the Kafka ecosystem to monitor the progress
from Kafka's point of view, especially the gap between the highest offset and
the committed offset.
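
As a small illustration of the monitoring hooks described in the guide (the
println handlers are just placeholders), you can attach a
StreamingQueryListener to the session:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"query started: ${event.id}")
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(event.progress.json)  // rates, durations, state metrics
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"query terminated: ${event.id}")
    })

For stop/restart, keep the handle returned by start() and call query.stop();
restarting with the same checkpoint location resumes from where it left off.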

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi 
wrote:

> In 3.0 the community just added it.
>
> On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
> wrote:
>
>> Hi,
>>
>> We are trying to move our existing code from spark dstreams to structured
>> streaming for one of the old application which we built few years ago.
>>
>> A Structured Streaming job doesn't have a streaming tab in the Spark UI.
>> Is there a way to monitor the job we submitted in Structured Streaming?
>> Since the job runs on every trigger, how can we kill the job and restart
>> it if needed?
>>
>> Any suggestions on this please
>>
>> Thanks,
>> Asmath
>>
>>
>>
>>
>>


Re: Spark streaming with Kafka

2020-07-02 Thread Jungtaek Lim
I can't reproduce it. Could you please make sure you're running spark-shell
from the official Spark 3.0.0 distribution? Please try changing the directory
and using a relative path like "./spark-shell".

On Thu, Jul 2, 2020 at 9:59 PM dwgw  wrote:

> Hi
> I am trying to stream a Kafka topic from the Spark shell but I am getting
> the following error.
> I am using *spark 3.0.0/scala 2.12.10* (Java HotSpot(TM) 64-Bit Server VM,
> *Java 1.8.0_212*)
>
> *[spark@hdp-dev ~]$ spark-shell --packages
> org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0*
> Ivy Default Cache set to: /home/spark/.ivy2/cache
> The jars for the packages stored in: /home/spark/.ivy2/jars
> :: loading settings :: url =
>
> jar:file:/u01/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
> org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
> :: resolving dependencies ::
>
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226;1.0
> confs: [default]
> found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
> found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0
> in
> central
> found org.apache.kafka#kafka-clients;2.4.1 in central
> found com.github.luben#zstd-jni;1.4.4-3 in central
> found org.lz4#lz4-java;1.7.1 in central
> found org.xerial.snappy#snappy-java;1.1.7.5 in central
> found org.slf4j#slf4j-api;1.7.30 in central
> found org.spark-project.spark#unused;1.0.0 in central
> found org.apache.commons#commons-pool2;2.6.2 in central
> :: resolution report :: resolve 502ms :: artifacts dl 10ms
> :: modules in use:
> com.github.luben#zstd-jni;1.4.4-3 from central in [default]
> org.apache.commons#commons-pool2;2.6.2 from central in [default]
> org.apache.kafka#kafka-clients;2.4.1 from central in [default]
> org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 from central in
> [default]
> org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 from
> central in [default]
> org.lz4#lz4-java;1.7.1 from central in [default]
> org.slf4j#slf4j-api;1.7.30 from central in [default]
> org.spark-project.spark#unused;1.0.0 from central in [default]
> org.xerial.snappy#snappy-java;1.1.7.5 from central in [default]
>
> -
> |  |modules||   artifacts
> |
> |   conf   | number| search|dwnlded|evicted||
> number|dwnlded|
>
> -
> |  default |   9   |   0   |   0   |   0   ||   9   |   0
> |
>
> -
> :: retrieving ::
> org.apache.spark#spark-submit-parent-ed8a74c2-330b-4a8e-9a92-3dad7d22b226
> confs: [default]
> 0 artifacts copied, 9 already retrieved (0kB/13ms)
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
> Spark context Web UI available at http://hdp-dev.infodetics.com:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1593620640299_0015).
> Spark session available as 'spark'.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 3.0.0
>   /_/
>
> Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_212)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
>
> scala> val df = spark.
>  | readStream.
>  | format("kafka").
>  | option("kafka.bootstrap.servers", "XXX").
>  | option("subscribe", "XXX").
>  | option("kafka.sasl.mechanisms", "XXX").
>  | option("kafka.security.protocol", "XXX").
>  | option("kafka.sasl.username","XXX").
>  | option("kafka.sasl.password", "XXX").
>  | option("startingOffsets", "earliest").
>  | load
> java.lang.AbstractMethodError: Method
>
> org/apache/spark/sql/kafka010/KafkaSourceProvider.inferSchema(Lorg/apache/spark/sql/util/CaseInsensitiveStringMap;)Lorg/apache/spark/sql/types/StructType;
> is abstract
>   at
>
> org.apache.spark.sql.kafka010.KafkaSourceProvider.inferSchema(KafkaSourceProvider.scala)
>   at
>
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
>   at
>
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:215)
>   ... 57 elided
>
> Looking forward for a response.
>
>
>
>
>
>


Re: Failure Threshold in Spark Structured Streaming?

2020-07-02 Thread Jungtaek Lim
Structured Streaming basically follows SQL semantics, which have no notion of
a "max allowance of failures". If you'd like to tolerate malformed data,
please read it in a raw format (string or binary), which won't fail on such
data, and convert it yourself - e.g. from_json() will produce null if the
data is malformed, so you can easily filter those records out later.
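
A minimal Scala sketch of that pattern (the broker, topic, and schema are
hypothetical):

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("id", StringType)
      .add("value", LongType)

    val parsed = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "events")
      .load()
      // malformed payloads become NULL instead of failing the query
      .select(from_json(col("value").cast("string"), schema).as("data"))

    val good = parsed.where(col("data").isNotNull).select("data.*")
    val bad  = parsed.where(col("data").isNull)  // route to a dead-letter sink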


On Fri, Jul 3, 2020 at 1:24 AM Eric Beabes  wrote:

> Currently my job fails even on a single failure. In other words, even if
> one incoming message is malformed the job fails. I believe there's a
> property that allows us to set an acceptable number of failures. I Googled
> but couldn't find the answer. Can someone please help? Thanks.
>
>


Re: REST Structured Steaming Sink

2020-07-01 Thread Jungtaek Lim
I guess the method, query parameters, headers, and payload would all be
different for almost every use case - that makes it hard to generalize, and an
implementation would have to be quite complicated to be flexible enough.

I'm not aware of any custom sink implementing REST, so your best bet would be
simply implementing your own with foreachBatch - but someone might jump in
and provide a pointer if there is something in the Spark ecosystem.
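
A minimal sketch of the foreachBatch route (Spark 3.x Scala API; the endpoint
URL is hypothetical, and a real implementation would need retries and
idempotency - batchId can help deduplicate on replays):

    import java.net.{HttpURLConnection, URL}
    import org.apache.spark.sql.DataFrame

    def postBatch(df: DataFrame, batchId: Long): Unit = {
      // collect() is fine only for small batches; use foreachPartition
      // with a pooled HTTP client for anything sizable
      df.toJSON.collect().foreach { json =>
        val conn = new URL("https://example.com/ingest")
          .openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("POST")
        conn.setDoOutput(true)
        conn.setRequestProperty("Content-Type", "application/json")
        conn.getOutputStream.write(json.getBytes("UTF-8"))
        conn.getResponseCode  // force the request; check the code in real code
        conn.disconnect()
      }
    }

    ds.writeStream.foreachBatch(postBatch _).start()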

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin  wrote:

> Hi All,
>
>
> We ingest a lot of RESTful APIs into our lake and I'm wondering if it is at
> all possible to create a REST sink in structured streaming?
>
> For now I'm only focusing on RESTful services that have an incremental ID
> so my sink can just poll for new data and then ingest it.
>
> I can't seem to find a connector that does this and my gut instinct tells
> me it's probably because it isn't possible due to something completely
> obvious that I am missing
>
> I know some RESTful API obfuscate the IDs to a hash of strings and that
> could be a problem but since I'm planning on focusing on just numerical IDs
> that just get incremented I think I won't be facing that issue
>
>
> Can anyone let me know if this sounds like a daft idea? Will I need
> something like Kafka or kinesis as a buffer and redundancy or am I
> overthinking this?
>
>
> I would love to bounce ideas with people who runs structured streaming
> jobs in production
>
>
> Kind regards
> San
>
>
>


Re: Question about 'maxOffsetsPerTrigger'

2020-06-30 Thread Jungtaek Lim
As Spark uses micro-batches for streaming, it's unavoidable to tune the batch
size properly to achieve your desired trade-off of throughput vs latency.
In particular, since Spark uses a global watermark which doesn't propagate
(change) during a micro-batch, you'd want to make the batch relatively small
to make the watermark move forward faster.

On Wed, Jul 1, 2020 at 2:54 AM Eric Beabes  wrote:

> While running my Spark (Stateful) Structured Streaming job I am setting
> 'maxOffsetsPerTrigger' value to 10 Million. I've noticed that messages are
> processed faster if I use a large value for this property.
>
> What I am also noticing is that until the batch is completely processed,
> no messages are getting written to the output Kafka topic. The 'State
> timeout' is set to 10 minutes so I am expecting to see at least some of the
> messages after 10 minutes or so BUT messages are not getting written until
> processing of the next batch is started.
>
> Is there any property I can use to kinda 'flush' the messages that are
> ready to be written? Please let me know. Thanks.
>
>


Re: [Structured spak streaming] How does cassandra connector readstream deals with deleted record

2020-06-26 Thread Jungtaek Lim
I'm not sure how it is implemented, but in general I wouldn't expect such
behavior from connectors which read from non-streaming storage. The query
result may depend on "when" the records are fetched.

If you need to reflect the changes in your query, you'll probably want to
find a way to retrieve "change logs" from your external storage (or a way for
your system/product to produce change logs if your external storage doesn't
support them), and adapt your query to them. There's a keyword you can google
to read further: "Change Data Capture".

Otherwise, you can apply the traditional approach, run a batch query
periodically and replace entire outputs.

On Thu, Jun 25, 2020 at 1:26 PM Rahul Kumar  wrote:

> Hello everyone,
>
> I was wondering how the Cassandra Spark connector deals with
> deleted/updated records during a readStream operation. If a record was
> already fetched into Spark memory and then got updated or deleted in the
> database, does that get reflected in a streaming join?
>
> Thanks,
> Rahul
>
>
>
>


Re: [ANNOUNCE] Apache Spark 3.0.0

2020-06-18 Thread Jungtaek Lim
Great, thanks all for your efforts on the huge step forward!

On Fri, Jun 19, 2020 at 12:13 PM Hyukjin Kwon  wrote:

> Yay!
>
> On Fri, Jun 19, 2020 at 4:46 AM Mridul Muralidharan wrote:
>
>> Great job everyone ! Congratulations :-)
>>
>> Regards,
>> Mridul
>>
>> On Thu, Jun 18, 2020 at 10:21 AM Reynold Xin  wrote:
>>
>>> Hi all,
>>>
>>> Apache Spark 3.0.0 is the first release of the 3.x line. It builds on
>>> many of the innovations from Spark 2.x, bringing new ideas as well as
>>> continuing long-term projects that have been in development. This release
>>> resolves more than 3400 tickets.
>>>
>>> We'd like to thank our contributors and users for their contributions
>>> and early feedback to this release. This release would not have been
>>> possible without you.
>>>
>>> To download Spark 3.0.0, head over to the download page:
>>> http://spark.apache.org/downloads.html
>>>
>>> To view the release notes:
>>> https://spark.apache.org/releases/spark-release-3-0-0.html
>>>
>>>
>>>
>>>


Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

2020-06-17 Thread Jungtaek Lim
Just in case anyone prefers ASF projects, there are alternative projects in
ASF as well - alphabetically, Apache Hudi [1] and Apache Iceberg [2]. Both
recently graduated as top-level projects. (DISCLAIMER: I'm not involved in
either.)

BTW it would be nice if we made the metadata implementation of the file stream
source/sink pluggable - from what I've seen, the plugin approach has been
selected as the way to go whenever some part is going to be complicated and it
becomes arguable whether the part should be handled inside the Spark project
or outside, e.g. the checkpoint manager, state store provider, etc. It would
open up chances for the ecosystem to tackle the challenge "without completely
re-writing the file stream source and sink", focusing on metadata scalability
for long-running queries. The alternative projects described above will still
provide more higher-level features and look attractive, but sometimes that may
just be "using a sledgehammer to crack a nut".

1. https://hudi.apache.org/
2. https://iceberg.apache.org/


On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das 
wrote:

> Hello Rachana,
>
> Getting exactly-once semantics on files and making it scale to a very
> large number of files are very hard problems to solve. While Structured
> Streaming + built-in file sink solves the exactly-once guarantee that
> DStreams could not, it is definitely limited in other ways (scaling in
> terms of files, combining batch and streaming writes in the same place,
> etc). And solving this problem requires a holistic solution that is
> arguably beyond the scope of the Spark project.
>
> There are other projects that are trying to solve this file management
> issue. For example, Delta Lake (full disclosure, I am
> involved in it) was built to exactly solve this problem - get exactly-once
> and ACID guarantees on files, but also scale to handling millions of files.
> Please consider it as part of your solution.
>
>
>
>
> On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava
>  wrote:
>
>> I have written a simple Spark structured streaming app to move data from
>> Kafka to S3. Found that in order to support the exactly-once guarantee,
>> Spark creates a _spark_metadata folder, which ends up growing too large as
>> the streaming app is SUPPOSED TO run FOREVER. When the streaming app runs
>> for a long time the metadata folder grows so big that we start getting OOM
>> errors. The only way to resolve the OOM is to delete the Checkpoint and
>> Metadata folders and lose VALUABLE customer data.
>>
>> Open Spark JIRAs: SPARK-24295, SPARK-29995, and SPARK-30462.
>> Spark Streaming (DStreams) was NOT broken like this. Is Spark Streaming a
>> BETTER choice?
>>
>


Re: Structured Streaming using File Source - How to handle live files

2020-06-07 Thread Jungtaek Lim
Hi Nick,

I guess that's by design - Spark assumes an input file will not be
modified once it is placed on the input path. This makes it easy for Spark
to track processed vs unprocessed files. If input files could be modified,
Spark would have to enumerate all files and track how many lines/bytes it
has read "per file"; in the bad case it might even read an incomplete line
(if the writer doesn't guarantee complete writes) and crash or produce
incorrect results.
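
A minimal sketch of the usual workaround (write the file somewhere else,
then move it into the watched directory in one step; paths below are
hypothetical, and the atomic move assumes both paths are on the same
filesystem):

import java.nio.file.{Files, Paths, StandardCopyOption}

val staged  = Paths.get("/tmp/staging/part-0001.json")  // written fully first
val watched = Paths.get("/data/input/part-0001.json")   // file source input path

Files.write(staged, "complete record\n".getBytes("UTF-8"))
// the rename is atomic, so Spark never lists a half-written file
Files.move(staged, watched, StandardCopyOption.ATOMIC_MOVE)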

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Jun 8, 2020 at 2:43 AM ArtemisDev  wrote:

> We were trying to use structured streaming from a file source, but had
> problems getting the files read by Spark properly.  We have another
> process generating the data files in the Spark data source directory on
> a continuous basis.  What we observed was that the moment a data file
> was created, before the data-producing process finished, it was read by
> Spark immediately without reaching EOF.  Spark then never revisits the
> file, so we ended up with empty data content.  The only way to make it
> work is to produce the data files in a separate directory (e.g. /tmp)
> and move them to Spark's file source dir after the data generation
> completes.
>
> My questions:  Is this a behavior by design or is there any way to
> control the Spark streaming process not to import a file while it is
> still being used by another process?  In other words, do we have to use
> the tmp dir to move data files around or can the data producing process
> and Spark share the same directory?
>
> Thanks!
>
> -- Nick
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: RecordTooLargeException in Spark *Structured* Streaming

2020-05-25 Thread Jungtaek Lim
Hi,

You need to add the prefix "kafka." to the configurations which should be
propagated to Kafka. The others are used by the Spark data source itself
(the Kafka connector in this case).

https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#kafka-specific-configurations
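
For example, the writer from the quoted snippet below would become something
like this (the size value is illustrative; `df` and `config` stand in for
the reporter's own objects):

import org.apache.kafka.clients.producer.ProducerConfig

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.outputBootstrapServer)
  // the "kafka." prefix hands the option to the Kafka producer as max.request.size
  .option("kafka." + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1500000")
  .start()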

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Tue, May 26, 2020 at 6:42 AM Something Something <
mailinglist...@gmail.com> wrote:

> I keep getting this error message:
>
>
> *The message is 1169350 bytes when serialized which is larger than the
> maximum request size you have configured with the max.request.size
> configuration.*
>
>
>
> As indicated in other posts, I am trying to set the “max.request.size”
> configuration in the Producer as follows:
>
>
> -
>
> .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.outputBootstrapServer)
>   .option(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000")
>
> -
>
>
>
> But this is not working. Am I setting this correctly? Is there a different
> way to set this property under Spark Structured Streaming?
>
>
> Please help. Thanks.
>
>
>


Re: [structured streaming] [stateful] Null value appeared in non-nullable field

2020-05-23 Thread Jungtaek Lim
Hi,

With only the stack trace there's not much to look into. It'd be better to
provide a simple reproducer (code, and problematic inputs) so that someone
can give it a try.

You may also want to try 3.0.0 - RC2 is better to test against, but
preview2 would be easier for end users to test.
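
For reference, the error message's suggestion looks like this in practice
(bean names here are hypothetical, loosely modeled on the report below):

// use Option (or boxed java.lang types) for any field that may be null,
// so the encoder can represent missing values instead of throwing
case class StateInfo(
    sessionId: String,
    count: Option[Long],      // instead of a primitive Long
    lastSeenMs: Option[Long])

case class Event(eventId: String, state: Option[StateInfo]) // nested bean may be absent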

On Sat, May 23, 2020 at 8:14 PM, Srinivas V wrote:

> Hello,
>  I am listening to a kafka topic through Spark Structured Streaming
> [2.4.5]. After processing messages for a few mins, I am getting the below
> NullPointerException. I have three beans used here: 1.Event 2.StateInfo
> 3.SessionUpdateInfo. I suspect that the problem is with StateInfo -
> when it is writing state to hdfs it might be failing, or it could be
> failing while I update accumulators. But why would it fail for some events
> but not for others? Once it fails, it stops the streaming query.
> When I send all fields null except EventId in my testing, it works fine.
> Any idea what could be happening?
> Attaching the full stack trace as well.
> This is a - yarn cluster, saving state in HDFS.
>
> Exception:
>
> 20/05/23 09:46:46 ERROR 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborting 
> commit for partition 42 (task 118121, attempt 9, stage 824.0)
> 20/05/23 09:46:46 ERROR 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask: Aborted 
> commit for partition 42 (task 118121, attempt 9, stage 824.0)
> 20/05/23 09:46:46 ERROR org.apache.spark.executor.Executor: Exception in task 
> 42.9 in stage 824.0 (TID 118121)
> java.lang.NullPointerException: Null value appeared in non-nullable field:
> top level input bean
> If the schema is inferred from a Scala tuple/case class, or a Java bean, 
> please try to use scala.Option[_] or other nullable types (e.g. 
> java.lang.Integer instead of int/scala.Int).
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.serializefromobject_doConsume_0$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>   at 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
>   at 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
>   at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>   at 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
>   at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
>   at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 20/05/23 09:47:48 ERROR 
> org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>
>
>
> Regards
>
> Srini V
>
>


Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
Have you looked through the metrics for state operators?

They have been providing the "total rows" of state, and starting from Spark
2.4 they also provide additional metrics specific to
HDFSBackedStateStoreProvider, including overall estimated memory usage.

https://github.com/apache/spark/blob/24fac1e0c70a783b4d240607639ff20d7dd24191/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L168-L179
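
As an illustration, a small listener that sums the state rows reported in
each progress update (this uses the public StreamingQueryListener API;
the println is just a stand-in for whatever feeds your autoscaler):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener { // existing SparkSession
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // one StateOperatorProgress per stateful operator in the query
    val totalStateRows = event.progress.stateOperators.map(_.numRowsTotal).sum
    println(s"active state entries: $totalStateRows")
  }
})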


On Fri, May 8, 2020 at 11:30 AM Something Something <
mailinglist...@gmail.com> wrote:

> No. We are already capturing these metrics (e.g. numInputRows,
> inputRowsPerSecond).
>
> I am talking about "No. of States" in the memory at any given time.
>
> On Thu, May 7, 2020 at 4:31 PM Jungtaek Lim 
> wrote:
>
>> If you're referring to the total "entries" across all states in an SS
>> job, it's provided via StreamingQueryListener.
>>
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>>
>> Hope this helps.
>>
>> On Fri, May 8, 2020 at 3:26 AM Something Something <
>> mailinglist...@gmail.com> wrote:
>>
>>> Is there a way to get the total no. of active states in memory at any
>>> given point in a Stateful Spark Structured Streaming job? We are thinking
>>> of using this metric for 'Auto Scaling' our Spark cluster.
>>>
>>


Re: [E] Re: Pyspark Kafka Structured Stream not working.

2020-05-07 Thread Jungtaek Lim
It's not either 1 or 2 - both items apply. I haven't played with DStream +
pyspark, but given the error message is clear, you'll probably want to
change the client.id "Python Kafka streamer" to match the naming convention
described in the error message.

On Thu, May 7, 2020 at 3:55 PM Vijayant Kumar 
wrote:

> Hi Jungtek,
>
>
>
> Thanks for the response. It appears to be #1.
>
> I would appreciate it if you could share a sample command to submit the
> Spark application.
>
>
>
> *From:* Jungtaek Lim [mailto:kabhwan.opensou...@gmail.com]
> *Sent:* Wednesday, May 06, 2020 8:24 PM
> *To:* Vijayant Kumar 
> *Cc:* user@spark.apache.org
> *Subject:* [E] Re: Pyspark Kafka Structured Stream not working.
>
>
>
> *[EXTERNAL EMAIL]* DO NOT CLICK links or attachments unless you recognize
> the sender and know the content is safe.
>
> Hi,
>
>
>
> 1. You seem to be using DStream (Spark Streaming), not Structured Streaming.
>
> 2. I'm not familiar with pyspark, but it looks like the error message is
> very clear - Kafka doesn't allow such a name for "client.id". The error
> message gives the naming rule, so you'll need to adopt that convention
> (e.g. no spaces).
>
>
>
> Hope this helps,
>
>
>
> Thanks,
>
> Jungtaek Lim (HeartSaVioR)
>
>
>
> On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar <
> vijayant.ku...@mavenir.com.invalid> wrote:
>
> Hi All,
>
>
>
> I am getting the below error while using Pyspark Structured Streaming from
> Kafka Producer.
>
>
>
> 20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - kafka.common.InvalidConfigException:
> client.id Python Kafka streamer is illegal, contains a character other
> than ASCII alphanumerics, '.', '_' and '-'
>
>
>
> I am using the below code to get the messages:
>
>
>
> broker='vm105:2181'
>
> topic='Hello-Kafka'
>
> print 'broker topic is ',broker,topic
>
> kvs = KafkaUtils.createStream(ssc, \
>
>   broker, \
>
>   "Python Kafka streamer",{topic:1})
>
>
>
> And my Submit command is like below :-
>
> *spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar
> test_kafka.py vm105:2181 Hello-Kafka*
>
>
>
> Can anyone help me with what I am missing?
>
>
>
> Thanks,
>
> Vijayant
> --
>
> This e-mail message may contain confidential or proprietary information of
> Mavenir Systems, Inc. or its affiliates and is intended solely for the use
> of the intended recipient(s). If you are not the intended recipient of this
> message, you are hereby notified that any review, use or distribution of
> this information is absolutely prohibited and we request that you delete
> all copies in your control and contact us by e-mailing to
> secur...@mavenir.com. This message contains the views of its author and
> may not necessarily reflect the views of Mavenir Systems, Inc. or its
> affiliates, who employ systems to monitor email messages, but make no
> representation that such messages are authorized, secure, uncompromised, or
> free from computer viruses, malware, or other defects. Thank You
>
>


Re: No. of active states?

2020-05-07 Thread Jungtaek Lim
If you're referring to the total "entries" across all states in an SS job,
it's provided via StreamingQueryListener.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

Hope this helps.

On Fri, May 8, 2020 at 3:26 AM Something Something 
wrote:

> Is there a way to get the total no. of active states in memory at any
> given point in a Stateful Spark Structured Streaming job? We are thinking
> of using this metric for 'Auto Scaling' our Spark cluster.
>


Re: Pyspark Kafka Structured Stream not working.

2020-05-06 Thread Jungtaek Lim
Hi,

1. You seem to be using DStream (Spark Streaming), not Structured Streaming.
2. I'm not familiar with pyspark, but it looks like the error message is
very clear - Kafka doesn't allow such a name for "client.id". The error
message gives the naming rule, so you'll need to adopt that convention
(e.g. no spaces).

Hope this helps,

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar
 wrote:

> Hi All,
>
>
>
> I am getting the below error while using Pyspark Structured Streaming from
> Kafka Producer.
>
>
>
> 20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - kafka.common.InvalidConfigException:
> client.id Python Kafka streamer is illegal, contains a character other
> than ASCII alphanumerics, '.', '_' and '-'
>
>
>
> I am using the below code to get the messages:
>
>
>
> broker='vm105:2181'
>
> topic='Hello-Kafka'
>
> print 'broker topic is ',broker,topic
>
> kvs = KafkaUtils.createStream(ssc, \
>
>   broker, \
>
>   "Python Kafka streamer",{topic:1})
>
>
>
> And my Submit command is like below :-
>
> *spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar
> test_kafka.py vm105:2181 Hello-Kafka*
>
>
>
> Can anyone help me with what I am missing?
>
>
>
> Thanks,
>
> Vijayant
> --
>
> This e-mail message may contain confidential or proprietary information of
> Mavenir Systems, Inc. or its affiliates and is intended solely for the use
> of the intended recipient(s). If you are not the intended recipient of this
> message, you are hereby notified that any review, use or distribution of
> this information is absolutely prohibited and we request that you delete
> all copies in your control and contact us by e-mailing to
> secur...@mavenir.com. This message contains the views of its author and
> may not necessarily reflect the views of Mavenir Systems, Inc. or its
> affiliates, who employ systems to monitor email messages, but make no
> representation that such messages are authorized, secure, uncompromised, or
> free from computer viruses, malware, or other defects. Thank You
>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Jungtaek Lim
Replied inline:

On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson  wrote:

> Thank you, so that would mean spark gets the current latest offset(s) when
> the trigger fires and then processes all available messages in the topic up
> to and including that offset, as long as maxOffsetsPerTrigger is the default
> of None (or large enough to handle all available messages).
>

Yes, it starts from the offsets of the latest batch. `maxOffsetsPerTrigger`
will be ignored starting from Spark 3.0.0, which means for Spark 2.x it
still takes effect even when Trigger.Once is used, I guess.


>
> I think the word micro-batch confused me (more like mega-batch in some
> cases). It makes sense though, this makes Trigger.Once a fixed interval
> trigger that's only fired once and not repeatedly.
>

"micro" is relative - though Spark by default processes all available
inputs per batch, in most cases you'll want to make the batch size
(interval) as small as possible, as it defines the latency of the output.
Trigger.Once is an unusual case in a streaming workload - it's more like a
continuous execution of a "batch". I use "continuous" to mean picking up the
latest context, which is the characteristic of a streaming query - hence a
hybrid.
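
For completeness, a minimal sketch of a Trigger.Once run (paths are
hypothetical; `df` stands in for your streaming DataFrame) - the checkpoint
is what carries that "latest context" between scheduled runs:

import org.apache.spark.sql.streaming.Trigger

val query = df.writeStream
  .format("parquet")
  .option("path", "/output/events")             // hypothetical output path
  .option("checkpointLocation", "/chk/events")  // offsets/state reused on the next run
  .trigger(Trigger.Once())                      // one micro-batch, then stop
  .start()

query.awaitTermination() // returns once the single micro-batch finishes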


>
>
> On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim 
> wrote:
>
>> If I understand correctly, Trigger.once executes only one micro-batch and
>> terminates, that's all. Your understanding of structured streaming applies
>> there as well.
>>
>> It's like a hybrid approach: it brings incremental processing from
>> micro-batches but has a batch-like processing interval. That said, while
>> it gets the benefits of both sides, it's basically structured streaming,
>> inheriting all the limitations of structured streaming compared to a
>> batch query.
>>
>> Spark 3.0.0 will bring some change to Trigger.Once (SPARK-30669 [1]) -
>> Trigger.Once will "ignore" the per-micro-batch read limit on the data
>> source (like maxOffsetsPerTrigger) and process as much available input as
>> possible. (Data sources should migrate to the new API for this to take
>> effect, but it works for built-in data sources like file and Kafka.)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30669
>>
>> On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:
>>
>>> I've always had a question about Trigger.Once that I never got around to
>>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>>
>>> Will Trigger.Once get the last offset(s) when it starts and then quit
>>> once it hits those offset(s), or will the job run until no new messages
>>> are added to the topic for a particular amount of time?
>>>
>>> br,
>>>
>>> Magnus
>>>
>>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>>>
>>>> Hi Rishi,
>>>>
>>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>>> The way we look at streaming is that it doesn't have to be always real
>>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>>> repeat indefinitely. See this blog post for more details!
>>>>
>>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I recently started playing with spark streaming, and checkpoint
>>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>>> about using spark streaming with checkpoint location option as a slow 
>>>>> batch
>>>>> processing solution. What would be the pros and cons of utilizing 
>>>>> streaming
>>>>> with checkpoint location feature to achieve fault tolerance in batch
>>>>> processing application?
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
>>>>>
>>>>


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Jungtaek Lim
If I understand correctly, Trigger.once executes only one micro-batch and
terminates, that's all. Your understanding of structured streaming applies
there as well.

It's like a hybrid approach: it brings incremental processing from
micro-batches but has a batch-like processing interval. That said, while it
gets the benefits of both sides, it's basically structured streaming,
inheriting all the limitations of structured streaming compared to a
batch query.

Spark 3.0.0 will bring some change to Trigger.Once (SPARK-30669 [1]) -
Trigger.Once will "ignore" the per-micro-batch read limit on the data source
(like maxOffsetsPerTrigger) and process as much available input as possible.
(Data sources should migrate to the new API for this to take effect, but it
works for built-in data sources like file and Kafka.)

1. https://issues.apache.org/jira/browse/SPARK-30669

On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:

> I've always had a question about Trigger.Once that I never got around to
> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>
> Will Trigger.Once get the last offset(s) when it starts and then quit once
> it hits those offset(s), or will the job run until no new messages are added
> to the topic for a particular amount of time?
>
> br,
>
> Magnus
>
> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz  wrote:
>
>> Hi Rishi,
>>
>> That is exactly why Trigger.Once was created for Structured Streaming.
>> The way we look at streaming is that it doesn't have to be always real
>> time, or 24-7 always on. We see streaming as a workflow that you have to
>> repeat indefinitely. See this blog post for more details!
>>
>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>
>> Best,
>> Burak
>>
>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah 
>> wrote:
>>
>>> Hi All,
>>>
>>> I recently started playing with spark streaming, and checkpoint location
>>> feature looks very promising. I wonder if anyone has an opinion about using
>>> spark streaming with checkpoint location option as a slow batch processing
>>> solution. What would be the pros and cons of utilizing streaming with
>>> checkpoint location feature to achieve fault tolerance in batch processing
>>> application?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>


Re: [Structured Streaming] NullPointerException in long running query

2020-04-27 Thread Jungtaek Lim
The root cause of the exception occurred on the executor side ("Lost task
10.3 in stage 1.0 (TID 81, spark6, executor 1)"), so you may need to check
the executor logs there.
On Tue, Apr 28, 2020 at 2:52 PM lec ssmi  wrote:

> Hi:
>   One of my long-running queries occasionally encountered the following
> exception:
>
>
>   Caused by: org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost
>> task 10.3 in stage 1.0 (TID 81, spark6, executor 1):
>> java.lang.NullPointerException
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>> at scala.Option.foreach(Option.scala:257)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
>> at
>> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
>> at
>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
>> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>> at
>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>> at
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>> at org.apache.spark.sql.execution.streaming.StreamExecution.org
>> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>> ... 1 more
>
>
>
> According to the exception stack, it seems to have nothi

Re: is RocksDB backend available in 3.0 preview?

2020-04-22 Thread Jungtaek Lim
Sorry, I should have been more clear.

The discussion reached the conclusion that a RocksDB state store cannot be
included in the Spark main codebase - it should start as an individual
project, and can be adopted once the project is popular enough. (See the PR
for more details.) That's why I pointed to the implementation in the Spark
ecosystem.

On Thu, Apr 23, 2020 at 1:22 AM kant kodali  wrote:

> is it going to make it in 3.0?
>
> On Tue, Apr 21, 2020 at 9:24 PM Jungtaek Lim 
> wrote:
>
>> Unfortunately, the short answer is no. Please refer to the last part of
>> the discussion on the PR https://github.com/apache/spark/pull/24922
>>
>> Unless we get a native implementation of this, I guess this project is the
>> most widely known implementation of a RocksDB-backed state store -
>> https://github.com/chermenin/spark-states
>>
>> On Wed, Apr 22, 2020 at 11:32 AM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> 1. Is a RocksDB backend available in the 3.0 preview?
>>> 2. If RocksDB can store intermediate results of a stream-stream join, can
>>> I run streaming join queries forever? By forever I mean until I run out of
>>> disk. Or, put another way, can I run the stream-stream join queries for
>>> years if necessary (imagine I've got a lot of disk capacity but not a
>>> whole lot of RAM)?
>>> 3. Does it do incremental checkpointing to HDFS?
>>>
>>> Thanks!
>>>
>>>


Re: is RocksDB backend available in 3.0 preview?

2020-04-21 Thread Jungtaek Lim
Unfortunately, the short answer is no. Please refer to the last part of the
discussion on the PR https://github.com/apache/spark/pull/24922

Unless we get a native implementation of this, I guess this project is the
most widely known implementation of a RocksDB-backed state store -
https://github.com/chermenin/spark-states
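
For context, third-party state stores generally plug in through Spark's
provider-class conf - a sketch (the class name below is a placeholder; use
whatever the library you pick documents):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("custom-state-store")
  // swap the default HDFS-backed provider for a custom implementation
  .config("spark.sql.streaming.stateStore.providerClass",
    "com.example.state.RocksDbStateStoreProvider") // placeholder class name
  .getOrCreate()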

On Wed, Apr 22, 2020 at 11:32 AM kant kodali  wrote:

> Hi All,
>
> 1. Is a RocksDB backend available in the 3.0 preview?
> 2. If RocksDB can store intermediate results of a stream-stream join, can I
> run streaming join queries forever? By forever I mean until I run out of
> disk. Or, put another way, can I run the stream-stream join queries for
> years if necessary (imagine I've got a lot of disk capacity but not a whole
> lot of RAM)?
> 3. Does it do incremental checkpointing to HDFS?
>
> Thanks!
> Thanks!
>
>


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Jungtaek Lim
No, that's not a thing to apologize for. It's just your call - less context
would bring less reaction and interest.

On Wed, Apr 22, 2020 at 11:50 AM Ruijing Li  wrote:

> I apologize, but I cannot share it, even if it is just typical spark
> libraries. I definitely understand that limits debugging help, but wanted
> to understand if anyone has encountered a similar issue.
>
> On Tue, Apr 21, 2020 at 7:12 PM Jungtaek Lim 
> wrote:
>
>> If there are no third-party libraries in the dump, then why not share the
>> thread dump? (I mean, the output of jstack.)
>>
>> A stack trace would be more helpful for finding which thread acquired a
>> lock and which other threads are waiting to acquire it, if we suspect a
>> deadlock.
>>
>> On Wed, Apr 22, 2020 at 2:38 AM Ruijing Li  wrote:
>>
>>> After refreshing a couple of times, I notice the lock is being swapped
>>> between these 3. The other 2 will be blocked by whoever gets this lock, in
>>> a cycle of 160 has lock -> 161 -> 159 -> 160
>>>
>>> On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li 
>>> wrote:
>>>
>>>> In thread dump, I do see this
>>>> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
>>>> Monitor
>>>> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>>>> Blocked by Thread(Some(160)) Lock
>>>> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>>>> Blocked by Thread(Some(160)) Lock
>>>>
>>>> Could the fact that 160 has the monitor but is not running be causing a
>>>> deadlock preventing the job from finishing?
>>>>
>>>> I do see my Finalizer and main method are waiting. I don’t see any
>>>> other threads from 3rd party libraries or my code in the dump. I do see
>>>> spark context cleaner has timed waiting.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li 
>>>> wrote:
>>>>
>>>>> Strangely enough I found an old issue that is the exact same issue as
>>>>> mine
>>>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>>>>>
>>>>> However I’m using spark 2.4.4 so the issue should have been solved by
>>>>> now.
>>>>>
>>>>> Like the user in the jira issue I am using mesos, but I am reading
>>>>> from oracle instead of writing to Cassandra and S3.
>>>>>
>>>>>
>>>>> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei  wrote:
>>>>>
>>>>>> The Thread dump result table of Spark UI can provide some clues to
>>>>>> find out thread locks issue, such as:
>>>>>>
>>>>>>   Thread ID | Thread Name  | Thread State | Thread
>>>>>> Locks
>>>>>>   13| NonBlockingInputStreamThread | WAITING  | Blocked
>>>>>> by Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951
>>>>>> })
>>>>>>   48| Thread-16| RUNNABLE |
>>>>>> Monitor(jline.internal.NonBlockingInputStream@103008951})
>>>>>>
>>>>>> And echo thread row can show the call stacks after being clicked,
>>>>>> then you can check the root cause of holding locks like this(Thread 48 of
>>>>>> above):
>>>>>>
>>>>>>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native
>>>>>> Method)
>>>>>>
>>>>>> org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>>>>>>
>>>>>> org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>>>>>>
>>>>>> org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>>>>>>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>>>>>>   
>>>>>>
>>>>>> Hope it can help you.
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> -z
>>>>>>
>>>>>> On Thu, 16 Apr 2020 16:36:42 +0900
>>>>>> Jungtaek Lim  wrote:
>>>>>>
>>>>>> > Do thread dump continuously, per specific period (like 1s) and see
>>>>>> the
>>>>>> > ch

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Jungtaek Lim
If there are no third-party libraries in the dump, then why not share the
thread dump? (I mean, the output of jstack.)

A stack trace would be more helpful for finding which thread acquired a lock
and which other threads are waiting to acquire it, if we suspect a deadlock.
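
If attaching jstack output isn't possible, even an in-process dump helps - a
rough sketch of capturing one per second from inside the driver (purely
illustrative; note ThreadInfo.toString truncates deep stacks, so jstack
output is richer):

import java.lang.management.ManagementFactory

val mx = ManagementFactory.getThreadMXBean
for (_ <- 1 to 30) { // ~30 seconds of samples
  // include locked monitors/synchronizers to spot who holds which lock
  val dump = mx.dumpAllThreads(true, true).map(_.toString).mkString
  println(s"==== ${System.currentTimeMillis} ====\n$dump")
  Thread.sleep(1000)
}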

On Wed, Apr 22, 2020 at 2:38 AM Ruijing Li  wrote:

> After refreshing a couple of times, I notice the lock is being swapped
> between these 3. The other 2 will be blocked by whoever gets this lock, in
> a cycle of 160 has lock -> 161 -> 159 -> 160
>
> On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li  wrote:
>
>> In thread dump, I do see this
>> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
>> Monitor
>> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>> Blocked by Thread(Some(160)) Lock
>> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>> Blocked by Thread(Some(160)) Lock
>>
>> Could the fact that 160 has the monitor but is not running be causing a
>> deadlock preventing the job from finishing?
>>
>> I do see my Finalizer and main method are waiting. I don’t see any other
>> threads from 3rd party libraries or my code in the dump. I do see spark
>> context cleaner has timed waiting.
>>
>> Thanks
>>
>>
>> On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li  wrote:
>>
>>> Strangely enough I found an old issue that is the exact same issue as
>>> mine
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>>>
>>> However I’m using spark 2.4.4 so the issue should have been solved by
>>> now.
>>>
>>> Like the user in the jira issue I am using mesos, but I am reading from
>>> oracle instead of writing to Cassandra and S3.
>>>
>>>
>>> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei  wrote:
>>>
>>>> The Thread dump result table of Spark UI can provide some clues to find
>>>> out thread locks issue, such as:
>>>>
>>>>   Thread ID | Thread Name  | Thread State | Thread Locks
>>>>   13| NonBlockingInputStreamThread | WAITING  | Blocked by
>>>> Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951})
>>>>   48| Thread-16| RUNNABLE |
>>>> Monitor(jline.internal.NonBlockingInputStream@103008951})
>>>>
>>>> And echo thread row can show the call stacks after being clicked, then
>>>> you can check the root cause of holding locks like this(Thread 48 of 
>>>> above):
>>>>
>>>>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native
>>>> Method)
>>>>
>>>> org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>>>>
>>>> org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>>>>
>>>> org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>>>>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>>>>   
>>>>
>>>> Hope it can help you.
>>>>
>>>> --
>>>> Cheers,
>>>> -z
>>>>
>>>> On Thu, 16 Apr 2020 16:36:42 +0900
>>>> Jungtaek Lim  wrote:
>>>>
>>>> > Do thread dump continuously, per specific period (like 1s) and see the
>>>> > change of stack / lock for each thread. (This is not easy to be done
>>>> in UI
>>>> > so maybe doing manually would be the only option. Not sure Spark UI
>>>> will
>>>> > provide the same, haven't used at all.)
>>>> >
>>>> > It will tell which thread is being blocked (even it's shown as
>>>> running) and
>>>> > which point to look at.
>>>> >
>>>> > On Thu, Apr 16, 2020 at 4:29 PM Ruijing Li 
>>>> wrote:
>>>> >
>>>> > > Once I do. thread dump, what should I be looking for to tell where
>>>> it is
>>>> > > hanging? Seeing a lot of timed_waiting and waiting on driver.
>>>> Driver is
>>>> > > also being blocked by spark UI. If there are no tasks, is there a
>>>> point to
>>>> > > do thread dump of executors?
>>>> > >
>>>> > > On Tue, Apr 14, 2020 at 4:49 AM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com>
>>>> > > wrote:

Re: Spark Structure Streaming | FileStreamSourceLog not deleting list of input files | Spark -2.4.0

2020-04-21 Thread Jungtaek Lim
You're hitting an existing issue:
https://issues.apache.org/jira/browse/SPARK-17604. While there's no active
PR to address it, I've been planning to take a look sooner rather than later.

Btw, you may also want to take a look at my previous mail - the topic of
that thread was the file stream sink metadata growing bigger, but in fact
that's basically the same issue, so you may get some information from there.
(tl;dr: I have a bunch of PRs addressing multiple issues on the file stream
source and sink; they just lack some love.)

https://lists.apache.org/thread.html/rb4ebf1d20d13db0a78694e8d301e51c326f803cb86fc1a1f66f2ae7e%40%3Cuser.spark.apache.org%3E
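
For reference, the retention knobs mentioned in the report below are plain
SQL confs - a sketch with illustrative values. Note that per SPARK-17604
they bound how many log *files* are kept, but do not shrink the entries
accumulated in the .compact file itself:

spark.conf.set("spark.sql.streaming.fileSource.log.deletion", "true")
spark.conf.set("spark.sql.streaming.fileSource.log.compactInterval", "10") // compact every 10th batch
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "100") // illustrative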

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Apr 21, 2020 at 8:23 PM Pappu Yadav  wrote:

> Hi Team,
>
> While running Spark, below are some findings:
>
>1. FileStreamSourceLog is responsible for maintaining the input source
>file list.
>2. Spark Streaming deletes expired log files on the basis of
>*spark.sql.streaming.fileSource.log.deletion* and
>*spark.sql.streaming.minBatchesToRetain*.
>3. But while compacting logs, Spark Streaming writes the complete list of
>files the stream has seen so far into one single .compact file in HDFS.
>4. Over the course of time this compact file grows to around 2GB-5GB in
>HDFS, which delays creation of the compact file at every 10th batch and
>also increases job restart time.
>5. Why does Spark Streaming log files which are already deleted? While
>creating the compact file there should be some configurable timeout so
>that Spark can skip writing the expired list of input files.
>
> *Also kindly let me know if I missed something or whether there is some
> configuration already present to handle this.*
>
> Regards
> Pappu Yadav
>

