Spark SQL question

2023-01-27 Thread Kohki Nishio
this SQL works

select 1 as `data.group` from tbl group by data.group


Since there's no such field as `data`, I thought the SQL would have to look
like this

select 1 as `data.group` from tbl group by `data.group`


 But that gives an error (cannot resolve '`data.group`') ... I'm no expert
in SQL, but this feels like strange behavior ... does anybody have a good
explanation for it?

Thanks

-- 
Kohki Nishio


Re: GC issue - Ext Root Scanning

2021-11-16 Thread Kohki Nishio
Since it's a stop-the-world activity, it's supposed to finish as soon as
possible. It's a parallel marking phase, so 23 threads (a count picked by the
JVM) should be fine. I don't have any other apps running on this machine, and
Spark works perfectly fine except for this one job. The eden size (8G) is also
reasonable given that this host has 160G; it's actually much lower than at
normal times. While running that job, the JVM shrinks eden, which is why it's
using a relatively small eden size.

But the unique thing about this is that it's spending quite a long time in
Ext Root Scanning, which has something to do with native space. And from the
thread dumps, it's running generated code (codegen from the query) while
showing this symptom. I'll continue my investigation ... Thanks
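
In case it's useful to anyone following along, here's a sketch (mine, not
something from this thread) of passing extra GC logging options to the
executors; the flags shown are only placeholders, and -XX:ParallelGCThreads
could be added the same way if one wanted to test the lower GC worker count
suggested below. Driver-side options would have to go on spark-submit or
spark-defaults.conf, since the driver JVM is already running by then.

import org.apache.spark.SparkConf

// Illustrative only: detailed GC logging for the executor JVMs (JDK 8 flags).
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps")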

-Kohki


On Mon, Nov 15, 2021 at 8:27 AM Sean Owen  wrote:

> I'm not a GC tuning expert, but 23 GC threads sounds like a lot (?). Turn
> that down maybe.
> What are your actual heap region sizes, from -verbose:gc?
> This kind of sounds like the young generation is too ... large I think?
> not sure.
>
> None of this is particularly Spark related, but maybe some ideas will
> surface.
> Of course, reducing memory allocation in your app if possible always helps.
>
>
> On Mon, Nov 15, 2021 at 10:18 AM Kohki Nishio  wrote:
>
>> it's a VM, but it has 16 cores and 32 processors.
>>
>> -Kohki
>>
>> On Mon, Nov 15, 2021 at 12:53 AM Deepak Goel  wrote:
>>
>>> How many 'hardware threads' do you have?
>>>
>>>
>>> Deepak
>>> "The greatness of a nation can be judged by the way its animals are
>>> treated - Mahatma Gandhi"
>>>
>>> +91 73500 12833
>>> deic...@gmail.com
>>>
>>> Facebook: https://www.facebook.com/deicool
>>> LinkedIn: www.linkedin.com/in/deicool
>>>
>>> "Plant a Tree, Go Green"
>>>
>>> Make In India : http://www.makeinindia.com/home
>>>
>>>
>>> On Mon, Nov 15, 2021 at 11:02 AM Kohki Nishio 
>>> wrote:
>>>
>>>> Hello, I'm seeing continuous long pauses (>5s) while running a job in
>>>> Spark (3.1.2). I'm using G1GC but every GC(young) takes around 5 seconds
>>>> and it happens continuously, anybody has seen anything like that ? Thanks
>>>>
>>>> 
>>>> 2021-11-14T05:03:11.084+: 372441.741: [GC pause (G1 Evacuation
>>>> Pause) (young), 5.7183644 secs]
>>>>[Parallel Time: 5692.3 ms, GC Workers: 23]
>>>>   [GC Worker Start (ms): Min: 372441741.8, Avg: 372441742.2, Max:
>>>> 372441742.5, Diff: 0.7]
>>>>   [Ext Root Scanning (ms): Min: 4109.6, Avg: 4854.2, Max: 5691.1,
>>>> Diff: 1581.5, Sum: 111646.0]
>>>>   [Update RS (ms): Min: 0.0, Avg: 0.6, Max: 13.4, Diff: 13.4, Sum:
>>>> 13.4]
>>>>  [Processed Buffers: Min: 0, Avg: 9.9, Max: 227, Diff: 227,
>>>> Sum: 227]
>>>>   [Scan RS (ms): Min: 0.1, Avg: 0.1, Max: 0.2, Diff: 0.1, Sum: 1.8]
>>>>   [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
>>>> 0.0, Sum: 0.0]
>>>>   [Object Copy (ms): Min: 0.0, Avg: 2.4, Max: 39.7, Diff: 39.6,
>>>> Sum: 54.1]
>>>>   [Termination (ms): Min: 0.0, Avg: 833.8, Max: 1528.3, Diff:
>>>> 1528.3, Sum: 19176.7]
>>>>  [Termination Attempts: Min: 1, Avg: 7.1, Max: 14, Diff: 13,
>>>> Sum: 164]
>>>>   [GC Worker Other (ms): Min: 0.0, Avg: 0.1, Max: 0.4, Diff: 0.4,
>>>> Sum: 2.9]
>>>>   [GC Worker Total (ms): Min: 5690.7, Avg: 5691.1, Max: 5691.4,
>>>> Diff: 0.7, Sum: 130894.9]
>>>>   [GC Worker End (ms): Min: 372447433.1, Avg: 372447433.3, Max:
>>>> 372447433.5, Diff: 0.4]
>>>>[Code Root Fixup: 0.1 ms]
>>>>    [Code Root Purge: 0.0 ms]
>>>>[String Dedup Fixup: 1.5 ms, GC Workers: 23]
>>>>   [Queue Fixup (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum:
>>>> 0.0]
>>>>   [Table Fixup (ms): Min: 0.1, Avg: 0.5, Max: 0.7, Diff: 0.6, Sum:
>>>> 10.5]
>>>>[Clear CT: 1.4 ms]
>>>>[Other: 23.0 ms]
>>>>   [Choose CSet: 0.0 ms]
>>>>   [Ref Proc: 16.9 ms]
>>>>   [Ref Enq: 1.6 ms]
>>>>   [Redirty Cards: 1.1 ms]
>>>>   [Humongous Register: 0.7 ms]
>>>>   [Humongous Reclaim: 0.3 ms]
>>>>   [Free CSet: 0.7 ms]
>>>>[Eden: 8096.0M(8096.0M)->0.0B(8096.0M) Survivors: 96.0M->96.0M Heap:
>>>> 23.3G(160.0G)->15.4G(160.0G)]
>>>>  [Times: user=23.46 sys=1.03, real=5.72 secs]
>>>>
>>>>
>>>> --
>>>> Kohki Nishio
>>>>
>>>
>>
>> --
>> Kohki Nishio
>>
>

-- 
Kohki Nishio


Re: GC issue - Ext Root Scanning

2021-11-15 Thread Kohki Nishio
It's a VM, but it has 16 cores and 32 processors.

-Kohki

On Mon, Nov 15, 2021 at 12:53 AM Deepak Goel  wrote:

> How many 'hardware threads' do you have?
>
>
> Deepak
> "The greatness of a nation can be judged by the way its animals are
> treated - Mahatma Gandhi"
>
> +91 73500 12833
> deic...@gmail.com
>
> Facebook: https://www.facebook.com/deicool
> LinkedIn: www.linkedin.com/in/deicool
>
> "Plant a Tree, Go Green"
>
> Make In India : http://www.makeinindia.com/home
>
>
> On Mon, Nov 15, 2021 at 11:02 AM Kohki Nishio  wrote:
>
>> Hello, I'm seeing continuous long pauses (>5s) while running a job in
>> Spark (3.1.2). I'm using G1GC but every GC(young) takes around 5 seconds
>> and it happens continuously, anybody has seen anything like that ? Thanks
>>
>> 
>> 2021-11-14T05:03:11.084+: 372441.741: [GC pause (G1 Evacuation Pause)
>> (young), 5.7183644 secs]
>>[Parallel Time: 5692.3 ms, GC Workers: 23]
>>   [GC Worker Start (ms): Min: 372441741.8, Avg: 372441742.2, Max:
>> 372441742.5, Diff: 0.7]
>>   [Ext Root Scanning (ms): Min: 4109.6, Avg: 4854.2, Max: 5691.1,
>> Diff: 1581.5, Sum: 111646.0]
>>   [Update RS (ms): Min: 0.0, Avg: 0.6, Max: 13.4, Diff: 13.4, Sum:
>> 13.4]
>>  [Processed Buffers: Min: 0, Avg: 9.9, Max: 227, Diff: 227, Sum:
>> 227]
>>   [Scan RS (ms): Min: 0.1, Avg: 0.1, Max: 0.2, Diff: 0.1, Sum: 1.8]
>>   [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
>> Sum: 0.0]
>>   [Object Copy (ms): Min: 0.0, Avg: 2.4, Max: 39.7, Diff: 39.6, Sum:
>> 54.1]
>>   [Termination (ms): Min: 0.0, Avg: 833.8, Max: 1528.3, Diff: 1528.3,
>> Sum: 19176.7]
>>  [Termination Attempts: Min: 1, Avg: 7.1, Max: 14, Diff: 13, Sum:
>> 164]
>>   [GC Worker Other (ms): Min: 0.0, Avg: 0.1, Max: 0.4, Diff: 0.4,
>> Sum: 2.9]
>>   [GC Worker Total (ms): Min: 5690.7, Avg: 5691.1, Max: 5691.4, Diff:
>> 0.7, Sum: 130894.9]
>>   [GC Worker End (ms): Min: 372447433.1, Avg: 372447433.3, Max:
>> 372447433.5, Diff: 0.4]
>>[Code Root Fixup: 0.1 ms]
>>[Code Root Purge: 0.0 ms]
>>[String Dedup Fixup: 1.5 ms, GC Workers: 23]
>>   [Queue Fixup (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum:
>> 0.0]
>>   [Table Fixup (ms): Min: 0.1, Avg: 0.5, Max: 0.7, Diff: 0.6, Sum:
>> 10.5]
>>[Clear CT: 1.4 ms]
>>[Other: 23.0 ms]
>>   [Choose CSet: 0.0 ms]
>>   [Ref Proc: 16.9 ms]
>>   [Ref Enq: 1.6 ms]
>>   [Redirty Cards: 1.1 ms]
>>   [Humongous Register: 0.7 ms]
>>   [Humongous Reclaim: 0.3 ms]
>>   [Free CSet: 0.7 ms]
>>[Eden: 8096.0M(8096.0M)->0.0B(8096.0M) Survivors: 96.0M->96.0M Heap:
>> 23.3G(160.0G)->15.4G(160.0G)]
>>  [Times: user=23.46 sys=1.03, real=5.72 secs]
>>
>>
>> --
>> Kohki Nishio
>>
>

-- 
Kohki Nishio


GC issue - Ext Root Scanning

2021-11-14 Thread Kohki Nishio
Hello, I'm seeing continuous long pauses (>5s) while running a job in Spark
(3.1.2). I'm using G1GC, but every young GC takes around 5 seconds and it
happens continuously. Has anybody seen anything like that? Thanks


2021-11-14T05:03:11.084+: 372441.741: [GC pause (G1 Evacuation Pause)
(young), 5.7183644 secs]
   [Parallel Time: 5692.3 ms, GC Workers: 23]
  [GC Worker Start (ms): Min: 372441741.8, Avg: 372441742.2, Max:
372441742.5, Diff: 0.7]
  [Ext Root Scanning (ms): Min: 4109.6, Avg: 4854.2, Max: 5691.1, Diff:
1581.5, Sum: 111646.0]
  [Update RS (ms): Min: 0.0, Avg: 0.6, Max: 13.4, Diff: 13.4, Sum: 13.4]
 [Processed Buffers: Min: 0, Avg: 9.9, Max: 227, Diff: 227, Sum:
227]
  [Scan RS (ms): Min: 0.1, Avg: 0.1, Max: 0.2, Diff: 0.1, Sum: 1.8]
  [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
Sum: 0.0]
  [Object Copy (ms): Min: 0.0, Avg: 2.4, Max: 39.7, Diff: 39.6, Sum:
54.1]
  [Termination (ms): Min: 0.0, Avg: 833.8, Max: 1528.3, Diff: 1528.3,
Sum: 19176.7]
 [Termination Attempts: Min: 1, Avg: 7.1, Max: 14, Diff: 13, Sum:
164]
  [GC Worker Other (ms): Min: 0.0, Avg: 0.1, Max: 0.4, Diff: 0.4, Sum:
2.9]
  [GC Worker Total (ms): Min: 5690.7, Avg: 5691.1, Max: 5691.4, Diff:
0.7, Sum: 130894.9]
  [GC Worker End (ms): Min: 372447433.1, Avg: 372447433.3, Max:
372447433.5, Diff: 0.4]
   [Code Root Fixup: 0.1 ms]
   [Code Root Purge: 0.0 ms]
   [String Dedup Fixup: 1.5 ms, GC Workers: 23]
  [Queue Fixup (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
  [Table Fixup (ms): Min: 0.1, Avg: 0.5, Max: 0.7, Diff: 0.6, Sum: 10.5]
   [Clear CT: 1.4 ms]
   [Other: 23.0 ms]
  [Choose CSet: 0.0 ms]
  [Ref Proc: 16.9 ms]
  [Ref Enq: 1.6 ms]
  [Redirty Cards: 1.1 ms]
  [Humongous Register: 0.7 ms]
  [Humongous Reclaim: 0.3 ms]
  [Free CSet: 0.7 ms]
   [Eden: 8096.0M(8096.0M)->0.0B(8096.0M) Survivors: 96.0M->96.0M Heap:
23.3G(160.0G)->15.4G(160.0G)]
 [Times: user=23.46 sys=1.03, real=5.72 secs]


-- 
Kohki Nishio


Re: Possibly a memory leak issue in Spark

2021-09-22 Thread Kohki Nishio
I'm seeing millions of Stage / Job / Task data entries

jcmd GC.class_histogram shows
--
 6:   7835346 2444627952  org.apache.spark.status.TaskDataWrapper
25:   3765152  180727296  org.apache.spark.status.StageDataWrapper
88:2322559290200  org.apache.spark.status.JobDataWrapper

2nd column is the number of instances
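
For reference, a sketch of turning those settings down when building the
SparkConf (the values here are arbitrary, only to illustrate; the actual
settings are the ones Russell lists below):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "100")
  .set("spark.ui.retainedStages", "100")
  .set("spark.ui.retainedTasks", "10000")  // illustrative values, all lower than the defaults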

-Kohki

On Wed, Sep 22, 2021 at 11:06 AM Russell Spitzer 
wrote:

> As Sean said I believe you want to be setting
>
> spark.ui.retainedJobs 1000 How many jobs the Spark UI and status APIs
> remember before garbage collecting. This is a target maximum, and fewer
> elements may be retained in some circumstances. 1.2.0
> spark.ui.retainedStages 1000 How many stages the Spark UI and status APIs
> remember before garbage collecting. This is a target maximum, and fewer
> elements may be retained in some circumstances. 0.9.0
> spark.ui.retainedTasks 10 How many tasks in one stage the Spark UI
> and status APIs remember before garbage collecting. This is a target
> maximum, and fewer elements may be retained in some circumstances. 2.0.1
>
> To lower numbers. If i remember correctly this is what controls how much
> metadata remains in the driver post task/stage/job competition.
>
> On Sep 22, 2021, at 12:42 PM, Kohki Nishio  wrote:
>
> I believe I have enough information, raised this
>
> https://issues.apache.org/jira/browse/SPARK-36827
>
> thanks
> -Kohki
>
>
> On Tue, Sep 21, 2021 at 9:30 PM Sean Owen  wrote:
>
>> No, that's just info Spark retains about finished jobs and tasks, likely.
>> You can limit how much is retained if desired with config.
>>
>> On Tue, Sep 21, 2021, 11:29 PM Kohki Nishio  wrote:
>>
>>> Just following up, it looks like task / stage / job data are not cleaned
>>> up
>>> --
>>>6:   7835346 2444627952
>>>  org.apache.spark.status.TaskDataWrapper
>>>  25:   3765152  180727296
>>>  org.apache.spark.status.StageDataWrapper
>>> 88:    232255    9290200  org.apache.spark.status.JobDataWrapper
>>>
>>> UI is disabled, not sure why we need to have those data ..
>>>
>>> -Kohki
>>>
>>>
>>> On Fri, Sep 17, 2021 at 8:27 AM Kohki Nishio  wrote:
>>>
>>>> Hello,
>>>> I'm seeing possible memory leak behavior in my spark application.
>>>> According to MAT, it looks like it's related to ElementTrackingStore ..
>>>>
>>>> 
>>>>
>>>> The increase is subtle, so it takes multiple days to actually cause
>>>> some impact, but I'm wondering if anybody has any idea about what this is
>>>> about ...  Below is the GC graph, yellow is the level after any GC kicks 
>>>> in.
>>>>
>>>> 
>>>>
>>>> Thanks
>>>> --
>>>> Kohki Nishio
>>>>
>>>
>>>
>>> --
>>> Kohki Nishio
>>>
>>
>
> --
> Kohki Nishio
>
>
>

-- 
Kohki Nishio


Re: Lock issue with SQLConf.getConf

2021-09-11 Thread Kohki Nishio
Awesome, thanks!

On Sat, Sep 11, 2021 at 6:34 AM Sean Owen  wrote:

> Looks like this was improved in
> https://issues.apache.org/jira/browse/SPARK-35701 for 3.2.0
>
> On Fri, Sep 10, 2021 at 10:21 PM Kohki Nishio  wrote:
>
>> Hello,
>> I'm running spark in local mode and seeing multiple threads showing like
>> below, anybody knows why it's not using a concurrent hash map  ?
>>
>> ---
>> "request-handler-dispatcher-19785" #107532 prio=5 os_prio=0
>> tid=0x7fbd78036000 nid=0x4ebf runnable [0x7fc6e83af000]
>>java.lang.Thread.State: RUNNABLE
>> at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
>> - locked <0x7fc901c7d9f8> (a java.util.Collections$SynchronizedMap)
>> at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
>> at
>> org.apache.spark.sql.internal.SQLConf.constraintPropagationEnabled(SQLConf.scala:3346)
>>
>> "Executor task launch worker for task 23.0 in stage 279783.0 (TID
>> 449746)" #107929 daemon prio=5 os_prio=0 tid=0x7fbe2c005000 nid=0x55f6
>> waiting for monitor entry [0x7fc6e4037000]
>>java.lang.Thread.State: BLOCKED (on object monitor)
>> at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
>> - waiting to lock <0x7fc901c7d9f8> (a
>> java.util.Collections$SynchronizedMap)
>> at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
>> at
>> org.apache.spark.sql.internal.SQLConf.methodSplitThreshold(SQLConf.scala:3330)
>>
>> "Executor task launch worker for task 16.0 in stage 279883.0 (TID
>> 449728)" #107690 daemon prio=5 os_prio=0 tid=0x7fbc60056800 nid=0x53c6
>> waiting for monitor entry [0x7fc65a3a8000]
>>java.lang.Thread.State: BLOCKED (on object monitor)
>> at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
>> - waiting to lock <0x7fc901c7d9f8> (a
>> java.util.Collections$SynchronizedMap)
>> at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
>> at
>> org.apache.spark.sql.internal.SQLConf.planChangeLogLevel(SQLConf.scala:3160)
>> at
>> org.apache.spark.sql.catalyst.rules.PlanChangeLogger.<init>(RuleExecutor.scala:49)
>>
>> ---
>>
>>
>> --
>> Kohki Nishio
>>
>

-- 
Kohki Nishio


Lock issue with SQLConf.getConf

2021-09-10 Thread Kohki Nishio
Hello,
I'm running Spark in local mode and seeing multiple threads that look like the
ones below. Does anybody know why it's not using a concurrent hash map?

---
"request-handler-dispatcher-19785" #107532 prio=5 os_prio=0
tid=0x7fbd78036000 nid=0x4ebf runnable [0x7fc6e83af000]
   java.lang.Thread.State: RUNNABLE
at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
- locked <0x7fc901c7d9f8> (a java.util.Collections$SynchronizedMap)
at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
at
org.apache.spark.sql.internal.SQLConf.constraintPropagationEnabled(SQLConf.scala:3346)

"Executor task launch worker for task 23.0 in stage 279783.0 (TID 449746)"
#107929 daemon prio=5 os_prio=0 tid=0x7fbe2c005000 nid=0x55f6 waiting
for monitor entry [0x7fc6e4037000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
- waiting to lock <0x7fc901c7d9f8> (a
java.util.Collections$SynchronizedMap)
at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
at
org.apache.spark.sql.internal.SQLConf.methodSplitThreshold(SQLConf.scala:3330)

"Executor task launch worker for task 16.0 in stage 279883.0 (TID 449728)"
#107690 daemon prio=5 os_prio=0 tid=0x7fbc60056800 nid=0x53c6 waiting
for monitor entry [0x7fc65a3a8000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.util.Collections$SynchronizedMap.get(Collections.java:2586)
- waiting to lock <0x7fc901c7d9f8> (a
java.util.Collections$SynchronizedMap)
at org.apache.spark.sql.internal.SQLConf.getConf(SQLConf.scala:3750)
at
org.apache.spark.sql.internal.SQLConf.planChangeLogLevel(SQLConf.scala:3160)
at
org.apache.spark.sql.catalyst.rules.PlanChangeLogger.<init>(RuleExecutor.scala:49)

---
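
To illustrate what the dump shows (this is not Spark's SQLConf code, just the
general pattern): every get() on a Collections.synchronizedMap takes the same
monitor, so concurrent readers queue up, while ConcurrentHashMap reads don't
block each other.

import java.util.{Collections, HashMap => JHashMap}
import java.util.concurrent.ConcurrentHashMap

// Every get() synchronizes on the wrapper, matching the "waiting to lock"
// frames above: readers serialize on a single monitor.
val syncMap = Collections.synchronizedMap(new JHashMap[String, String]())
syncMap.put("some.conf.key", "value")
syncMap.get("some.conf.key")

// get() here takes no lock, so concurrent lookups would not contend.
val concMap = new ConcurrentHashMap[String, String]()
concMap.put("some.conf.key", "value")
concMap.get("some.conf.key")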


-- 
Kohki Nishio


Re: JavaSerializerInstance is slow

2021-09-07 Thread Kohki Nishio
A Spark job creates 200 partitions, and the executors try to deserialize the
task at the same time. That creates a chain of blocking, since all executors
are deserializing the same task and loadClass takes a lock per class name. I
often see many threads forming that chain in the thread dumps.

We're using Spark as a high-TPS search engine, and we can't really afford to
allocate a resource per query, so we're going with local mode. I believe there
are people using it in a similar way in production, but anyway, thanks for the
comments. For now it seems the Java deserializer is the only option, so I'll
have to add more machines to handle the higher TPS.

Thanks
-Kohki

On Fri, Sep 3, 2021 at 5:40 AM Sean Owen  wrote:

> I don't know if java serialization is slow in that case; that shows
> blocking on a class load, which may or may not be directly due to
> deserialization.
> Indeed I don't think (some) things are serialized in local mode within one
> JVM, so not sure that's actually what's going on.
>
> On Thu, Sep 2, 2021 at 11:58 PM Antonin Delpeuch (lists) <
> li...@antonin.delpeuch.eu> wrote:
>
>> Hi Kohki,
>>
>> Serialization of tasks happens in local mode too and as far as I am
>> aware there is no way to disable this (although it would definitely be
>> useful in my opinion).
>>
>> You can see the local mode as a testing mode, in which you would want to
>> catch any serialization errors, before they appear in production.
>>
>> There are also some important bugs that are present in local mode and
>> are not deemed worth fixing because it is not intended to be used in
>> production (https://issues.apache.org/jira/browse/SPARK-5300).
>>
>> I think there would definitely be interest in having a reliable and
>> efficient local mode in Spark but it's a pretty different use case than
>> what Spark originally focused on.
>>
>> Antonin
>>
>> On 03/09/2021 05:56, Kohki Nishio wrote:
>> > I'm seeing many threads doing deserialization of a task, I understand
>> > since lambda is involved, we can't use Kryo for those purposes.
>> > However I'm running it in local mode, this serialization is not really
>> > necessary, no?
>> >
>> > Is there any trick I can apply to get rid of this thread contention ?
>> > I'm seeing many of the below threads in thread dumps ...
>> >
>> >
>> > "Executor task launch worker for task 11.0 in stage 15472514.0 (TID
>> > 19788863)" #732821 daemon prio=5 os_prio=0 tid=0x7f02581b2800
>> > nid=0x355d waiting for monitor entry [0x7effd1e3f000]
>> >java.lang.Thread.State: BLOCKED (on object monitor)
>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:400)
>> > - waiting to lock <0x7f0f7246edf8> (a java.lang.Object)
>> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>> > at
>> >
>> scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:51)
>> > at
>> >
>> scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
>> >
>> >
>> > Thanks
>> > -Kohki
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 
Kohki Nishio


JavaSerializerInstance is slow

2021-09-02 Thread Kohki Nishio
I'm seeing many threads deserializing a task. I understand that since lambdas
are involved, we can't use Kryo for that purpose. However, I'm running in local
mode, so this serialization is not really necessary, is it?

Is there any trick I can apply to get rid of this thread contention? I'm seeing
many threads like the one below in my thread dumps ...


"Executor task launch worker for task 11.0 in stage 15472514.0 (TID
19788863)" #732821 daemon prio=5 os_prio=0 tid=0x7f02581b2800
nid=0x355d waiting for monitor entry [0x7effd1e3f000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.ClassLoader.loadClass(ClassLoader.java:400)
- waiting to lock <0x7f0f7246edf8> (a java.lang.Object)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at
scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:51)
at
scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)


Thanks
-Kohki


Re: Ordering pushdown for Spark Datasources

2021-04-05 Thread Kohki Nishio
The log data is stored in Lucene, and I have a custom data source to access
it. For example, if the condition is log-level = INFO, that brings in a couple
of million records per partition, and there are hundreds of partitions involved
in a query. Spark has to go through all the entries to show the first 100, and
that is the problem. But if Spark were aware of the data source's ordering
support, it would only need to fetch 100 per partition...

I'm wondering if Spark could do a merge sort to make this type of query
faster..
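
To make that concrete, here's a rough sketch of the shape I mean, written on
top of the existing API (so not a pushdown); it assumes each partition already
comes back ordered by timestamp from the source, and the names are made up:

import org.apache.spark.sql.Dataset

case class LogEntry(ts: Long, level: String, msg: String)

// Take the first n rows of each (already ordered) partition, then do the
// "merge" as a sort of the small remainder: at most n * numPartitions rows.
def firstN(ds: Dataset[LogEntry], n: Int): Array[LogEntry] = {
  val spark = ds.sparkSession
  import spark.implicits._
  ds.mapPartitions(_.take(n))
    .sort($"ts")
    .take(n)
}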

Thanks
-Kohki

On Mon, Apr 5, 2021 at 1:02 AM Mich Talebzadeh 
wrote:

> Hi,
>
> A couple of clarifications:
>
>
>1. How is the log data stored on say HDFS?
>2. You stated show the first 100 entries for a given condition. That
>condition is a predicate itself?
>
> There are articles for predicate pushdown in Spark. For example check
>
> Using Spark predicate push down in Spark SQL queries | DSE 6.0 Dev guide
> (datastax.com)
> <https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/sparkPredicatePushdown.html#:~:text=A%20predicate%20push%20down%20filters,WHERE%20clauses%20to%20the%20database.>
>
> Although large is a relative term. So that a couple of millions is not
> that large. You can also try most of the following in spark-sql
>
> spark-sql> set adaptive.enabled = true;
> adaptive.enabledtrue
> Time taken: 0.011 seconds, Fetched 1 row(s)
> spark-sql> set optimize.ppd=true;
> optimize.ppdtrue
> Time taken: 0.011 seconds, Fetched 1 row(s)
> spark-sql> set cbo.enables= true;
> cbo.enables true
> Time taken: 0.01 seconds, Fetched 1 row(s)
> spark-sql> set adaptive.enabled = true;
> adaptive.enabledtrue
> Time taken: 0.01 seconds, Fetched 1 row(s)
>
> Spark SQL is influenced by Hive SQL so you can leverage the pushdown in
> Hive SQL.
>
> Check this link as well
>
> Spark SQL Performance Tuning by Configurations — SparkByExamples
> <https://sparkbyexamples.com/spark/spark-sql-performance-tuning-configurations/>
>
> HTH
>
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Apr 2021 at 23:55, Kohki Nishio  wrote:
>
>> Hello,
>>
>> I'm trying to use Spark SQL as a log analytics solution. As you might
>> guess, for most use-cases, data is ordered by timestamp and the amount of
>> data is large.
>>
>> If I want to show the first 100 entries (ordered by timestamp) for a
>> given condition, Spark Executor has to scan the whole entries to select the
>> top 100 by timestamp.
>>
>> I understand this behavior, however, some of the data sources such as
>> JDBC or Lucene can support ordering and in this case, the target data is
>> large (a couple of millions). I believe it is possible to pushdown
>> orderings to the data sources and make the executors return early.
>>
>> Here's my ask, I know Spark doesn't do such a thing... but I'm looking
>> for any pointers, references which might be relevant to this, or .. any
>> random idea would be appreciated. So far I found, some folks are working on
>> aggregation pushdown (SPARK-22390), but I don't see any current activity
>> for ordering pushdown.
>>
>> Thanks
>>
>>
>> --
>> Kohki Nishio
>>
>

-- 
Kohki Nishio


Ordering pushdown for Spark Datasources

2021-04-04 Thread Kohki Nishio
Hello,

I'm trying to use Spark SQL as a log analytics solution. As you might
guess, for most use-cases, data is ordered by timestamp and the amount of
data is large.

If I want to show the first 100 entries (ordered by timestamp) for a given
condition, the Spark executors have to scan all the entries to select the top
100 by timestamp.

I understand this behavior; however, some data sources, such as JDBC or Lucene,
can support ordering, and in this case the target data is large (a couple of
million records). I believe it is possible to push orderings down to the data
sources and make the executors return early.

Here's my ask: I know Spark doesn't do such a thing, but I'm looking for any
pointers or references that might be relevant to this, and any random idea
would be appreciated. So far I have found that some folks are working on
aggregation pushdown (SPARK-22390), but I don't see any current activity for
ordering pushdown.

Thanks


-- 
Kohki Nishio


DataSourceV2 with ordering pushdown

2020-12-22 Thread Kohki Nishio
I'm trying to connect Spark with Lucene indices and noticed that I can't
really tell Spark what ordering it can expect from my Batch / PartitionReader.

Spark ends up retrieving all rows and then sorting them if there is any
orderBy. Is there any way I can tell Spark that a partition is ordered? Is
working with a physical plan the only way to achieve this?

Thanks
-- 
Kohki Nishio


Re: ClassLoader problem - java.io.InvalidClassException: scala.Option; local class incompatible

2017-02-20 Thread Kohki Nishio
I created a JIRA; I believe SBT is a valid use case, but it was resolved as Not
a Problem ..

https://issues.apache.org/jira/browse/SPARK-19675


On Mon, Feb 20, 2017 at 10:36 PM, Kohki Nishio  wrote:

> Hello, I'm writing a Play Framework application which does Spark, however
> I'm getting below
>
> java.io.InvalidClassException: scala.Option; local class incompatible:
> stream classdesc serialVersionUID = -114498752079829388, local class
> serialVersionUID = 5081326844987135632
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
> at java.io.ObjectInputStream.readNonProxyDesc(
> ObjectInputStream.java:1630)
> at java.io.ObjectInputStream.readClassDesc(
> ObjectInputStream.java:1521)
> at java.io.ObjectInputStream.readNonProxyDesc(
> ObjectInputStream.java:1630)
> at java.io.ObjectInputStream.readClassDesc(
> ObjectInputStream.java:1521)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2018)
> at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:1942)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1808)
>
> It's because I'm launching the application via SBT and sbt-launch.jar
> contains Scala 2.10 binary. However my Spark binary is for 2.11 that's why
> I'm getting this. I believe ExecutorClassLoader needs to override loadClass
> method as well, can anyone comment on this ? It's picking up Option class
> from system classloader.
>
> Thanks
> --
> Kohki Nishio
>



-- 
Kohki Nishio


ClassLoader problem - java.io.InvalidClassException: scala.Option; local class incompatible

2017-02-20 Thread Kohki Nishio
Hello, I'm writing a Play Framework application that uses Spark; however, I'm
getting the exception below.

java.io.InvalidClassException: scala.Option; local class incompatible:
stream classdesc serialVersionUID = -114498752079829388, local class
serialVersionUID = 5081326844987135632
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)

It's because I'm launching the application via SBT, and sbt-launch.jar
contains the Scala 2.10 binaries, while my Spark binary is built for 2.11;
that's why I'm getting this. I believe ExecutorClassLoader needs to override
the loadClass method as well. Can anyone comment on this? It's picking up the
Option class from the system classloader.
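
For what it's worth, one workaround I'd consider (my assumption, not something
confirmed anywhere in this thread) is forking the run so the application JVM
only sees the project classpath and never the Scala 2.10 classes inside
sbt-launch.jar; whether that applies to Play's dev-mode run, which manages its
own classloaders, is another question.

// build.sbt sketch (sbt 0.13 syntax), a hypothetical workaround, not verified here
fork in run := true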

Thanks
-- 
Kohki Nishio


Re: Parquet partitioning for unique identifier

2015-09-04 Thread Kohki Nishio
The stack trace is this

java.lang.OutOfMemoryError: Java heap space
    at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
    at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
    at parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.java:125)
    at parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter.<init>(RunLengthBitPackingHybridValuesWriter.java:36)
    at parquet.column.ParquetProperties.getColumnDescriptorValuesWriter(ParquetProperties.java:61)
    at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:72)
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
    at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)


It looks like this
https://issues.apache.org/jira/browse/PARQUET-222

Here's the schema I have. I don't think it's such an unusual schema ... maybe
the use of Map is causing this. Is it trying to register all the keys of a map
as columns?

root
 |-- intId: integer (nullable = false)
 |-- uniqueId: string (nullable = true)
 |-- date1: string (nullable = true)
 |-- date2: string (nullable = true)
 |-- date3: string (nullable = true)
 |-- type: integer (nullable = false)
 |-- cat: string (nullable = true)
 |-- subCat: string (nullable = true)
 |-- unit: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- attr: map (nullable = true)
 ||-- key: string
 ||-- value: string (valueContainsNull = true)
 |-- price: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- imp1: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- imp2: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)
 |-- imp3: map (nullable = true)
 ||-- key: string
 ||-- value: double (valueContainsNull = false)



On Thu, Sep 3, 2015 at 11:27 PM, Cheng Lian  wrote:

> Could you please provide the full stack track of the OOM exception?
> Another common case of Parquet OOM is super wide tables, say hundred or
> thousands of columns. And in this case, the number of rows is mostly
> irrelevant.
>
> Cheng
>
>
> On 9/4/15 1:24 AM, Kohki Nishio wrote:
>
> let's say I have a data like htis
>
>ID  |   Some1   |  Some2| Some3   | 
> A1 | kdsfajfsa | dsafsdafa | fdsfafa  |
> A2 | dfsfafasd | 23jfdsjkj | 980dfs   |
> A3 | 99989df   | jksdljas  | 48dsaas  |
>..
> Z00..  | fdsafdsfa | fdsdafdas | 89sdaff  |
>
> My understanding is that if I give the column 'ID' to use for partition,
> it's going to generate a file per entry since it's unique, no ? Using Json,
> I create 1000 files separated as specified in parallelize parameter. But
> json is large and a bit slow I'd like to try Parquet to see what happens.
>
> On Wed, Sep 2, 2015 at 11:15 PM, Adrien Mogenet <
> adrien.moge...@contentsquare.com> wrote:
>
>> Any code / Parquet schema to provide? I'm not sure to understand which
>> step fails right there...
>>
>> On 3 September 2015 at 04:12, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> Did you specify partitioning column while saving data..
>>> On Sep 3, 2015 5:41 AM, "Kohki Nishio" < 
>>> tarop...@gmail.com> wrote:
>>>
>>>> Hello experts,
>>>>
>>>> I have a huge json file (> 40G) and trying to use Parquet as a file
>>>> format. Each entry has a unique identifier but other than that, it doesn't
>>>> have 'well balanced value' column to partition it. Right now it just throws
>>>> OOM and couldn't figure out what to do with it.
>>>>
>>>> It would be ideal if I could provide a partitioner based on the unique
>>>> identifier value like computing its hash value or something.  One of the
>>>> option would be to produce a hash value and add it as a sepa

Re: Parquet partitioning for unique identifier

2015-09-03 Thread Kohki Nishio
Let's say I have data like this

   ID  |   Some1   |  Some2| Some3   | 
A1 | kdsfajfsa | dsafsdafa | fdsfafa  |
A2 | dfsfafasd | 23jfdsjkj | 980dfs   |
A3 | 99989df   | jksdljas  | 48dsaas  |
   ..
Z00..  | fdsafdsfa | fdsdafdas | 89sdaff  |

My understanding is that if I give the 'ID' column to use for partitioning,
it's going to generate a file per entry since it's unique, no? Using JSON, I
create 1000 files, split as specified by the parallelize parameter. But JSON is
large and a bit slow, so I'd like to try Parquet and see what happens.

On Wed, Sep 2, 2015 at 11:15 PM, Adrien Mogenet <
adrien.moge...@contentsquare.com> wrote:

> Any code / Parquet schema to provide? I'm not sure to understand which
> step fails right there...
>
> On 3 September 2015 at 04:12, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> Did you specify partitioning column while saving data..
>> On Sep 3, 2015 5:41 AM, "Kohki Nishio"  wrote:
>>
>>> Hello experts,
>>>
>>> I have a huge json file (> 40G) and trying to use Parquet as a file
>>> format. Each entry has a unique identifier but other than that, it doesn't
>>> have 'well balanced value' column to partition it. Right now it just throws
>>> OOM and couldn't figure out what to do with it.
>>>
>>> It would be ideal if I could provide a partitioner based on the unique
>>> identifier value like computing its hash value or something.  One of the
>>> option would be to produce a hash value and add it as a separate column,
>>> but it doesn't sound right to me. Is there any other ways I can try ?
>>>
>>> Regards,
>>> --
>>> Kohki Nishio
>>>
>>
>
>
> --
>
> *Adrien Mogenet*
> Head of Backend/Infrastructure
> adrien.moge...@contentsquare.com
> (+33)6.59.16.64.22
> http://www.contentsquare.com
> 50, avenue Montaigne - 75008 Paris
>



-- 
Kohki Nishio


Parquet partitioning for unique identifier

2015-09-02 Thread Kohki Nishio
Hello experts,

I have a huge JSON file (> 40G) and I'm trying to use Parquet as the file
format. Each entry has a unique identifier, but other than that it doesn't have
a 'well balanced value' column to partition on. Right now it just throws OOM,
and I couldn't figure out what to do with it.

It would be ideal if I could provide a partitioner based on the unique
identifier value, e.g. by computing its hash. One option would be to produce a
hash value and add it as a separate column, but that doesn't sound right to me.
Are there any other ways I can try?
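
To sketch the 'hash value as a separate column' option anyway (my own
illustration; it assumes a Spark version with functions.hash and pmod, i.e.
2.0+, plus a hypothetical uniqueId column and an arbitrary bucket count):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Bucket by a hash of the unique identifier so the writer produces a bounded
// number of partition directories instead of one per entry.
def writeBucketed(df: DataFrame, buckets: Int, path: String): Unit = {
  df.withColumn("bucket", pmod(hash(col("uniqueId")), lit(buckets)))
    .write
    .partitionBy("bucket")
    .parquet(path)
}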

Regards,
-- 
Kohki Nishio


FAILED_TO_UNCOMPRESS error from Snappy

2015-08-20 Thread Kohki Nishio
Right after upgrading to 1.4.1, we started seeing this exception, and yes, we
picked up snappy-java-1.1.1.7 (previously snappy-java-1.1.1.6). Is there
anything I could try? I don't have a repro case.
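
One isolation step that might be worth trying (my assumption, not a confirmed
fix): run the job once with a different shuffle compression codec and see
whether the failure below follows Snappy.

import org.apache.spark.SparkConf

// Hypothetical isolation step: take Snappy out of the compression path.
val conf = new SparkConf().set("spark.io.compression.codec", "lz4")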

org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:169)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:168)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
at
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
at
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
at
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
at
org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1178)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
... 33 more


-- 
Kohki Nishio