Re: Missing / Duplicate Data when Spark retries

2020-09-10 Thread Ruijing Li
I agree, Sean, although it's strange since we aren't using any UDFs but
sticking to Spark-provided functions. If anyone in the community has seen
such an issue before, I would be happy to learn more!

On Thu, Sep 10, 2020 at 6:01 AM Sean Owen  wrote:

> It's more likely a subtle issue with your code or data, but hard to
>
> say without knowing more. The lineage is fine and deterministic, but
>
> your data or operations might not be.
>
>
>
> On Thu, Sep 10, 2020 at 12:03 AM Ruijing Li  wrote:
>
> >
>
> > Hi all,
>
> >
>
> > I am on Spark 2.4.4, using Mesos as the task resource scheduler. The
> context is that my job maps over multiple datasets; for each dataset it
> takes one dataframe from a parquet file at one HDFS path and another
> dataframe from a second HDFS path, unions them by name, then deduplicates
> by most recent date using windowing and rank:
> https://stackoverflow.com/questions/50269678/dropping-duplicate-records-based-using-window-function-in-spark-scala
>
> >
>
> > I have a strange issue where sometimes my job fails with a shuffle error
> and retries the stage/task. Unfortunately, it somehow loses data and
> generates duplicates after the retry succeeds. I have read that Spark
> keeps a lineage; my theory is that somehow Spark isn't keeping the correct
> lineage and is regenerating only the successful data, so it creates
> duplicates but loses parts of the data. I'm not sure how this would
> happen, though, since I don't have nondeterministic data. Has anyone
> encountered something similar, or have an inkling?
>
> >
>
> > Thanks!
>
> >
>
> > --
>
> > Cheers,
>
> > Ruijing Li
>
> --
Cheers,
Ruijing Li


Missing / Duplicate Data when Spark retries

2020-09-09 Thread Ruijing Li
Hi all,

I am on Spark 2.4.4, using Mesos as the task resource scheduler. The context
is that my job maps over multiple datasets; for each dataset it takes one
dataframe from a parquet file at one HDFS path and another dataframe from a
second HDFS path, unions them by name, then deduplicates by most recent date
using windowing and rank:
https://stackoverflow.com/questions/50269678/dropping-duplicate-records-based-using-window-function-in-spark-scala
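
For reference, a minimal sketch of the union-by-name plus windowed dedup
described above (the column names "key" and "date" are illustrative, not
taken from the original post):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Read the two inputs and union them by column name.
val df1 = spark.read.parquet("hdfs:///path/one")
val df2 = spark.read.parquet("hdfs:///path/two")
val unioned = df1.unionByName(df2)

// Keep the most recent row per key. Using row_number() rather than rank()
// guarantees exactly one survivor per key.
val w = Window.partitionBy("key").orderBy(col("date").desc)
// If several rows can share the same key and date, add a unique tiebreaker
// column to the orderBy so the surviving row is deterministic across retries.
val deduped = unioned
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")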

I have a strange issue where sometimes my job fails with a shuffle error and
retries the stage/task. Unfortunately, it somehow loses data and generates
duplicates after the retry succeeds. I have read that Spark keeps a lineage;
my theory is that somehow Spark isn't keeping the correct lineage and is
regenerating only the successful data, so it creates duplicates but loses
parts of the data. I'm not sure how this would happen, though, since I don't
have nondeterministic data. Has anyone encountered something similar, or
have an inkling?

Thanks!

-- 
Cheers,
Ruijing Li


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-05-06 Thread Ruijing Li
Wanted to update everyone on this; thanks for all the responses. I was able
to solve this issue after doing a jstack dump. I found out this was the
cause:

https://github.com/scala/bug/issues/10436

Lesson learned: I'll use a safer JSON parser like json4s, which should be
thread-safe.
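
For reference, a minimal sketch of thread-safe JSON parsing with json4s
(jackson backend); the JSON string and field names are only illustrative:

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// Unlike scala.util.parsing.json, parsing here keeps no shared mutable
// state, so it can be called safely from multiple threads.
val parsed: JValue = parse("""{"name": "spark", "cores": 4}""")
val name: String = (parsed \ "name").extract[String]
val cores: Int = (parsed \ "cores").extract[Int]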

On Fri, Apr 24, 2020 at 4:34 AM Waleed Fateem 
wrote:

> Are you running this in local mode? If not, are you even sure that the
> hanging is occurring on the driver's side?
>
> Did you check the Spark UI to see if there is a straggler task or not? If
> you do have a straggler/hanging task, and in case this is not an
> application running in local mode then you need to get the Java thread dump
> of the executor's JVM process. Once you do, you'll want to review the 
> "Executor
> task launch worker for task XYZ" thread, where XYZ is some integer value
> representing the task ID that was launched on that executor. In case you're 
> running
> this is local mode that thread would be located in the same Java thread
> dump that you have already collected.
>
>
> On Tue, Apr 21, 2020 at 9:51 PM Ruijing Li  wrote:
>
>> I apologize, but I cannot share it, even if it is just typical spark
>> libraries. I definitely understand that limits debugging help, but wanted
>> to understand if anyone has encountered a similar issue.
>>
>> On Tue, Apr 21, 2020 at 7:12 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> If there's no third party libraries in the dump then why not share the
>>> thread dump? (I mean, the output of jstack)
>>>
>>> stack trace would be more helpful to find which thing acquired lock and
>>> which other things are waiting for acquiring lock, if we suspect deadlock.
>>>
>>> On Wed, Apr 22, 2020 at 2:38 AM Ruijing Li 
>>> wrote:
>>>
>>>> After refreshing a couple of times, I notice the lock is being swapped
>>>> between these 3. The other 2 will be blocked by whoever gets this lock, in
>>>> a cycle of 160 has lock -> 161 -> 159 -> 160
>>>>
>>>> On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li 
>>>> wrote:
>>>>
>>>>> In thread dump, I do see this
>>>>> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
>>>>> Monitor
>>>>> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>>>>> Blocked by Thread(Some(160)) Lock
>>>>> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>>>>> Blocked by Thread(Some(160)) Lock
>>>>>
>>>>> Could the fact that 160 has the monitor but is not running be causing
>>>>> a deadlock preventing the job from finishing?
>>>>>
>>>>> I do see my Finalizer and main method are waiting. I don’t see any
>>>>> other threads from 3rd party libraries or my code in the dump. I do see
>>>>> spark context cleaner has timed waiting.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li 
>>>>> wrote:
>>>>>
>>>>>> Strangely enough I found an old issue that is the exact same issue as
>>>>>> mine
>>>>>>
>>>>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>>>>>>
>>>>>> However I’m using spark 2.4.4 so the issue should have been solved by
>>>>>> now.
>>>>>>
>>>>>> Like the user in the jira issue I am using mesos, but I am reading
>>>>>> from oracle instead of writing to Cassandra and S3.
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei 
>>>>>> wrote:
>>>>>>
>>>>>>> The Thread dump result table of Spark UI can provide some clues to
>>>>>>> find out thread locks issue, such as:
>>>>>>>
>>>>>>>   Thread ID | Thread Name                  | Thread State | Thread Locks
>>>>>>>   13        | NonBlockingInputStreamThread | WAITING      | Blocked by Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951)
>>>>>>>   48        | Thread-16                    | RUNNABLE     | Monitor(jline.internal.NonBlockingInputStream@103008951)
>>>>>>>

Re: Good idea to do multi-threading in spark job?

2020-05-06 Thread Ruijing Li
Thanks for the answer Sean!

On Sun, May 3, 2020 at 10:35 AM Sean Owen  wrote:

> Spark will by default assume each task needs 1 CPU. On an executor
> with 16 cores and 16 slots, you'd schedule 16 tasks. If each is using
> 4 cores, then 64 threads are trying to run. If you're CPU-bound, that
> could slow things down. But to the extent some of tasks take some time
> blocking on I/O, it could increase overall utilization. You shouldn't
> have to worry about Spark there, but, you do have to consider that N
> tasks, each with its own concurrency, maybe executing your code in one
> JVM, and whatever synchronization that implies.
>
> On Sun, May 3, 2020 at 11:32 AM Ruijing Li  wrote:
> >
> > Hi all,
> >
> > We have a spark job (spark 2.4.4, hadoop 2.7, scala 2.11.12) where we
> use semaphores / parallel collections within our spark job. We definitely
> notice a huge speedup in our job from doing this, but were wondering if
> this could cause any unintended side effects? Particularly I’m worried
> about any deadlocks and if it could mess with the fixes for issues such as
> this
> > https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-26961
> >
> > We do run with multiple cores.
> >
> > Thanks!
> > --
> > Cheers,
> > Ruijing Li
>
-- 
Cheers,
Ruijing Li


Good idea to do multi-threading in spark job?

2020-05-03 Thread Ruijing Li
Hi all,

We have a spark job (spark 2.4.4, hadoop 2.7, scala 2.11.12) where we use
semaphores / parallel collections within our spark job. We definitely
notice a huge speedup in our job from doing this, but were wondering if
this could cause any unintended side effects? Particularly I’m worried
about any deadlocks and if it could mess with the fixes for issues such as
this
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-26961

We do run with multiple cores.
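
As context, a minimal sketch of one common pattern behind this question:
launching several independent Spark actions concurrently from the driver on
a bounded thread pool (the dataset names, paths, and pool size are
illustrative):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Each future submits its own Spark jobs; Spark's scheduler (FIFO by
// default, FAIR if configured) interleaves the resulting tasks.
val work = Seq("a", "b", "c").map { name =>
  Future {
    spark.read.parquet(s"hdfs:///in/$name")
      .write.mode("overwrite").parquet(s"hdfs:///out/$name")
  }
}
Await.result(Future.sequence(work), Duration.Inf)
pool.shutdown()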

Thanks!
-- 
Cheers,
Ruijing Li


Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-22 Thread Ruijing Li
For some reason, after restarting the app and trying again, latest now
works as expected. Not sure why it didn’t work before.

On Tue, Apr 21, 2020 at 1:46 PM Ruijing Li  wrote:

> Yes, we did. But for some reason latest does not show them. The count is
> always 0.
>
> On Sun, Apr 19, 2020 at 3:42 PM Jungtaek Lim 
> wrote:
>
>> Did you provide more records to topic "after" you started the query?
>> That's the only one I can imagine based on such information.
>>
>> On Fri, Apr 17, 2020 at 9:13 AM Ruijing Li  wrote:
>>
>>> Hi all,
>>>
>>> Apologies if this has been asked before, but I could not find the answer
>>> to this question. We have a structured streaming job, but for some reason,
>>> if we use startingOffsets = latest with foreachbatch mode, it doesn’t
>>> produce any data.
>>>
>>> Rather, in logs I see it repeats the message “ Fetcher [Consumer]
>>> Resetting offset for partition to offset” over and over again..
>>>
>>> However with startingOffsets=earliest, we don’t get this issue. I’m
>>> wondering then how we can use startingOffsets=latest as I wish to start
>>> from the latest offset available.
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>
-- 
Cheers,
Ruijing Li


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Ruijing Li
I apologize, but I cannot share it, even if it is just typical spark
libraries. I definitely understand that limits debugging help, but wanted
to understand if anyone has encountered a similar issue.

On Tue, Apr 21, 2020 at 7:12 PM Jungtaek Lim 
wrote:

> If there's no third party libraries in the dump then why not share the
> thread dump? (I mean, the output of jstack)
>
> stack trace would be more helpful to find which thing acquired lock and
> which other things are waiting for acquiring lock, if we suspect deadlock.
>
> On Wed, Apr 22, 2020 at 2:38 AM Ruijing Li  wrote:
>
>> After refreshing a couple of times, I notice the lock is being swapped
>> between these 3. The other 2 will be blocked by whoever gets this lock, in
>> a cycle of 160 has lock -> 161 -> 159 -> 160
>>
>> On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li 
>> wrote:
>>
>>> In thread dump, I do see this
>>> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
>>> Monitor
>>> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>>> Blocked by Thread(Some(160)) Lock
>>> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
>>> Blocked by Thread(Some(160)) Lock
>>>
>>> Could the fact that 160 has the monitor but is not running be causing a
>>> deadlock preventing the job from finishing?
>>>
>>> I do see my Finalizer and main method are waiting. I don’t see any other
>>> threads from 3rd party libraries or my code in the dump. I do see spark
>>> context cleaner has timed waiting.
>>>
>>> Thanks
>>>
>>>
>>> On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li 
>>> wrote:
>>>
>>>> Strangely enough I found an old issue that is the exact same issue as
>>>> mine
>>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>>>>
>>>> However I’m using spark 2.4.4 so the issue should have been solved by
>>>> now.
>>>>
>>>> Like the user in the jira issue I am using mesos, but I am reading from
>>>> oracle instead of writing to Cassandra and S3.
>>>>
>>>>
>>>> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei  wrote:
>>>>
>>>>> The Thread dump result table of Spark UI can provide some clues to
>>>>> find out thread locks issue, such as:
>>>>>
>>>>>   Thread ID | Thread Name  | Thread State | Thread
>>>>> Locks
>>>>>   13| NonBlockingInputStreamThread | WAITING  | Blocked by
>>>>> Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951})
>>>>>   48| Thread-16| RUNNABLE |
>>>>> Monitor(jline.internal.NonBlockingInputStream@103008951})
>>>>>
>>>>>>> And each thread row can show the call stacks after being clicked, then
>>>>>>> you can check the root cause of holding locks like this (Thread 48
>>>>>>> above):
>>>>>
>>>>>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native
>>>>> Method)
>>>>>
>>>>> org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>>>>>
>>>>> org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>>>>>
>>>>> org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>>>>>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>>>>>   
>>>>>
>>>>> Hope it can help you.
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> -z
>>>>>
>>>>> On Thu, 16 Apr 2020 16:36:42 +0900
>>>>> Jungtaek Lim  wrote:
>>>>>
>>>>> > Do thread dump continuously, per specific period (like 1s) and see
>>>>> the
>>>>> > change of stack / lock for each thread. (This is not easy to be done
>>>>> in UI
>>>>> > so maybe doing manually would be the only option. Not sure Spark UI
>>>>> will
>>>>> > provide the same, haven't used at all.)
>>>>> >
>>>>> > It will tell which thread is being blocked (even it's shown as
>>>>> running) and
>>>>> > which point to look at.
>>>>> >
>>>>> > On Thu, Apr 16, 2020 at 

Re: Using startingOffsets latest - no data from structured streaming kafka query

2020-04-21 Thread Ruijing Li
Yes, we did. But for some reason latest does not show them. The count is
always 0.

On Sun, Apr 19, 2020 at 3:42 PM Jungtaek Lim 
wrote:

> Did you provide more records to topic "after" you started the query?
> That's the only one I can imagine based on such information.
>
> On Fri, Apr 17, 2020 at 9:13 AM Ruijing Li  wrote:
>
>> Hi all,
>>
>> Apologies if this has been asked before, but I could not find the answer
>> to this question. We have a structured streaming job, but for some reason,
>> if we use startingOffsets = latest with foreachbatch mode, it doesn’t
>> produce any data.
>>
>> Rather, in logs I see it repeats the message “ Fetcher [Consumer]
>> Resetting offset for partition to offset” over and over again..
>>
>> However with startingOffsets=earliest, we don’t get this issue. I’m
>> wondering then how we can use startingOffsets=latest as I wish to start
>> from the latest offset available.
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Ruijing Li
In thread dump, I do see this
- SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
Monitor
- SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED | Blocked
by Thread(Some(160)) Lock
-  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED | Blocked
by Thread(Some(160)) Lock

Could the fact that 160 has the monitor but is not running be causing a
deadlock preventing the job from finishing?

I do see that my Finalizer and main threads are waiting. I don't see any
other threads from 3rd-party libraries or my code in the dump. I do see the
Spark context cleaner is in timed waiting.

Thanks


On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li  wrote:

> Strangely enough I found an old issue that is the exact same issue as mine
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>
> However I’m using spark 2.4.4 so the issue should have been solved by now.
>
> Like the user in the jira issue I am using mesos, but I am reading from
> oracle instead of writing to Cassandra and S3.
>
>
> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei  wrote:
>
>> The Thread dump result table of Spark UI can provide some clues to find
>> out thread locks issue, such as:
>>
>>   Thread ID | Thread Name  | Thread State | Thread Locks
>>   13| NonBlockingInputStreamThread | WAITING  | Blocked by
>> Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951})
>>   48| Thread-16| RUNNABLE |
>> Monitor(jline.internal.NonBlockingInputStream@103008951})
>>
>> And each thread row can show the call stacks after being clicked, then
>> you can check the root cause of holding locks like this (Thread 48 above):
>>
>>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native Method)
>>
>> org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>>
>> org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>>
>> org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>>   
>>
>> Hope it can help you.
>>
>> --
>> Cheers,
>> -z
>>
>> On Thu, 16 Apr 2020 16:36:42 +0900
>> Jungtaek Lim  wrote:
>>
>> > Do thread dump continuously, per specific period (like 1s) and see the
>> > change of stack / lock for each thread. (This is not easy to be done in
>> UI
>> > so maybe doing manually would be the only option. Not sure Spark UI will
>> > provide the same, haven't used at all.)
>> >
>> > It will tell which thread is being blocked (even it's shown as running)
>> and
>> > which point to look at.
>> >
>> > On Thu, Apr 16, 2020 at 4:29 PM Ruijing Li 
>> wrote:
>> >
>> > > Once I do. thread dump, what should I be looking for to tell where it
>> is
>> > > hanging? Seeing a lot of timed_waiting and waiting on driver. Driver
>> is
>> > > also being blocked by spark UI. If there are no tasks, is there a
>> point to
>> > > do thread dump of executors?
>> > >
>> > > On Tue, Apr 14, 2020 at 4:49 AM Gabor Somogyi <
>> gabor.g.somo...@gmail.com>
>> > > wrote:
>> > >
>> > >> The simplest way is to do thread dump which doesn't require any fancy
>> > >> tool (it's available on Spark UI).
>> > >> Without thread dump it's hard to say anything...
>> > >>
>> > >>
>> > >> On Tue, Apr 14, 2020 at 11:32 AM jane thorpe
>> 
>> > >> wrote:
>> > >>
>> > >>> Here is another tool I use, Logic Analyser, 7:55
>> > >>> https://youtu.be/LnzuMJLZRdU
>> > >>>
>> > >>> you could take some suggestions for improving performance  queries.
>> > >>>
>> https://dzone.com/articles/why-you-should-not-use-select-in-sql-query-1
>> > >>>
>> > >>>
>> > >>> Jane thorpe
>> > >>> janethor...@aol.com
>> > >>>
>> > >>>
>> > >>> -Original Message-
>> > >>> From: jane thorpe 
>> > >>> To: janethorpe1 ; mich.talebzadeh <
>> > >>> mich.talebza...@gmail.com>; liruijing09 ;
>> user <
>> > >>> user@spark.apache.org>
>> > >>> Sent: Mon, 13 Apr 2020 8:32
>> > >>> Subject: Re: Spark hangs while reading from jdbc - does nothin

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Ruijing Li
After refreshing a couple of times, I notice the lock is being swapped
between these 3. The other 2 will be blocked by whoever gets this lock, in
a cycle: 160 has the lock -> 161 -> 159 -> 160.

On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li  wrote:

> In thread dump, I do see this
> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
> Monitor
> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
> Blocked by Thread(Some(160)) Lock
> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
> Blocked by Thread(Some(160)) Lock
>
> Could the fact that 160 has the monitor but is not running be causing a
> deadlock preventing the job from finishing?
>
> I do see my Finalizer and main method are waiting. I don’t see any other
> threads from 3rd party libraries or my code in the dump. I do see spark
> context cleaner has timed waiting.
>
> Thanks
>
>
> On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li  wrote:
>
>> Strangely enough I found an old issue that is the exact same issue as
>> mine
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>>
>> However I’m using spark 2.4.4 so the issue should have been solved by now.
>>
>> Like the user in the jira issue I am using mesos, but I am reading from
>> oracle instead of writing to Cassandra and S3.
>>
>>
>> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei  wrote:
>>
>>> The Thread dump result table of Spark UI can provide some clues to find
>>> out thread locks issue, such as:
>>>
>>>   Thread ID | Thread Name  | Thread State | Thread Locks
>>>   13| NonBlockingInputStreamThread | WAITING  | Blocked by
>>> Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951})
>>>   48| Thread-16| RUNNABLE |
>>> Monitor(jline.internal.NonBlockingInputStream@103008951})
>>>
>>> And each thread row can show the call stacks after being clicked, then
>>> you can check the root cause of holding locks like this (Thread 48 above):
>>>
>>>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native Method)
>>>
>>> org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>>>
>>> org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>>>
>>> org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>>>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>>>   
>>>
>>> Hope it can help you.
>>>
>>> --
>>> Cheers,
>>> -z
>>>
>>> On Thu, 16 Apr 2020 16:36:42 +0900
>>> Jungtaek Lim  wrote:
>>>
>>> > Do thread dump continuously, per specific period (like 1s) and see the
>>> > change of stack / lock for each thread. (This is not easy to be done
>>> in UI
>>> > so maybe doing manually would be the only option. Not sure Spark UI
>>> will
>>> > provide the same, haven't used at all.)
>>> >
>>> > It will tell which thread is being blocked (even it's shown as
>>> running) and
>>> > which point to look at.
>>> >
>>> > On Thu, Apr 16, 2020 at 4:29 PM Ruijing Li 
>>> wrote:
>>> >
>>> > > Once I do. thread dump, what should I be looking for to tell where
>>> it is
>>> > > hanging? Seeing a lot of timed_waiting and waiting on driver. Driver
>>> is
>>> > > also being blocked by spark UI. If there are no tasks, is there a
>>> point to
>>> > > do thread dump of executors?
>>> > >
>>> > > On Tue, Apr 14, 2020 at 4:49 AM Gabor Somogyi <
>>> gabor.g.somo...@gmail.com>
>>> > > wrote:
>>> > >
>>> > >> The simplest way is to do thread dump which doesn't require any
>>> fancy
>>> > >> tool (it's available on Spark UI).
>>> > >> Without thread dump it's hard to say anything...
>>> > >>
>>> > >>
>>> > >> On Tue, Apr 14, 2020 at 11:32 AM jane thorpe
>>> 
>>> > >> wrote:
>>> > >>
>>> > >>> Here is another tool I use, Logic Analyser, 7:55
>>> > >>> https://youtu.be/LnzuMJLZRdU
>>> > >>>
>>> > >>> you could take some suggestions for improving performance  queries.
>>> > >>>
>>> https://dzone.com/arti

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-21 Thread Ruijing Li
Strangely enough, I found an old issue that is exactly the same as mine:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343

However, I'm using Spark 2.4.4, so the issue should have been solved by now.

Like the user in the JIRA issue, I am using Mesos, but I am reading from
Oracle instead of writing to Cassandra and S3.


On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei  wrote:

> The Thread dump result table of Spark UI can provide some clues to find
> out thread locks issue, such as:
>
>   Thread ID | Thread Name  | Thread State | Thread Locks
>   13| NonBlockingInputStreamThread | WAITING  | Blocked by
> Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951})
>   48| Thread-16| RUNNABLE |
> Monitor(jline.internal.NonBlockingInputStream@103008951})
>
> And each thread row can show the call stacks after being clicked, then you
> can check the root cause of holding locks like this (Thread 48 above):
>
>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native Method)
>
> org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>
> org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>
> org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>   
>
> Hope it can help you.
>
> --
> Cheers,
> -z
>
> On Thu, 16 Apr 2020 16:36:42 +0900
> Jungtaek Lim  wrote:
>
> > Do thread dump continuously, per specific period (like 1s) and see the
> > change of stack / lock for each thread. (This is not easy to be done in
> UI
> > so maybe doing manually would be the only option. Not sure Spark UI will
> > provide the same, haven't used at all.)
> >
> > It will tell which thread is being blocked (even it's shown as running)
> and
> > which point to look at.
> >
> > On Thu, Apr 16, 2020 at 4:29 PM Ruijing Li 
> wrote:
> >
> > > Once I do. thread dump, what should I be looking for to tell where it
> is
> > > hanging? Seeing a lot of timed_waiting and waiting on driver. Driver is
> > > also being blocked by spark UI. If there are no tasks, is there a
> point to
> > > do thread dump of executors?
> > >
> > > On Tue, Apr 14, 2020 at 4:49 AM Gabor Somogyi <
> gabor.g.somo...@gmail.com>
> > > wrote:
> > >
> > >> The simplest way is to do thread dump which doesn't require any fancy
> > >> tool (it's available on Spark UI).
> > >> Without thread dump it's hard to say anything...
> > >>
> > >>
> > >> On Tue, Apr 14, 2020 at 11:32 AM jane thorpe
> 
> > >> wrote:
> > >>
> > >>> Here is another tool I use, Logic Analyser, 7:55
> > >>> https://youtu.be/LnzuMJLZRdU
> > >>>
> > >>> you could take some suggestions for improving performance  queries.
> > >>>
> https://dzone.com/articles/why-you-should-not-use-select-in-sql-query-1
> > >>>
> > >>>
> > >>> Jane thorpe
> > >>> janethor...@aol.com
> > >>>
> > >>>
> > >>> -Original Message-
> > >>> From: jane thorpe 
> > >>> To: janethorpe1 ; mich.talebzadeh <
> > >>> mich.talebza...@gmail.com>; liruijing09 ;
> user <
> > >>> user@spark.apache.org>
> > >>> Sent: Mon, 13 Apr 2020 8:32
> > >>> Subject: Re: Spark hangs while reading from jdbc - does nothing
> Removing
> > >>> Guess work from trouble shooting
> > >>>
> > >>>
> > >>>
> > >>> This tool may be useful for you to trouble shoot your problems away.
> > >>>
> > >>>
> > >>>
> https://www.javacodegeeks.com/2020/04/simplifying-apm-remove-the-guesswork-from-troubleshooting.html
> > >>>
> > >>>
> > >>> "APM tools typically use a waterfall-type view to show the blocking
> > >>> time of different components cascading through the control flow
> within an
> > >>> application.
> > >>> These types of visualizations are useful, and AppOptics has them, but
> > >>> they can be difficult to understand for those of us without a PhD."
> > >>>
> > >>> Especially  helpful if you want to understand through visualisation
> and
> > >>> you do not have a phD.
> > >>>
> > >&

Re: Understanding spark structured streaming checkpointing system

2020-04-19 Thread Ruijing Li
It's not intermittent; it seems to happen every time Spark fails when it
starts up from the last checkpoint and complains the offset is old. I
checked the offset, and it is indeed true that the offset expired on the
Kafka side. My version of Spark is 2.4.4, using Kafka 0.10.

On Sun, Apr 19, 2020 at 3:38 PM Jungtaek Lim 
wrote:

> That sounds odd. Is it intermittent, or always reproducible if you starts
> with same checkpoint? What's the version of Spark?
>
> On Fri, Apr 17, 2020 at 6:17 AM Ruijing Li  wrote:
>
>> Hi all,
>>
>> I have a question on how structured streaming does checkpointing. I’m
>> noticing that spark is not reading from the max / latest offset it’s seen.
>> For example, in HDFS, I see it stored offset file 30 which contains
>> partition: offset {1: 2000}
>>
>> But instead after stopping the job and restarting it, I see it instead
>> reads from offset file 9 which contains {1:1000}
>>
>> Can someone explain why spark doesn’t take the max offset?
>>
>> Thanks.
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Using startingOffsets latest - no data from structured streaming kafka query

2020-04-16 Thread Ruijing Li
Hi all,

Apologies if this has been asked before, but I could not find the answer to
this question. We have a structured streaming job, but for some reason, if
we use startingOffsets = latest with foreachBatch mode, it doesn't produce
any data.

Rather, in the logs I see it repeat the message "Fetcher [Consumer] Resetting
offset for partition to offset" over and over again.

However with startingOffsets=earliest, we don’t get this issue. I’m
wondering then how we can use startingOffsets=latest as I wish to start
from the latest offset available.
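
For reference, a minimal sketch of the kind of query described above (the
broker, topic, checkpoint path, and the foreachBatch body are placeholders):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "some-topic")
  // Only consulted when the query starts with no checkpoint; on a restart
  // the query resumes from the offsets recorded in the checkpoint instead.
  .option("startingOffsets", "latest")
  .load()

stream.writeStream
  .foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
    println(s"batch $batchId: ${batch.count()} rows")
  }
  .option("checkpointLocation", "hdfs:///checkpoints/some-topic")
  .start()
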
-- 
Cheers,
Ruijing Li


Understanding spark structured streaming checkpointing system

2020-04-16 Thread Ruijing Li
Hi all,

I have a question on how structured streaming does checkpointing. I'm
noticing that Spark is not reading from the max / latest offset it has seen.
For example, in HDFS, I see it stored offset file 30, which contains
partition: offset {1: 2000}

But after stopping the job and restarting it, I see it instead reads from
offset file 9, which contains {1: 1000}

Can someone explain why spark doesn’t take the max offset?

Thanks.
-- 
Cheers,
Ruijing Li


Re: Spark structured streaming - Fallback to earliest offset

2020-04-16 Thread Ruijing Li
Thanks Jungtaek, that makes sense.

I tried Burak's solution of just setting failOnDataLoss to false, but
instead of failing, the job is stuck. I'm guessing that the offsets are
being deleted faster than the job can process them, and it will stay stuck
unless I increase resources? Or will Spark hang once the exception happens?

On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim 
wrote:

> I think Spark is trying to ensure that it reads the input "continuously"
> without any missing. Technically it may be valid to say the situation is a
> kind of "data-loss", as the query couldn't process the offsets which are
> being thrown out, and owner of the query needs to be careful as it affects
> the result.
>
> If your streaming query keeps up with input rate then it's pretty rare for
> the query to go under retention. Even it lags a bit, it'd be safe if
> retention is set to enough period. The ideal state would be ensuring your
> query to process all offsets before they are thrown out by retention (don't
> leave the query lagging behind - either increasing processing power or
> increasing retention duration, though most probably you'll need to do
> former), but if you can't make sure and if you understand the risk then yes
> you can turn off the option and take the risk.
>
>
> On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li  wrote:
>
>> I see, I wasn’t sure if that would work as expected. The docs seems to
>> suggest to be careful before turning off that option, and I’m not sure why
>> failOnDataLoss is true by default.
>>
>> On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:
>>
>>> Just set `failOnDataLoss=false` as an option in readStream?
>>>
>>> On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a spark structured streaming app that is consuming from a kafka
>>>> topic with retention set up. Sometimes I face an issue where my query has
>>>> not finished processing a message but the retention kicks in and deletes
>>>> the offset, which since I use the default setting of “failOnDataLoss=true”
>>>> causes my query to fail. The solution I currently have is manual, deleting
>>>> the offsets directory and rerunning.
>>>>
>>>> I instead like to have spark automatically fall back to the earliest
>>>> offset available. The solutions I saw recommend setting auto.offset =
>>>> earliest, but for structured streaming, you cannot set that. How do I do
>>>> this for structured streaming?
>>>>
>>>> Thanks!
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-16 Thread Ruijing Li
and
>> -- sn.name in ('session pga memory')
>> sn.name in ('session pga memory','session uga
>> memory')
>>   ) AS total_memory
>> , (b.block_gets + b.consistent_gets) TotGets
>> , b.physical_reads phyRds
>> , decode(a.status, 'ACTIVE', 'Y','INACTIVE', 'N') STATUS
>> , CASE WHEN a.sid in (select sid from v$mystat where rownum = 1)
>> THEN '<-- YOU' ELSE ' ' END "INFO"
>> FROM
>>  v$process p
>> ,v$session a
>> ,v$sess_io b
>> WHERE
>> a.paddr = p.addr
>> AND p.background IS NULL
>> --AND  a.sid NOT IN (select sid from v$mystat where rownum = 1)
>> AND a.sid = b.sid
>> AND a.username is not null
>> --AND (a.last_call_et < 3600 or a.status = 'ACTIVE')
>> --AND CURRENT_DATE - logon_time > 0
>> --AND a.sid NOT IN ( select sid from v$mystat where rownum=1)  -- exclude
>> me
>> --AND (b.block_gets + b.consistent_gets) > 0
>> ORDER BY a.username;
>> exit
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On Fri, 10 Apr 2020 at 17:37, Ruijing Li  wrote:
>>
>> Hi all,
>>
>> I am on spark 2.4.4 and using scala 2.11.12, and running cluster mode on
>> mesos. I am ingesting from an oracle database using spark.read.jdbc. I am
>> seeing a strange issue where spark just hangs and does nothing, not
>> starting any new tasks. Normally this job finishes in 30 stages but
>> sometimes it stops at 29 completed stages and doesn’t start the last stage.
>> The spark job is idling and there is no pending or active task. What could
>> be the problem? Thanks.
>> --
>> Cheers,
>> Ruijing Li
>>
>> --
Cheers,
Ruijing Li


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Ruijing Li
I see, I wasn't sure if that would work as expected. The docs seem to
suggest being careful before turning off that option, and I'm not sure why
failOnDataLoss is true by default.

On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:

> Just set `failOnDataLoss=false` as an option in readStream?
>
> On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li  wrote:
>
>> Hi all,
>>
>> I have a spark structured streaming app that is consuming from a kafka
>> topic with retention set up. Sometimes I face an issue where my query has
>> not finished processing a message but the retention kicks in and deletes
>> the offset, which since I use the default setting of “failOnDataLoss=true”
>> causes my query to fail. The solution I currently have is manual, deleting
>> the offsets directory and rerunning.
>>
>> I instead like to have spark automatically fall back to the earliest
>> offset available. The solutions I saw recommend setting auto.offset =
>> earliest, but for structured streaming, you cannot set that. How do I do
>> this for structured streaming?
>>
>> Thanks!
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Ruijing Li
Hi all,

I have a Spark structured streaming app that is consuming from a Kafka
topic with retention set up. Sometimes I face an issue where my query has
not finished processing a message, but the retention kicks in and deletes
the offset, which, since I use the default setting of "failOnDataLoss=true",
causes my query to fail. The solution I currently have is manual: deleting
the offsets directory and rerunning.

I would instead like Spark to automatically fall back to the earliest offset
available. The solutions I saw recommend setting auto.offset.reset =
earliest, but for structured streaming, you cannot set that. How do I do
this for structured streaming?

Thanks!
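
For reference, a minimal sketch of the readStream option suggested in the
replies to this thread (broker and topic are placeholders; note the
data-loss caveats discussed in the thread before relying on it):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "some-topic")
  // Instead of failing the query, skip over offsets that have already been
  // aged out by the topic's retention policy.
  .option("failOnDataLoss", "false")
  .load()
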
-- 
Cheers,
Ruijing Li


Spark hangs while reading from jdbc - does nothing

2020-04-10 Thread Ruijing Li
Hi all,

I am on Spark 2.4.4 using Scala 2.11.12, and running cluster mode on
Mesos. I am ingesting from an Oracle database using spark.read.jdbc. I am
seeing a strange issue where Spark just hangs and does nothing, not
starting any new tasks. Normally this job finishes in 30 stages, but
sometimes it stops at 29 completed stages and doesn't start the last stage.
The Spark job is idling and there is no pending or active task. What could
be the problem? Thanks.
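
For context, a minimal sketch of the kind of partitioned JDBC read described
(connection details, table, and bounds are placeholders, not taken from the
original job):

// Requires the Oracle JDBC driver on the driver and executor classpath.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//db-host:1521/SERVICE")
  .option("dbtable", "SOME_SCHEMA.SOME_TABLE")
  .option("user", "some_user")
  .option("password", "some_password")
  // Partitioned read: numPartitions tasks, each scanning a slice of ID.
  .option("partitionColumn", "ID")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "30")
  .load()
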
-- 
Cheers,
Ruijing Li


Re: Can you view thread dumps on spark UI if job finished

2020-04-08 Thread Ruijing Li
Thanks Zahid. Yes, I am using the history server to see previous UIs.

However, my question still remains about viewing old thread dumps: I cannot
see them in the old, completed Spark UIs, only while the Spark context is
running.
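
For reference, the event-log settings that feed the Spark History Server
mentioned below (a sketch; the log directory is a placeholder). Note that
the executor thread-dump links are served live by the running application
and are not written to the event logs, which is why they stop working once
the application finishes:

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-event-logs
spark.history.fs.logDirectory    hdfs:///spark-event-logs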

On Wed, Apr 8, 2020 at 4:01 PM Zahid Rahman  wrote:

> Spark UI is only available while SparkContext is running.
>
> However  You can get to the Spark UI after your application  completes or
> crashes.
>
> To do this Spark includes a tool called the Spark History Server that
> allows you to reconstruct the Spark UI.
>
> You can find up to date information on how to use this tool in the spark
> documentation https://spark.apache.org/docs/latest/monitoring.html
>
>
>
>
>
> On Wed, 8 Apr 2020, 23:47 Ruijing Li,  wrote:
>
>> Hi all,
>>
>> As stated in title, currently when I view the spark UI of a completed
>> spark job, I see there are thread dump links in the executor tab, but
>> clicking on them does nothing. Is it possible to see the thread dumps
>> somehow even if the job finishes? On spark 2.4.5.
>>
>> Thanks.
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Can you view thread dumps on spark UI if job finished

2020-04-08 Thread Ruijing Li
Hi all,

As stated in the title, when I view the Spark UI of a completed Spark job,
I see there are thread dump links in the executor tab, but clicking on them
does nothing. Is it possible to see the thread dumps somehow even after the
job finishes? I'm on Spark 2.4.5.

Thanks.
-- 
Cheers,
Ruijing Li


Re: can we all help use our expertise to create an IT solution for Covid-19

2020-03-26 Thread Ruijing Li
might find that the most effective analysis might be
>>>>>> delivered through an excel sheet ;)
>>>>>> So before technology I'd suggest to get access to sources and then
>>>>>> figure out how to best exploit them and deliver the information to the
>>>>>> right people
>>>>>>
>>>>>> On Thu, Mar 26, 2020 at 2:29 PM Chenguang He 
>>>>>> wrote:
>>>>>>
>>>>>>> Have you taken a look at this (
>>>>>>> https://coronavirus.1point3acres.com/en/test  )?
>>>>>>>
>>>>>>> They have a visualizer with a very basic analysis of the outbreak.
>>>>>>>
>>>>>>> On Thu, Mar 26, 2020 at 8:54 AM Mich Talebzadeh <
>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> Agreed, computers are not the end but means to an end. We all have
>>>>>>>> to start from somewhere. It all helps.
>>>>>>>>
>>>>>>>> HTH
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn * 
>>>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other property 
>>>>>>>> which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>>> damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, 26 Mar 2020 at 13:48, abew1...@phenetics.com
>>>>>>>> abew1...@phenetics.com  wrote:
>>>>>>>>
>>>>>>>>> Yeah
>>>>>>>>> unfortunately, there's really nothing information tech. can do to
>>>>>>>>> solve biological problems. IMHO, that's just human nature, people 
>>>>>>>>> just get
>>>>>>>>> sick. I know this is going to trigger a flood of emails.. but let's 
>>>>>>>>> examine
>>>>>>>>> what IT is. It's using computers. Computers are only good as the data
>>>>>>>>> inputted into it. As complex as biology, medicine, etc.. and yes even
>>>>>>>>> politics is. A machine can't prevent sickness and disease. I wish it
>>>>>>>>> could... but it can't. A deeper solution is needed.
>>>>>>>>>
>>>>>>>>> Just my 2 cents, Abe
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On March 26, 2020 3:41 PM Mich Talebzadeh <
>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> Do you think we can create a global solution in the cloud using
>>>>>>>>> volunteers like us and third party employees. What I have in mind is 
>>>>>>>>> to
>>>>>>>>> create a comprehensive real time solution to get data from various
>>>>>>>>> countries, universities pushed into a fast database through Kafka and 
>>>>>>>>> Spark
>>>>>>>>> and used downstream for greater analytics. I am sure likes of Goggle 
>>>>>>>>> etc.
>>>>>>>>> will provide free storage and likely many vendors will grab the
>>>>>>>>> opportunity.
>>>>>>>>>
>>>>>>>>> We can then donate this to WHO or others and we can make it very
>>>>>>>>> modular though microservices etc.
>>>>>>>>>
>>>>>>>>> I hope this does not sound futuristic.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> LinkedIn * 
>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other property 
>>>>>>>>> which may
>>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>>>> damages
>>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> .. spend time to analyse ..
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Abraham R. Wilcox
>>>>>>>>> Sales Director (African Region)
>>>>>>>>> 8x8 Hosted VoIP - Communications & Collaboration Solutions
>>>>>>>>> 7257 NW 4TH BLVD SUITE 305
>>>>>>>>> <https://www.google.com/maps/search/7257+NW+4TH+BLVD+SUITE+305+GAINESVILLE,+FL+32607+%0D%0AUS?entry=gmail=g>
>>>>>>>>> GAINESVILLE, FL 32607
>>>>>>>>> <https://www.google.com/maps/search/7257+NW+4TH+BLVD+SUITE+305+GAINESVILLE,+FL+32607+%0D%0AUS?entry=gmail=g>
>>>>>>>>>
>>>>>>>>> <https://www.google.com/maps/search/7257+NW+4TH+BLVD+SUITE+305+GAINESVILLE,+FL+32607+%0D%0AUS?entry=gmail=g>
>>>>>>>>>
>>>>>>>>> US
>>>>>>>>> <https://www.google.com/maps/search/7257+NW+4TH+BLVD+SUITE+305+GAINESVILLE,+FL+32607+%0D%0AUS?entry=gmail=g>
>>>>>>>>> Direct: +1 510 646 1484
>>>>>>>>> US Voice: +1 641 715 3900 ext. 755489#
>>>>>>>>> US Fax: +1 855 661 4166
>>>>>>>>> Alt. email: awilco...@gmail.com
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Chenguang He
>>>>>>>
>>>>>> --
Cheers,
Ruijing Li


ForEachBatch collecting batch to driver

2020-03-10 Thread Ruijing Li
Hi all,

I'm curious how foreachBatch works in Spark structured streaming. Since it
takes in a micro-batch dataframe, does that mean the code in foreachBatch
executes on the Spark driver? For large batches, could you potentially have
OOM issues from collecting each partition onto the driver?
-- 
Cheers,
Ruijing Li


Re: Schema store for Parquet

2020-03-09 Thread Ruijing Li
Thanks Magnus,

I’ll explore Atlas and see what I can find.

On Wed, Mar 4, 2020 at 11:10 AM Magnus Nilsson  wrote:

> Apache Atlas is the apache data catalog. Maybe want to look into that. It
> depends on what your use case is.
>
> On Wed, Mar 4, 2020 at 8:01 PM Ruijing Li  wrote:
>
>> Thanks Lucas and Magnus,
>>
>> Would there be any open source solutions other than Apache Hive
>> metastore, if we don’t wish to use Apache Hive and spark?
>>
>> Thanks.
>>
>> On Wed, Mar 4, 2020 at 10:40 AM lucas.g...@gmail.com <
>> lucas.g...@gmail.com> wrote:
>>
>>> Or AWS glue catalog if you're in AWS
>>>
>>> On Wed, 4 Mar 2020 at 10:35, Magnus Nilsson  wrote:
>>>
>>>> Google hive metastore.
>>>>
>>>> On Wed, Mar 4, 2020 at 7:29 PM Ruijing Li 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Has anyone explored efforts to have a centralized storage of schemas
>>>>> of different parquet files? I know there is schema management for Avro, 
>>>>> but
>>>>> couldn’t find solutions for parquet schema management. Thanks!
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Schema store for Parquet

2020-03-04 Thread Ruijing Li
Thanks Lucas and Magnus,

Would there be any open source solutions other than Apache Hive metastore,
if we don’t wish to use Apache Hive and spark?

Thanks.

On Wed, Mar 4, 2020 at 10:40 AM lucas.g...@gmail.com 
wrote:

> Or AWS glue catalog if you're in AWS
>
> On Wed, 4 Mar 2020 at 10:35, Magnus Nilsson  wrote:
>
>> Google hive metastore.
>>
>> On Wed, Mar 4, 2020 at 7:29 PM Ruijing Li  wrote:
>>
>>> Hi all,
>>>
>>> Has anyone explored efforts to have a centralized storage of schemas of
>>> different parquet files? I know there is schema management for Avro, but
>>> couldn’t find solutions for parquet schema management. Thanks!
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
Cheers,
Ruijing Li


Re: Integration testing Framework Spark SQL Scala

2020-02-25 Thread Ruijing Li
Just wanted to follow up on this. If anyone has any advice, I’d be
interested in learning more!

On Thu, Feb 20, 2020 at 6:09 PM Ruijing Li  wrote:

> Hi all,
>
> I’m interested in hearing the community’s thoughts on best practices to do
> integration testing for spark sql jobs. We run a lot of our jobs with cloud
> infrastructure and hdfs - this makes debugging a challenge for us,
> especially with problems that don’t occur from just initializing a
> sparksession locally or testing with spark-shell. Ideally, we’d like some
> sort of docker container emulating hdfs and spark cluster mode, that you
> can run locally.
>
> Any test framework, tips, or examples people can share? Thanks!
> --
> Cheers,
> Ruijing Li
>
-- 
Cheers,
Ruijing Li


Integration testing Framework Spark SQL Scala

2020-02-20 Thread Ruijing Li
Hi all,

I'm interested in hearing the community's thoughts on best practices for
integration testing of Spark SQL jobs. We run a lot of our jobs with cloud
infrastructure and HDFS, which makes debugging a challenge for us,
especially with problems that don't occur from just initializing a
SparkSession locally or testing with spark-shell. Ideally, we'd like some
sort of Docker container emulating HDFS and Spark cluster mode that you
can run locally.

Any test framework, tips, or examples people can share? Thanks!
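
As a baseline, a minimal sketch of a local-mode test harness (assuming
ScalaTest 3.1+; it exercises job code against a local SparkSession and a
temp directory, but does not emulate HDFS or cluster mode):

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class ExampleJobSuite extends AnyFunSuite {

  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("integration-test")
    .getOrCreate()

  test("round-trips a small dataset through parquet") {
    import spark.implicits._
    val tmp = java.nio.file.Files.createTempDirectory("spark-it").toString

    Seq((1, "a"), (2, "b")).toDF("id", "value").write.parquet(s"$tmp/in")

    // Replace this read with a call to the job under test.
    val out = spark.read.parquet(s"$tmp/in")
    assert(out.count() == 2)
  }
}
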
-- 
Cheers,
Ruijing Li


Re: Better way to debug serializable issues

2020-02-20 Thread Ruijing Li
Thanks all for the answers. While I wasn't able to use the extra parameters
to get the needed information, I did solve my issue. It was caused by using
pureconfig to read a certain config from Hadoop before the Spark session
initialized, so pureconfig would error out deserializing the class before
Spark could configure properly.


On Tue, Feb 18, 2020 at 10:24 AM Maxim Gekk 
wrote:

> Hi Ruijing,
>
> Spark uses SerializationDebugger (
> https://spark.apache.org/docs/latest/api/java/org/apache/spark/serializer/SerializationDebugger.html)
> as default debugger to detect the serialization issues. You can take more
> detailed serialization exception information by setting the following while
> creating a cluster:
> spark.driver.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true
> spark.executor.extraJavaOptions
> -Dsun.io.serialization.extendedDebugInfo=true
>
> Maxim Gekk
>
> Software Engineer
>
> Databricks, Inc.
>
>
> On Tue, Feb 18, 2020 at 1:02 PM Ruijing Li  wrote:
>
>> Hi all,
>>
>> When working with spark jobs, I sometimes have to tackle with
>> serialization issues, and I have a difficult time trying to fix those. A
>> lot of times, the serialization issues happen only in cluster mode across
>> the network in a mesos container, so I can’t debug locally. And the
>> exception thrown by spark is not very helpful to find the cause.
>>
>> I’d love to hear some tips on how to debug in the right places. Also, I’d
>> be interested to know if in future releases it would be possible to point
>> out which class or function is causing the serialization issue (right now I
>> find its either Java generic classes or the class Spark is running itself).
>> Thanks!
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Better way to debug serializable issues

2020-02-18 Thread Ruijing Li
Hi all,

When working with Spark jobs, I sometimes have to tackle serialization
issues, and I have a difficult time trying to fix them. A lot of the time,
the serialization issues happen only in cluster mode, across the network in
a Mesos container, so I can't debug locally. And the exception thrown by
Spark is not very helpful for finding the cause.

I'd love to hear some tips on how to debug in the right places. Also, I'd
be interested to know whether in future releases it would be possible to
point out which class or function is causing the serialization issue (right
now I find it's either Java generic classes or the class Spark is running
itself). Thanks!
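
A minimal sketch of enabling the extended serialization debug info mentioned
in the reply above, via spark-submit (the class name and jar are
placeholders):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true" \
  --conf "spark.executor.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true" \
  --class com.example.MyJob \
  my-job.jar
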
-- 
Cheers,
Ruijing Li


Re: Best way to read batch from Kafka and Offsets

2020-02-15 Thread Ruijing Li
Thought I'd update this thread. I figured out my issue with foreachBatch and
structured streaming: I had done a count() before write(), so my streaming
query branched into two. I am now using Trigger.Once and structured
streaming to handle checkpointing instead of doing it myself. Thanks all for
your help!

On Wed, Feb 5, 2020 at 7:07 PM Ruijing Li  wrote:

> Looks like I’m wrong, since I tried that exact snippet and it worked
>
> So to be clear, in the part where I do batchDF.write.parquet, that is not
> the exact code I’m using.
>
> I’m using a custom write function that does similar to write.parquet but
> has some added functionality. Somehow my custom write function isn’t
> working correctly
>
>  Is batchDF a static dataframe though?
>
> Thanks
>
> On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li  wrote:
>
>> Hi all,
>>
>> I tried with forEachBatch but got an error. Is this expected?
>>
>> Code is
>>
>> df.writeStream.trigger(Trigger.Once).forEachBatch { (batchDF, batchId) =>
>> batchDF.write.parquet(hdfsPath)
>> }
>> .option(“checkPointLocation”, anotherHdfsPath)
>> .start()
>>
>> Exception is: Queries with streaming sources must be executed with
>> writeStream.start()
>>
>> But I thought forEachBatch would treat the batchDF as a static dataframe?
>>
>> Thanks,
>> RJ
>>
>> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Burak,
>>>
>>> I am not quite used to streaming, but was almost thinking on the same
>>> lines :) makes a lot of sense to me now.
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz  wrote:
>>>
>>>> Do you really want to build all of that and open yourself to bugs when
>>>> you can just use foreachBatch? Here are your options:
>>>>
>>>> 1. Build it yourself
>>>>
>>>> // Read offsets from some store
>>>> prevOffsets = readOffsets()
>>>> latestOffsets = getOffsets()
>>>>
>>>> df = spark.read.format("kafka").option("startOffsets",
>>>> prevOffsets).option("endOffsets", latestOffsets).load()
>>>> batchLogic(df)
>>>>
>>>> saveOffsets(latestOffsets)
>>>>
>>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>>
>>>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>>>> batchId) => batchLogic(df)).trigger("once").start()
>>>>
>>>> With Option (1), you're going to have to (re)solve:
>>>>  a) Tracking and consistency of offsets
>>>>  b) Potential topic partition mismatches
>>>>  c) Offsets that may have aged out due to retention
>>>>  d) Re-execution of jobs and data consistency. What if your job fails
>>>> as you're committing the offsets in the end, but the data was already
>>>> stored? Will your getOffsets method return the same offsets?
>>>>
>>>> I'd rather not solve problems that other people have solved for me, but
>>>> ultimately the decision is yours to make.
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li 
>>>> wrote:
>>>>
>>>>> Thanks Anil, I think that’s the approach I will take.
>>>>>
>>>>> Hi Burak,
>>>>>
>>>>> That was a possibility to think about, but my team has custom
>>>>> dataframe writer functions we would like to use, unfortunately they were
>>>>> written for static dataframes in mind. I do see there is a ForEachBatch
>>>>> write mode but my thinking was at that point it was easier to read from
>>>>> kafka through batch mode.
>>>>>
>>>>> Thanks,
>>>>> RJ
>>>>>
>>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:
>>>>>
>>>>>> Hi Ruijing,
>>>>>>
>>>>>> Why do you not want to use structured streaming here? This is exactly
>>>>>> why structured streaming + Trigger.Once was built, just so that you don't
>>>>>> build that solution yourself.
>>>>>> You also get exactly once semantics if you use the built in sinks.
>>>>>>
>>>>>> Best,
>>>>>> Burak

Spark 2.4.4 has bigger memory impact than 2.3?

2020-02-15 Thread Ruijing Li
Hi all,

We recently upgraded our jobs to Spark 2.4.4 from 2.3 and noticed that
some jobs are failing due to lack of resources, particularly lack of
executor memory causing some executors to fail. However, no code change was
made other than the upgrade. Does Spark 2.4.4 require more executor memory
than previous versions of Spark? I'd be interested to know if anyone else
has this issue. We are on Scala 2.11.12 on Java 8.
-- 
Cheers,
Ruijing Li


Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Ruijing Li
Looks like I'm wrong, since I tried that exact snippet and it worked.

So to be clear, in the part where I do batchDF.write.parquet, that is not
the exact code I'm using.

I'm using a custom write function that does something similar to
write.parquet but has some added functionality. Somehow my custom write
function isn't working correctly.

Is batchDF a static dataframe though?

Thanks

On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li  wrote:

> Hi all,
>
> I tried with forEachBatch but got an error. Is this expected?
>
> Code is
>
> df.writeStream.trigger(Trigger.Once).forEachBatch { (batchDF, batchId) =>
> batchDF.write.parquet(hdfsPath)
> }
> .option(“checkPointLocation”, anotherHdfsPath)
> .start()
>
> Exception is: Queries with streaming sources must be executed with
> writeStream.start()
>
> But I thought forEachBatch would treat the batchDF as a static dataframe?
>
> Thanks,
> RJ
>
> On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta 
> wrote:
>
>> Hi Burak,
>>
>> I am not quite used to streaming, but was almost thinking on the same
>> lines :) makes a lot of sense to me now.
>>
>> Regards,
>> Gourav
>>
>> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz  wrote:
>>
>>> Do you really want to build all of that and open yourself to bugs when
>>> you can just use foreachBatch? Here are your options:
>>>
>>> 1. Build it yourself
>>>
>>> // Read offsets from some store
>>> prevOffsets = readOffsets()
>>> latestOffsets = getOffsets()
>>>
>>> df = spark.read.format("kafka").option("startOffsets",
>>> prevOffsets).option("endOffsets", latestOffsets).load()
>>> batchLogic(df)
>>>
>>> saveOffsets(latestOffsets)
>>>
>>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>>
>>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>>> batchId) => batchLogic(df)).trigger("once").start()
>>>
>>> With Option (1), you're going to have to (re)solve:
>>>  a) Tracking and consistency of offsets
>>>  b) Potential topic partition mismatches
>>>  c) Offsets that may have aged out due to retention
>>>  d) Re-execution of jobs and data consistency. What if your job fails as
>>> you're committing the offsets in the end, but the data was already stored?
>>> Will your getOffsets method return the same offsets?
>>>
>>> I'd rather not solve problems that other people have solved for me, but
>>> ultimately the decision is yours to make.
>>>
>>> Best,
>>> Burak
>>>
>>>
>>>
>>>
>>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li  wrote:
>>>
>>>> Thanks Anil, I think that’s the approach I will take.
>>>>
>>>> Hi Burak,
>>>>
>>>> That was a possibility to think about, but my team has custom dataframe
>>>> writer functions we would like to use; unfortunately, they were written
>>>> with static dataframes in mind. I do see there is a foreachBatch write mode,
>>>> but my thinking was that at that point it would be easier to read from
>>>> Kafka in batch mode.
>>>>
>>>> Thanks,
>>>> RJ
>>>>
>>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:
>>>>
>>>>> Hi Ruijing,
>>>>>
>>>>> Why do you not want to use structured streaming here? This is exactly
>>>>> why structured streaming + Trigger.Once was built, just so that you don't
>>>>> build that solution yourself.
>>>>> You also get exactly once semantics if you use the built in sinks.
>>>>>
>>>>> Best,
>>>>> Burak
>>>>>
>>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni 
>>>>> wrote:
>>>>>
>>>>>> Hi Ruijing,
>>>>>>
>>>>>> We did the below things to read Kafka in batch from spark:
>>>>>>
>>>>>> 1) Maintain the start offset (could be db, file etc)
>>>>>> 2) Get the end offset dynamically when the job executes.
>>>>>> 3) Pass the start and end offsets
>>>>>> 4) Overwrite the start offset with the end offset. (Should be done
>>>>>> post processing the data)
>>>>>>
>>>>>> Currently to make it work in batch mode, you need to maintain the
>>>>>> state information of the offsets externally.
>>>>>

Re: Best way to read batch from Kafka and Offsets

2020-02-05 Thread Ruijing Li
Hi all,

I tried with foreachBatch but got an error. Is this expected?

Code is

df.writeStream.trigger(Trigger.Once).foreachBatch { (batchDF, batchId) =>
  batchDF.write.parquet(hdfsPath)
}
.option("checkpointLocation", anotherHdfsPath)
.start()

Exception is: Queries with streaming sources must be executed with
writeStream.start()

But I thought foreachBatch would treat the batchDF as a static dataframe?

Thanks,
RJ
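
As a quick sanity check on the static-dataframe question, I am going to log
isStreaming inside the batch function. A sketch using the same df and paths
as above:

```
df.writeStream
  .trigger(Trigger.Once)
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    // Expected to print false: inside foreachBatch we get a plain batch DataFrame.
    println(s"batch $batchId isStreaming=${batchDF.isStreaming}")
    batchDF.write.parquet(hdfsPath)
  }
  .option("checkpointLocation", anotherHdfsPath)
  .start()
```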

On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta 
wrote:

> Hi Burak,
>
> I am not quite used to streaming, but was almost thinking on the same
> lines :) makes a lot of sense to me now.
>
> Regards,
> Gourav
>
> On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz  wrote:
>
>> Do you really want to build all of that and open yourself to bugs when
>> you can just use foreachBatch? Here are your options:
>>
>> 1. Build it yourself
>>
>> // Read offsets from some store
>> prevOffsets = readOffsets()
>> latestOffsets = getOffsets()
>>
>> df = spark.read.format("kafka").option("startOffsets",
>> prevOffsets).option("endOffsets", latestOffsets).load()
>> batchLogic(df)
>>
>> saveOffsets(latestOffsets)
>>
>> 2. Structured Streaming + Trigger.Once + foreachBatch
>>
>> spark.readStream.format("kafka").load().writeStream.foreachBatch((df,
>> batchId) => batchLogic(df)).trigger("once").start()
>>
>> With Option (1), you're going to have to (re)solve:
>>  a) Tracking and consistency of offsets
>>  b) Potential topic partition mismatches
>>  c) Offsets that may have aged out due to retention
>>  d) Re-execution of jobs and data consistency. What if your job fails as
>> you're committing the offsets in the end, but the data was already stored?
>> Will your getOffsets method return the same offsets?
>>
>> I'd rather not solve problems that other people have solved for me, but
>> ultimately the decision is yours to make.
>>
>> Best,
>> Burak
>>
>>
>>
>>
>> On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li  wrote:
>>
>>> Thanks Anil, I think that’s the approach I will take.
>>>
>>> Hi Burak,
>>>
>>> That was a possibility to think about, but my team has custom dataframe
>>> writer functions we would like to use; unfortunately, they were written
>>> with static dataframes in mind. I do see there is a foreachBatch write mode,
>>> but my thinking was that at that point it would be easier to read from
>>> Kafka in batch mode.
>>>
>>> Thanks,
>>> RJ
>>>
>>> On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:
>>>
>>>> Hi Ruijing,
>>>>
>>>> Why do you not want to use structured streaming here? This is exactly
>>>> why structured streaming + Trigger.Once was built, just so that you don't
>>>> build that solution yourself.
>>>> You also get exactly once semantics if you use the built in sinks.
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:
>>>>
>>>>> Hi Ruijing,
>>>>>
>>>>> We did the below things to read Kafka in batch from spark:
>>>>>
>>>>> 1) Maintain the start offset (could be db, file etc)
>>>>> 2) Get the end offset dynamically when the job executes.
>>>>> 3) Pass the start and end offsets
>>>>> 4) Overwrite the start offset with the end offset. (Should be done
>>>>> post processing the data)
>>>>>
>>>>> Currently to make it work in batch mode, you need to maintain the
>>>>> state information of the offsets externally.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Anil
>>>>>
>>>>> -Sent from my mobile
>>>>> http://anilkulkarni.com/
>>>>>
>>>>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> My use case is to read from single kafka topic using a batch spark
>>>>>> sql job (not structured streaming ideally). I want this batch job every
>>>>>> time it starts to get the last offset it stopped at, and start reading 
>>>>>> from
>>>>>> there until it caught up to the latest offset, store the result and stop
>>>>>> the job. Given the dataframe has a partition and offset column, my first
>>>>>> thought for offset management is to groupBy partition and agg the max
>>>>>> offset, then store it in HDFS. Next time the job runs, it will read and
>>>>>> start from this max offset using startingOffsets
>>>>>>
>>>>>> However, I was wondering if this will work. If the kafka producer
>>>>>> failed an offset and later decides to resend it, I will have skipped it
>>>>>> since I’m starting from the max offset sent. How does spark structured
>>>>>> streaming know to continue onwards - does it keep a state of all offsets
>>>>>> seen? If so, how can I replicate this for batch without missing data? Any
>>>>>> help would be appreciated.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>>
>>>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
Cheers,
Ruijing Li


Re: Best way to read batch from Kafka and Offsets

2020-02-04 Thread Ruijing Li
Thanks Anil, I think that’s the approach I will take.

Hi Burak,

That was a possibility to think about, but my team has custom dataframe
writer functions we would like to use; unfortunately, they were written
with static dataframes in mind. I do see there is a foreachBatch write mode,
but my thinking was that at that point it would be easier to read from
Kafka in batch mode.

Thanks,
RJ

On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz  wrote:

> Hi Ruijing,
>
> Why do you not want to use structured streaming here? This is exactly why
> structured streaming + Trigger.Once was built, just so that you don't build
> that solution yourself.
> You also get exactly once semantics if you use the built in sinks.
>
> Best,
> Burak
>
> On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni  wrote:
>
>> Hi Ruijing,
>>
>> We did the below things to read Kafka in batch from spark:
>>
>> 1) Maintain the start offset (could be db, file etc)
>> 2) Get the end offset dynamically when the job executes.
>> 3) Pass the start and end offsets
>> 4) Overwrite the start offset with the end offset. (Should be done post
>> processing the data)
>>
>> Currently to make it work in batch mode, you need to maintain the state
>> information of the offsets externally.
>>
>>
>> Thanks
>> Anil
>>
>> -Sent from my mobile
>> http://anilkulkarni.com/
>>
>> On Mon, Feb 3, 2020, 12:39 AM Ruijing Li  wrote:
>>
>>> Hi all,
>>>
>>> My use case is to read from single kafka topic using a batch spark sql
>>> job (not structured streaming ideally). I want this batch job every time it
>>> starts to get the last offset it stopped at, and start reading from there
>>> until it caught up to the latest offset, store the result and stop the job.
>>> Given the dataframe has a partition and offset column, my first thought for
>>> offset management is to groupBy partition and agg the max offset, then
>>> store it in HDFS. Next time the job runs, it will read and start from this
>>> max offset using startingOffsets
>>>
>>> However, I was wondering if this will work. If the kafka producer failed
>>> an offset and later decides to resend it, I will have skipped it since I’m
>>> starting from the max offset sent. How does spark structured streaming know
>>> to continue onwards - does it keep a state of all offsets seen? If so, how
>>> can I replicate this for batch without missing data? Any help would be
>>> appreciated.
>>>
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
Cheers,
Ruijing Li


Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Ruijing Li
Hi Chris,

Thanks for the answer. So if I understand correctly:

- there will be a need to dedupe, since I should be expecting at-least-once
delivery.

- storing the result of (group by partition and aggregate max offset) is
enough, since a Kafka message is immutable, so a resent message will get a
different offset instead of the same offset.

So Spark, when reading from Kafka, is acting as an at-least-once consumer?
Why does Spark not do checkpointing for a batch read of Kafka?
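
For the dedupe piece, this is a minimal sketch of what I have in mind,
assuming a business key plus an event-time column to rank on; the column
names `id` and `eventTime` are made up for illustration:

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep only the most recent record per business key. Replace `id` and
// `eventTime` with the real key/date columns.
def dedupeLatest(df: DataFrame): DataFrame = {
  val w = Window.partitionBy("id").orderBy(col("eventTime").desc)
  df.withColumn("rn", row_number().over(w))
    .filter(col("rn") === 1)
    .drop("rn")
}
```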

On Mon, Feb 3, 2020 at 1:36 AM Chris Teoh  wrote:

> Kafka can keep track of the offsets (in a separate topic based on your
> consumer group) you've seen but it is usually best effort and you're
> probably better off also keeping track of your offsets.
>
> If the producer resends a message you would have to dedupe it as you've
> most likely already seen it, how you handle that is dependent on your data.
> I think the offset will increment automatically, you will generally not see
> the same offset occur more than once in a Kafka topic partition, feel free
> to correct me on this though. So the most likely scenario you need to
> handle is if the producer sends a duplicate message with two offsets.
>
> The alternative is you can reprocess the offsets back from where you
> thought the message was last seen.
>
> Kind regards
> Chris
>
> On Mon, 3 Feb 2020, 7:39 pm Ruijing Li,  wrote:
>
>> Hi all,
>>
>> My use case is to read from single kafka topic using a batch spark sql
>> job (not structured streaming ideally). I want this batch job every time it
>> starts to get the last offset it stopped at, and start reading from there
>> until it caught up to the latest offset, store the result and stop the job.
>> Given the dataframe has a partition and offset column, my first thought for
>> offset management is to groupBy partition and agg the max offset, then
>> store it in HDFS. Next time the job runs, it will read and start from this
>> max offset using startingOffsets
>>
>> However, I was wondering if this will work. If the kafka producer failed
>> an offset and later decides to resend it, I will have skipped it since I’m
>> starting from the max offset sent. How does spark structured streaming know
>> to continue onwards - does it keep a state of all offsets seen? If so, how
>> can I replicate this for batch without missing data? Any help would be
>> appreciated.
>>
>>
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Best way to read batch from Kafka and Offsets

2020-02-03 Thread Ruijing Li
Hi all,

My use case is to read from a single Kafka topic using a batch Spark SQL job
(ideally not structured streaming). Every time this batch job starts, I want
it to get the last offset it stopped at, read from there until it has caught
up to the latest offset, store the result, and stop the job. Given that the
dataframe has partition and offset columns, my first thought for offset
management is to groupBy partition and agg the max offset, then store it in
HDFS. The next time the job runs, it will read this and start from the
stored max offsets using startingOffsets.

However, I was wondering if this will work. If the Kafka producer fails on
an offset and later decides to resend it, I will have skipped it, since I’m
starting from the max offset sent. How does Spark structured streaming know
to continue onwards - does it keep a state of all offsets seen? If so, how
can I replicate this for batch without missing data? Any help would be
appreciated.
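
To make the offset bookkeeping concrete, here is a rough sketch of what I am
describing. The broker, topic, paths, and JSON handling are placeholders, and
my understanding (worth double-checking) is that the stored offsets need +1
before being reused, since startingOffsets is inclusive while endingOffsets
is exclusive:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("kafka-batch-offsets-sketch").getOrCreate()

// Batch read between explicit offsets; the startingOffsets JSON would be
// loaded from the offsets stored by the previous run.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":42,"1":17}}""")
  .option("endingOffsets", "latest")
  .load()

// ... business logic and output writes happen here ...

// Record the highest offset seen per partition so the next run can resume.
val maxOffsets = df.groupBy("partition").agg(max("offset").as("maxOffset"))
maxOffsets.write.mode("overwrite").json("/data/offsets/my-topic") // placeholder path
```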


-- 
Cheers,
Ruijing Li


Re: Out of memory HDFS Read and Write

2019-12-22 Thread Ruijing Li
@Chris destPaths is just a Seq[String] that holds the paths we wish to copy
the output to. Even if the collection only holds one path, it does not
work. However, the job runs fine if we don’t copy the output. The pipeline
succeeds in read input -> perform logic as dataframe -> write output. As
for your second question, I’m not sure how Spark handles it - do the
executors come back to the driver to read, or do they have their own copy?
I don’t see any driver issues, but I will try experimenting with making that
Seq into a Dataset[String] instead if it helps.

@Sumedh That issue seems interesting to me. I need to dive into it further.
From a quick glance, that issue describes large parquet files, but our data
is rather small. Additionally, as described above, our pipeline can run
fine with the given resources if it only does read input -> perform logic as
dataframe -> write output, but it fails on the additional reads. It seems the
jira suggests our job should fail or see issues at the start. Lastly, I found
increasing off-heap helped more than increasing heap size for the executor
(executor.memoryOverhead vs executor.memory), but we use Spark 2.3.
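
To make that concrete, a sketch of the combination I want to try next, taking
the spark.executor.cores suggestion from the reply below together with the
overhead bump that already helped; the values are illustrative placeholders,
not a recommendation:

```
// Sketch only: fewer concurrent tasks per executor (more heap per task) plus
// a larger off-heap allowance for the parquet read/copy job.
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("copy-output-diagnostic")
  .config("spark.executor.memory", "12g")
  .config("spark.executor.cores", "1")                    // down from 3
  .config("spark.mesos.executor.memoryOverhead", "4096")  // MiB, above the 10% default
  .getOrCreate()
```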

On Sun, Dec 22, 2019 at 7:44 AM Sumedh Wale  wrote:

> Parquet reads in Spark need lots of tempory heap memory due to
> ColumnVectors and write block size. See a similar issue:
> https://jira.snappydata.io/browse/SNAP-3111
>
> In addition writes too consume significant amount of heap due to
> parquet.block.size. One solution is to reduce the spark.executor.cores in
> such a job (note the approx heap calculation noted in the ticket). Other
> solution is increased executor heap. Or use off-heap configuration with
> Spark 2.4 which will remove the pressure for reads but not for writes.
>
> regards
> sumedh
>
> On Sun, 22 Dec, 2019, 14:29 Ruijing Li,  wrote:
>
>> I was experimenting and found something interesting. I have executor OOM
>> even if I don’t write to remote clusters. So it is purely a dataframe read
>> and write issue
>> —
>> To recap, I have an ETL data pipeline that does some logic, repartitions
>> to reduce the amount of files written, writes the output to HDFS as parquet
>> files. After, it reads the output and writes it to other locations, doesn’t
>> matter if on the same hadoop cluster or multiple. This is a simple piece of
>> code
>> ```
>> destPaths.foreach { path =>
>>   Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path)) match {
>>     case Success(_) => // log success
>>     case Failure(e) => // log failure
>>   }
>> }
>> ```
>> However this stage - read from sourceOutput and write to different
>> locations - is failing in Spark, despite all other stages succeeding,
>> including the heavy duty logic. And the data is not too big to handle for
>> spark.
>>
>> Only bumping memoryOverhead, and also repartitioning output to more
>> partitions, 40 precisely (when it failed, we partitioned the output to 20
>> after logic is finished but before writing to HDFS) have made the
>> read stage succeed.
>>
>> Not understanding how spark read stage can experience OOM issues.
>> Hoping to shed some light on why.
>>
>> On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh  wrote:
>>
>>> I'm not entirely sure what the behaviour is when writing to remote
>>> cluster. It could be that the connections are being established for every
>>> element in your dataframe, perhaps having to use for each partition may
>>> reduce the number of connections? You may have to look at what the
>>> executors do when they reach out to the remote cluster.
>>>
>>> On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:
>>>
>>>> I managed to make the failing stage work by increasing memoryOverhead
>>>> to something ridiculous > 50%. Our spark.executor.memory  = 12gb and I
>>>> bumped spark.mesos.executor.memoryOverhead=8G
>>>>
>>>> *Can someone explain why this solved the issue?* As I understand,
>>>> usage of memoryOverhead is for VM overhead and non heap items, which a
>>>> simple read and write should not use (albeit to different hadoop clusters,
>>>> but network should be nonissue since they are from the same machines).
>>>>
>>>> We use spark defaults for everything else.
>>>>
>>>> We are calling df.repartition(20) in our write after logic is done
>>>> (before failing stage of multiple cluster write) to prevent spark’s small
>>>> files problem. We reduce from 4000 partitions to 20.
>>>>
>>>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li 
>>>> wrote:
>>>>
>>>>> Not for the stage that fails, all 

Re: Out of memory HDFS Read and Write

2019-12-22 Thread Ruijing Li
I was experimenting and found something interesting: I get executor OOMs
even if I don’t write to remote clusters, so it is purely a dataframe read
and write issue.

To recap, I have an ETL data pipeline that does some logic, repartitions to
reduce the number of files written, and writes the output to HDFS as parquet
files. Afterwards, it reads the output back and writes it to other locations;
it doesn’t matter whether they are on the same hadoop cluster or on several.
This is a simple piece of code:
```
import org.apache.spark.sql.SaveMode
import scala.util.{Failure, Success, Try}

destPaths.foreach { path =>
  Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path)) match {
    case Success(_) => // log success
    case Failure(e) => // log failure
  }
}
```
However, this stage - read from the source output and write to the different
locations - is failing in Spark, despite all other stages succeeding,
including the heavy-duty logic. And the data is not too big for Spark to
handle.

Only bumping memoryOverhead, and also repartitioning the output to more
partitions - 40, precisely (when it failed, we had partitioned the output to
20 after the logic finished but before writing to HDFS) - has made the read
stage succeed.

I don’t understand how a Spark read stage can experience OOM issues, and I’m
hoping someone can shed some light on why.
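
One variation I still want to test (a sketch with the same names as the
snippet above, not what the pipeline does today): read the output once, cache
it, and reuse it for every destination, so each copy does not re-read the
parquet from HDFS:

```
import org.apache.spark.sql.SaveMode
import scala.util.{Failure, Success, Try}

val source = spark.read.parquet(sourceOutputPath).cache()
source.count() // materialize the cache once

destPaths.foreach { path =>
  Try(source.write.mode(SaveMode.Overwrite).parquet(path)) match {
    case Success(_) => // log success
    case Failure(e) => // log failure, keep going with the remaining paths
  }
}
source.unpersist()
```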

On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh  wrote:

> I'm not entirely sure what the behaviour is when writing to remote
> cluster. It could be that the connections are being established for every
> element in your dataframe, perhaps having to use for each partition may
> reduce the number of connections? You may have to look at what the
> executors do when they reach out to the remote cluster.
>
> On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:
>
>> I managed to make the failing stage work by increasing memoryOverhead to
>> something ridiculous > 50%. Our spark.executor.memory  = 12gb and I bumped
>> spark.mesos.executor.memoryOverhead=8G
>>
>> *Can someone explain why this solved the issue?* As I understand, usage
>> of memoryOverhead is for VM overhead and non heap items, which a simple
>> read and write should not use (albeit to different hadoop clusters, but
>> network should be nonissue since they are from the same machines).
>>
>> We use spark defaults for everything else.
>>
>> We are calling df.repartition(20) in our write after logic is done
>> (before failing stage of multiple cluster write) to prevent spark’s small
>> files problem. We reduce from 4000 partitions to 20.
>>
>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li 
>> wrote:
>>
>>> Not for the stage that fails, all it does is read and write - the number
>>> of tasks is # of cores * # of executor instances. For us that is 60 (3
>>> cores 20 executors)
>>>
>>> The input partition size for the failing stage, when spark reads the 20
>>> files each 132M, it comes out to be 40 partitions.
>>>
>>>
>>>
>>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>>>
>>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>>> if the number of your input partitions to the write is larger than that
>>>> configuration.
>>>>
>>>> Is there anything in the executor logs or the Spark UI DAG that
>>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>>> What's the input partition size?
>>>>
>>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li, 
>>>> wrote:
>>>>
>>>>> Could you explain why shuffle partitions might be a good starting
>>>>> point?
>>>>>
>>>>> Some more details: when I write the output the first time after logic
>>>>> is complete, I repartition the files to 20 after having
>>>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>>>>> Data is small about 130MB per file. When spark reads it reads in 40
>>>>> partitions and tries to output that to the different cluster. 
>>>>> Unfortunately
>>>>> during that read and write stage executors drop off.
>>>>>
>>>>> We keep hdfs block 128Mb
>>>>>
>>>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh 
>>>>> wrote:
>>>>>
>>>>>> spark.sql.shuffle.partitions might be a start.
>>>>>>
>>>>>> Is there a difference in the number of partitions when the parquet is
>>>>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>>>>> spark.sql.shuffle.partitions?
>>>>>>
>>>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, 
>>>>>> wrote:
>

Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Ruijing Li
I managed to make the failing stage work by increasing memoryOverhead to
something ridiculous, > 50%. Our spark.executor.memory = 12g, and I bumped
spark.mesos.executor.memoryOverhead=8G.

*Can someone explain why this solved the issue?* As I understand it,
memoryOverhead is for VM overhead and non-heap items, which a simple read
and write should not need (albeit to different hadoop clusters, but the
network should be a non-issue since they are on the same machines).

We use Spark defaults for everything else.

We are calling df.repartition(20) in our write after the logic is done
(before the failing multiple-cluster write stage) to prevent Spark’s small
files problem. We reduce from 4000 partitions to 20.
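
My rough mental model of the container sizing, written out as arithmetic;
the default-overhead formula below is my understanding of the Mesos setting,
so treat it as an assumption rather than a quote from the docs:

```
// Rough container sizing under Mesos as we understand it: the executor gets
// heap plus the configured overhead.
val executorHeapGb    = 12                                               // spark.executor.memory
val overheadMb        = 8192                                             // spark.mesos.executor.memoryOverhead (MiB)
val defaultOverheadMb = math.max(384, (executorHeapGb * 1024 * 0.10).toInt) // assumed default: max(384, 10% of heap)
val containerMb       = executorHeapGb * 1024 + overheadMb
println(s"default overhead would be $defaultOverheadMb MiB; container limit now $containerMb MiB")
```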

On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li  wrote:

> Not for the stage that fails, all it does is read and write - the number
> of tasks is # of cores * # of executor instances. For us that is 60 (3
> cores 20 executors)
>
> The input partition size for the failing stage, when spark reads the 20
> files each 132M, it comes out to be 40 partitions.
>
>
>
> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>
>> If you're using Spark SQL, that configuration setting causes a shuffle if
>> the number of your input partitions to the write is larger than that
>> configuration.
>>
>> Is there anything in the executor logs or the Spark UI DAG that indicates
>> a shuffle? I don't expect a shuffle if it is a straight write. What's the
>> input partition size?
>>
>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>>
>>> Could you explain why shuffle partitions might be a good starting point?
>>>
>>> Some more details: when I write the output the first time after logic is
>>> complete, I repartition the files to 20 after having
>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>>> Data is small about 130MB per file. When spark reads it reads in 40
>>> partitions and tries to output that to the different cluster. Unfortunately
>>> during that read and write stage executors drop off.
>>>
>>> We keep hdfs block 128Mb
>>>
>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>>>
>>>> spark.sql.shuffle.partitions might be a start.
>>>>
>>>> Is there a difference in the number of partitions when the parquet is
>>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>>> spark.sql.shuffle.partitions?
>>>>
>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have encountered a strange executor OOM error. I have a data
>>>>> pipeline using Spark 2.3 Scala 2.11.12. This pipeline writes the output to
>>>>> one HDFS location as parquet then reads the files back in and writes to
>>>>> multiple hadoop clusters (all co-located in the same datacenter).  It
>>>>> should be a very simple task, but executors are being killed off exceeding
>>>>> container thresholds. From logs, it is exceeding given memory (using Mesos
>>>>> as the cluster manager).
>>>>>
>>>>> The ETL process works perfectly fine with the given resources, doing
>>>>> joins and adding columns. The output is written successfully the first
>>>>> time. *Only when the pipeline at the end reads the output from HDFS
>>>>> and writes it to different HDFS cluster paths does it fail.* (It does
>>>>> a spark.read.parquet(source).write.parquet(dest))
>>>>>
>>>>> This doesn't really make sense and I'm wondering what configurations I
>>>>> should start looking at.
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>> --
>>>>> Cheers,
>>>>> Ruijing Li
>>>>>
>>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>
-- 
Cheers,
Ruijing Li


Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Ruijing Li
Not for the stage that fails; all it does is read and write - the number of
concurrent tasks is # of cores * # of executor instances. For us that is 60
(3 cores, 20 executors).

As for the input partition size for the failing stage: when Spark reads the
20 files of 132M each, it comes out to 40 partitions.
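
The 40 lines up with the default input split size, assuming we have not
changed spark.sql.files.maxPartitionBytes; a small sketch of the arithmetic:

```
// With the default spark.sql.files.maxPartitionBytes (128 MB), each 132 MB
// file is split into a 128 MB chunk plus a small remainder, i.e. roughly
// 2 splits per file, so 20 files -> ~40 input partitions.
val maxPartitionMb   = 128
val files            = 20
val fileSizeMb       = 132
val approxPartitions = files * math.ceil(fileSizeMb.toDouble / maxPartitionMb).toInt
println(s"approx input partitions = $approxPartitions") // 40
```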



On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:

> If you're using Spark SQL, that configuration setting causes a shuffle if
> the number of your input partitions to the write is larger than that
> configuration.
>
> Is there anything in the executor logs or the Spark UI DAG that indicates
> a shuffle? I don't expect a shuffle if it is a straight write. What's the
> input partition size?
>
> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>
>> Could you explain why shuffle partitions might be a good starting point?
>>
>> Some more details: when I write the output the first time after logic is
>> complete, I repartition the files to 20 after having
>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>> Data is small about 130MB per file. When spark reads it reads in 40
>> partitions and tries to output that to the different cluster. Unfortunately
>> during that read and write stage executors drop off.
>>
>> We keep hdfs block 128Mb
>>
>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>>
>>> spark.sql.shuffle.partitions might be a start.
>>>
>>> Is there a difference in the number of partitions when the parquet is
>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>> spark.sql.shuffle.partitions?
>>>
>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have encountered a strange executor OOM error. I have a data pipeline
>>>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>>>> location as parquet then reads the files back in and writes to multiple
>>>> hadoop clusters (all co-located in the same datacenter).  It should be a
>>>> very simple task, but executors are being killed off exceeding container
>>>> thresholds. From logs, it is exceeding given memory (using Mesos as the
>>>> cluster manager).
>>>>
>>>> The ETL process works perfectly fine with the given resources, doing
>>>> joins and adding columns. The output is written successfully the first
>>>> time. *Only when the pipeline at the end reads the output from HDFS
>>>> and writes it to different HDFS cluster paths does it fail.* (It does
>>>> a spark.read.parquet(source).write.parquet(dest))
>>>>
>>>> This doesn't really make sense and I'm wondering what configurations I
>>>> should start looking at.
>>>>
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Ruijing Li
Could you explain why shuffle partitions might be a good starting point?

Some more details: when I write the output the first time after the logic is
complete, I repartition to 20 files (after having spark.sql.shuffle.partitions
= 2000) so we don’t have too many small files. The data is small, about 130MB
per file. When Spark reads it back, it reads in 40 partitions and tries to
output that to the different cluster. Unfortunately, during that read and
write stage the executors drop off.

We keep the HDFS block size at 128MB.
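
One thing I may experiment with here (an alternative I have not tried yet, so
just a sketch): using coalesce instead of repartition when the goal is only to
cut the file count, since coalesce merges partitions without another full
shuffle, at the cost of less even file sizes:

```
// df is the result of the ETL logic, currently in ~2000 shuffle partitions.
// outputPath is a placeholder for our HDFS output location.
df.coalesce(20)
  .write
  .mode("overwrite")
  .parquet(outputPath)
```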

On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:

> spark.sql.shuffle.partitions might be a start.
>
> Is there a difference in the number of partitions when the parquet is read
> to spark.sql.shuffle.partitions? Is it much higher than
> spark.sql.shuffle.partitions?
>
> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>
>> Hi all,
>>
>> I have encountered a strange executor OOM error. I have a data pipeline
>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>> location as parquet then reads the files back in and writes to multiple
>> hadoop clusters (all co-located in the same datacenter).  It should be a
>> very simple task, but executors are being killed off exceeding container
>> thresholds. From logs, it is exceeding given memory (using Mesos as the
>> cluster manager).
>>
>> The ETL process works perfectly fine with the given resources, doing
>> joins and adding columns. The output is written successfully the first
>> time. *Only when the pipeline at the end reads the output from HDFS and
>> writes it to different HDFS cluster paths does it fail.* (It does a
>> spark.read.parquet(source).write.parquet(dest))
>>
>> This doesn't really make sense and I'm wondering what configurations I
>> should start looking at.
>>
>> --
>> Cheers,
>> Ruijing Li
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Ruijing Li
Hi all,

I have encountered a strange executor OOM error. I have a data pipeline
using Spark 2.3 and Scala 2.11.12. This pipeline writes the output to one HDFS
location as parquet, then reads the files back in and writes to multiple
hadoop clusters (all co-located in the same datacenter). It should be a
very simple task, but executors are being killed off for exceeding container
thresholds. From the logs, it is exceeding the given memory (we use Mesos as
the cluster manager).

The ETL process works perfectly fine with the given resources, doing joins
and adding columns. The output is written successfully the first time. *Only
when the pipeline at the end reads the output from HDFS and writes it to
different HDFS cluster paths does it fail.* (It does a
spark.read.parquet(source).write.parquet(dest).)

This doesn't really make sense and I'm wondering what configurations I
should start looking at.

-- 
Cheers,
Ruijing Li