Re: Reducing planning time on tables with many indexes

2022-08-08 Thread Luc Vlaming Hummel
On 27.07.22, 18:39, "Tom Lane"  wrote:



David Geier  writes:
> We tracked down the root cause of this slowdown to lock contention in
> 'get_relation_info()'. The index lock of every single index of every
> single table used in that query is acquired. We attempted a fix by
> pre-filtering out all indexes that anyways cannot be used with a certain
> query, without taking the index locks (credits to Luc Vlaming for idea
> and implementation). The patch does so by caching the columns present in
> every index, inside 'struct Relation', similarly to 'rd_indexlist'.

I wonder how much thought you gave to the costs imposed by that extra
cache space.  We have a lot of users who moan about relcache bloat
already.  But more to the point, I do not buy the assumption that
an index's set of columns is a good filter for which indexes are of
interest.  A trivial counterexample from the regression database is

regression=# explain select count(*) from tenk1;
                                         QUERY PLAN
--------------------------------------------------------------------------------------------
 Aggregate  (cost=219.28..219.29 rows=1 width=8)
   ->  Index Only Scan using tenk1_hundred on tenk1  (cost=0.29..194.28 rows=10000 width=0)
(2 rows)

It looks to me like the patch also makes unwarranted assumptions about
being able to discard all but the smallest index having a given set
of columns.  This would, for example, possibly lead to dropping the
index that has the most useful sort order, or that has the operator
class needed to support a specific WHERE clause.

Thanks for checking out the patch!

Just to make sure we're on the same page: we only make this assumption if
the query selects no fields at all.
If any fields are selected, the patch checks for column overlap, and if
there is any overlap with any referenced field, the index is not filtered
out.

For producing a row count with no referenced fields, it's true that the
truly cheapest index should be selected to produce the row count, and
some index-AM callback should be introduced for that.
For now it was just a quick-and-dirty solution.
Wouldn't a callback that estimates the amount of data read be good enough
though?

For sort orders, the field being sorted on should be listed among the
referenced fields, and hence the index should not be filtered out; or
what am I missing? Likely I've missed some fields that are referenced
somehow (potentially indirectly), but that shouldn't disqualify the
approach completely.
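To illustrate, the overlap test can be sketched in a few lines of C. This is an illustrative model only, not the actual patch: a plain `uint64_t` bitmask stands in for PostgreSQL's Bitmapset, and the function name is made up.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical sketch of the pre-filtering rule described above.  One bit
 * per attribute number.  An index survives the filter if the query
 * references no columns at all (the row-count-only case) or if its column
 * set overlaps the referenced columns.
 */
static bool
index_might_be_useful(uint64_t index_cols, uint64_t referenced_cols)
{
    /* No referenced columns at all, e.g. SELECT count(*): keep the index. */
    if (referenced_cols == 0)
        return true;

    /* Otherwise keep the index only if some referenced column appears. */
    return (index_cols & referenced_cols) != 0;
}
```

Tom's counterexamples are exactly the cases such a pure column-set check cannot capture: sort order and operator classes are invisible to it.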

In short, I'm not sure I buy this concept at all.  I think it might
be more useful to attack the locking overhead more directly.  I kind
of wonder why we need per-index locks at all during planning ---
I think that we already interpret AccessShareLock on the parent table
as being sufficient to block schema changes on existing indexes.

Could you elaborate on why this approach is not good enough? To me it
seems that avoiding work ahead of time is generally useful. Or are you
worried that we remove too much?

Unfortunately, as things stand today, the planner needs more than the
right to look at the indexes' schemas, because it makes physical accesses
to btree indexes to find out their tree height (and I think there are some
comparable behaviors in other AMs).  I've never particularly cared for
that implementation, and would be glad to rip out that behavior if we can
find another way.  Maybe we could persuade VACUUM or ANALYZE to store that
info in the index's pg_index row, or some such, and then the planner
could use it with no lock?

regards, tom lane


The behavior you're touching on is specific to btree; I'm not sure it
generalizes to all the index types that are out there? I could see there
being some property that allows an AM to be "no-lock", plus a callback
that allows it to cache some generic data that can be transformed when
the IndexOptInfo structs are filled. Is that roughly what you have in
mind?
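Tom's suggestion of avoiding the physical metapage read could plausibly be approximated from statistics alone. A hypothetical sketch follows; the fanout parameter and function name are invented for illustration and are not PostgreSQL API:

```c
/*
 * Hypothetical sketch: estimate a btree's height from the row count and an
 * average per-level fanout (e.g. gathered at ANALYZE time), instead of
 * physically reading the metapage at plan time.  Height 0 means a single
 * leaf page suffices.
 */
static int
estimated_btree_height(double ntuples, double avg_fanout)
{
    int     height = 0;
    double  capacity = avg_fanout;  /* tuples addressable by one leaf level */

    if (ntuples <= 0 || avg_fanout <= 1)
        return 0;
    while (capacity < ntuples)
    {
        capacity *= avg_fanout;
        height++;
    }
    return height;
}
```

With a fanout of a few hundred, the estimate changes only at large row-count boundaries, so a value cached at ANALYZE time would rarely be stale.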

Best,
Luc



Re: Lazy JIT IR code generation to increase JIT speed with partitions

2022-07-04 Thread Luc Vlaming Hummel
Hi Alvaro, hi David,

Thanks for reviewing this and the interesting examples!

Wanted to give a bit of extra insight as to why I'd love to have a system
that can lazily emit JIT code and hence creates roughly a module per
function:
In the end I'm hoping that we can migrate to a system where we only JIT
after a configurable cost has been exceeded for a node, and after a
configurable number of rows has actually been processed.
The reason is that this would safeguard against some problematic planning
issues wrt JIT (a node not being executed, row counts being massively
off).
It would also allow for more fine-grained control, with a cost system
similar to most other planning costs, as they are also per node and not
global, and would potentially allow us to JIT only where we expect to
truly gain runtime compared to the cost of doing it.
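The per-node trigger described here might look roughly like the following. All names and thresholds are hypothetical, not existing GUCs or executor fields:

```c
#include <stdbool.h>

/*
 * Hypothetical sketch of a per-node lazy-JIT trigger: compile only once the
 * node's planner cost exceeds a threshold AND it has actually processed a
 * minimum number of rows (guarding against massively-off row estimates and
 * against nodes that are never executed).
 */
typedef struct JitNodeState
{
    double  total_cost;      /* planner cost of this node */
    long    rows_processed;  /* rows seen so far at run time */
    bool    compiled;        /* already JIT-compiled? */
} JitNodeState;

static bool
jit_should_compile(JitNodeState *node,
                   double cost_threshold, long row_threshold)
{
    if (node->compiled)
        return false;               /* compile at most once */
    if (node->total_cost < cost_threshold)
        return false;               /* too cheap to be worth JITting */
    if (node->rows_processed < row_threshold)
        return false;               /* not enough real work seen yet */
    node->compiled = true;
    return true;
}
```

The check is cheap enough to run per batch of tuples; once both thresholds are crossed the node is compiled exactly once.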

If this means we have to invest more in making it cheap(er) to emit
modules, I'm all for that. Kudos to David for fixing the caching in that
sense :)
@Andres, if there are any other things we ought to fix to make this cheap
(enough) compared to the previous code, I'd love to hear your thoughts.

Best,
Luc Vlaming
(ServiceNow)


From: David Geier 
Sent: Wednesday, June 29, 2022 11:03 AM
To: Alvaro Herrera 
Cc: Luc Vlaming ; Andres Freund ; 
PostgreSQL-development 
Subject: Re: Lazy JIT IR code generation to increase JIT speed with partitions 
 
 
Hi Alvaro,

That's a very interesting case and might indeed be fixed or at least improved 
by this patch. I tried to reproduce this, but at least when running a simple, 
serial query with increasing numbers of functions, the time spent per function 
is linear or even slightly sub-linear (same as Tom observed in [1]).

I also couldn't reproduce the JIT runtimes you shared, when running the 
attached catalog query. The catalog query ran serially for me with the 
following JIT stats:

 JIT:
   Functions: 169
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 12.223 ms, Inlining 17.323 ms, Optimization 388.491 ms, Emission 283.464 ms, Total 701.501 ms

Is it possible that the query ran in parallel for you? For parallel queries, 
every worker JITs all of the functions it uses. Even though the workers might 
JIT the functions in parallel, the time reported in the EXPLAIN ANALYZE output 
is the sum of the time spent by all workers. With this patch applied, the JIT 
time drops significantly, as many of the generated functions remain unused.

 JIT:
   Modules: 15
   Functions: 26
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 1.931 ms, Inlining 0.722 ms, Optimization 67.195 ms, Emission 70.347 ms, Total 140.195 ms

Of course, this does not prove that the nonlinearity that you observed went 
away. Could you share with me how you ran the query so that I can reproduce 
your numbers on master to then compare them with the patched version? Also, 
which LLVM version did you run with? I'm currently running with LLVM 13.

Thanks!

--
David Geier
(ServiceNow)

On Mon, Jun 27, 2022 at 5:37 PM Alvaro Herrera  wrote:
On 2021-Jan-18, Luc Vlaming wrote:

> I would like this topic to somehow progress and was wondering what other
> benchmarks / tests would be needed to have some progress? I've so far
> provided benchmarks for small(ish) queries and some tpch numbers, assuming
> those would be enough.

Hi, some time ago I reported a case[1] where our JIT implementation does
a very poor job and perhaps the changes that you're making could explain
what is going on, and maybe even fix it:

[1] https://postgr.es/m/20241706.wqq7xoyigwa2@alvherre.pgsql

The query for which I investigated the problem involved some pg_logical
metadata tables, so I didn't post it anywhere public; but the blog post
I found later contains a link to a query that shows the same symptoms,
and which is luckily still available online:
https://gist.github.com/saicitus/251ba20b211e9e73285af35e61b19580
I attach it here in case it goes missing sometime.

-- 
Álvaro Herrera               48°01'N 7°57'E  —  https://www.EnterpriseDB.com/



Re: "could not find pathkey item to sort" for TPC-DS queries 94-96

2021-04-15 Thread Luc Vlaming

On 15-04-2021 04:01, James Coleman wrote:

On Wed, Apr 14, 2021 at 5:42 PM James Coleman  wrote:


On Mon, Apr 12, 2021 at 8:37 AM Tomas Vondra
 wrote:


On 4/12/21 2:24 PM, Luc Vlaming wrote:

Hi,

When trying to run TPC-DS queries 94, 95 and 96 on a SF10 on master (but
afaik also PG-13), I get the error "could not find pathkey item to sort".
When I disable enable_gathermerge the problem goes away, and the plan for
query 94 then looks like below. I tried figuring out what the problem is,
but to be honest I would need some pointers, as the code that tries to
match equivalence members in prepare_sort_from_pathkeys is something I'm
really not familiar with.



Could be related to incremental sort, which allowed some gather merge
paths that were impossible before. We had a couple issues related to
that fixed in November, IIRC.


To reproduce you can either ingest and test using the toolkit I used too
(see https://github.com/swarm64/s64da-benchmark-toolkit/), or
alternatively just use the schema (see
https://github.com/swarm64/s64da-benchmark-toolkit/tree/master/benchmarks/tpcds/schemas/psql_native)



Thanks, I'll see if I can reproduce that with your schema.


regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


The query in question is:

select  count(*)
 from store_sales
 ,household_demographics
 ,time_dim, store
 where ss_sold_time_sk = time_dim.t_time_sk
 and ss_hdemo_sk = household_demographics.hd_demo_sk
 and ss_store_sk = s_store_sk
 and time_dim.t_hour = 15
 and time_dim.t_minute >= 30
 and household_demographics.hd_dep_count = 7
 and store.s_store_name = 'ese'
 order by count(*)
 limit 100;

From debugging output it looks like this is the plan being chosen
(cheapest total path):
 Gather(store_sales household_demographics time_dim) rows=60626 cost=3145.73..699910.15
   HashJoin(store_sales household_demographics time_dim) rows=25261 cost=2145.73..692847.55
     clauses: store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
     HashJoin(store_sales time_dim) rows=252609 cost=1989.73..692028.08
       clauses: store_sales.ss_sold_time_sk = time_dim.t_time_sk
       SeqScan(store_sales) rows=11998564 cost=0.00..658540.64
       SeqScan(time_dim) rows=1070 cost=0.00..1976.35
     SeqScan(household_demographics) rows=720 cost=0.00..147.00

prepare_sort_from_pathkeys fails to find a pathkey because
tlist_member_ignore_relabel returns null -- which seemed weird because
the sortexpr is an Aggref (in a single member equivalence class) and
the tlist contains a single member that's also an Aggref. It turns out
that the only difference between the two Aggrefs is that the tlist
entry has "aggsplit = AGGSPLIT_INITIAL_SERIAL" while the sortexpr has
aggsplit = AGGSPLIT_SIMPLE.

That's as far as I've gotten so far, but I figured I'd get that info
out to see if it means anything obvious to anyone else.


This really goes back to [1] where we fixed a similar issue by making
find_em_expr_usable_for_sorting_rel parallel the rules in
prepare_sort_from_pathkeys.

Most of those conditions got copied, and the case we were trying to
handle is the fact that prepare_sort_from_pathkeys can generate a
target list entry under those conditions if one doesn't exist. However
there's a further restriction there I don't remember looking at: it
uses pull_var_clause and tlist_member_ignore_relabel to ensure that
all of the vars that feed into the sort expression are found in the
target list. As I understand it, that is: it will build a target list
entry for something like "md5(column)" if "column" (and that was one
of our test cases for the previous fix) is in the target list already.

But there's an additional detail here: the call to pull_var_clause
requests aggregates, window functions, and placeholders be treated as
vars. That means for our Aggref case it would require that the two
Aggrefs be fully equal, so the differing aggsplit member would cause a
target list entry not to be built, hence our error here.
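A minimal model of why the lookup fails: equal() compares nodes field by field, so a differing aggsplit alone makes two otherwise-identical Aggrefs unequal. The enum and struct below are illustrative subsets of the real nodes, not PostgreSQL definitions:

```c
#include <stdbool.h>

/*
 * Toy model of the Aggref mismatch described above.  The tlist entry
 * carries AGGSPLIT_INITIAL_SERIAL (partial aggregation under Gather) while
 * the sort expression carries AGGSPLIT_SIMPLE, so a field-by-field node
 * comparison reports them as different expressions.
 */
typedef enum { AGGSPLIT_SIMPLE, AGGSPLIT_INITIAL_SERIAL } MiniAggSplit;

typedef struct MiniAggref
{
    unsigned     aggfnoid;   /* aggregate function id (arbitrary here) */
    MiniAggSplit aggsplit;   /* partial-aggregation mode */
} MiniAggref;

static bool
mini_aggref_equal(const MiniAggref *a, const MiniAggref *b)
{
    /* Every field must match, as the real equal() requires. */
    return a->aggfnoid == b->aggfnoid && a->aggsplit == b->aggsplit;
}
```

Once the aggsplit fields agree, the same comparison succeeds, which is exactly the condition the tlist lookup needs.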

I've attached a quick and dirty patch that encodes that final rule
from prepare_sort_from_pathkeys into
find_em_expr_usable_for_sorting_rel. I can't help but think that
there's a cleaner way to do with this with less code duplication, but
hindering that is that prepare_sort_from_pathkeys is working with a
TargetList while find_em_expr_usable_for_sorting_rel is working with a
list of expressions.

James

1: 
https://www.postgresql.org/message-id/CAAaqYe9C3f6A_tZCRfr9Dm7hPpgGwpp4i-K_%3DNS9GWXuNiFANg%40mail.gmail.com



Hi,

The patch seems to make the planner proceed and not error out anymore. 
Cannot judge if it's doing the right thing however or if its 

potential deadlock in parallel hashjoin grow-buckets-barrier and blocking nodes?

2021-04-13 Thread Luc Vlaming

Hi,

Whilst trying to debug a deadlock in some TPC-DS query I noticed
something that could cause problems in the hashjoin implementation and
potentially cause deadlocks (if my analysis is right).


Whilst building the inner hash table, the grow barriers are attached the
whole time (the PHJ_BUILD_HASHING_INNER phase).
Usually this is not a problem. However, if one of the nodes blocks
somewhere further down in the plan whilst trying to fill the inner hash
table, whilst the others are trying to e.g. extend the number of buckets
using ExecParallelHashIncreaseNumBuckets, they would all wait until the
blocked process comes back to the hashjoin node and also joins the effort.
Wouldn't this give potential deadlock situations? Or why would a worker
that is hashing the inner be required to come back and join the effort
in growing the hashbuckets?
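The concern can be modeled with a toy phase barrier: the phase only advances when every attached participant arrives (or detaches), so one worker stuck elsewhere in the plan leaves all arrivers waiting. This is a simplified sketch, not the real barrier.c API:

```c
/*
 * Toy model of a phase barrier like the PHJ grow-buckets barrier.  A phase
 * completes only when arrivals account for every attached participant;
 * detaching can also release the waiters.
 */
typedef struct MiniBarrier
{
    int attached;   /* participants attached to this barrier */
    int arrived;    /* participants that have arrived this phase */
    int phase;      /* current phase number */
} MiniBarrier;

/* Returns 1 if this arrival completed the phase, 0 if the caller would
 * block waiting for the remaining attached participants. */
static int
mini_barrier_arrive(MiniBarrier *b)
{
    b->arrived++;
    if (b->arrived < b->attached)
        return 0;           /* must wait: some worker hasn't arrived */
    b->phase++;             /* last arrival advances the phase */
    b->arrived = 0;
    return 1;
}

static void
mini_barrier_detach(MiniBarrier *b)
{
    b->attached--;
    if (b->arrived >= b->attached && b->attached > 0)
    {
        b->phase++;         /* detaching releases the current waiters */
        b->arrived = 0;
    }
}
```

In the scenario described, the fourth worker is down in execprocnode() on another node and neither arrives nor detaches, which is exactly the potential deadlock being asked about.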


With very skewed workloads (one node providing all data) I was at least
able to get e.g. 3 out of 4 workers waiting in
ExecParallelHashIncreaseNumBuckets, whilst one was in the
execprocnode(outernode). I tried to detach and reattach the barrier but
this proved to be a bad idea :)


Regards,
Luc




interaction between csps with dummy tlists and set_customscan_references

2021-04-12 Thread Luc Vlaming

Hi,

Whilst developing a CSP that potentially sits (directly) above e.g. a
union or anything else with a dummy tlist, we observed some problems:
set_customscan_references cannot handle dummy tlists and raises invalid
varno errors. I was wondering how we can fix this, and what the reason is
that there is actually no callback in the CSP interface for
set_customscan_references. Can someone maybe clarify this for me?


Thanks!

Regards,
Luc




"could not find pathkey item to sort" for TPC-DS queries 94-96

2021-04-12 Thread Luc Vlaming

Hi,

When trying to run TPC-DS queries 94, 95 and 96 on a SF10 on master (but
afaik also PG-13), I get the error "could not find pathkey item to sort".
When I disable enable_gathermerge the problem goes away, and the plan for
query 94 then looks like below. I tried figuring out what the problem is,
but to be honest I would need some pointers, as the code that tries to
match equivalence members in prepare_sort_from_pathkeys is something I'm
really not familiar with.


To reproduce you can either ingest and test using the toolkit I used too 
(see https://github.com/swarm64/s64da-benchmark-toolkit/), or 
alternatively just use the schema (see 
https://github.com/swarm64/s64da-benchmark-toolkit/tree/master/benchmarks/tpcds/schemas/psql_native)


Best,
Luc


 Limit  (cost=229655.62..229655.63 rows=1 width=72)
   ->  Sort  (cost=229655.62..229655.63 rows=1 width=72)
         Sort Key: (count(DISTINCT ws1.ws_order_number))
         ->  Aggregate  (cost=229655.60..229655.61 rows=1 width=72)
               ->  Nested Loop Semi Join  (cost=1012.65..229655.59 rows=1 width=16)
                     ->  Nested Loop  (cost=1012.22..229653.73 rows=1 width=20)
                           Join Filter: (ws1.ws_web_site_sk = web_site.web_site_sk)
                           ->  Nested Loop  (cost=1012.22..229651.08 rows=1 width=24)
                                 ->  Gather  (cost=1011.80..229650.64 rows=1 width=28)
                                       Workers Planned: 2
                                       ->  Nested Loop Anti Join  (cost=11.80..228650.54 rows=1 width=28)
                                             ->  Hash Join  (cost=11.37..227438.35 rows=2629 width=28)
                                                   Hash Cond: (ws1.ws_ship_date_sk = date_dim.d_date_sk)
                                                   ->  Parallel Seq Scan on web_sales ws1  (cost=0.00..219548.92 rows=3000992 width=32)
                                                   ->  Hash  (cost=10.57..10.57 rows=64 width=4)
                                                         ->  Index Scan using idx_d_date on date_dim  (cost=0.29..10.57 rows=64 width=4)
                                                               Index Cond: ((d_date >= '2000-03-01'::date) AND (d_date <= '2000-04-30'::date))
                                             ->  Index Only Scan using idx_wr_order_number on web_returns wr1  (cost=0.42..0.46 rows=2 width=4)
                                                   Index Cond: (wr_order_number = ws1.ws_order_number)
                                 ->  Index Scan using customer_address_pkey on customer_address  (cost=0.42..0.44 rows=1 width=4)
                                       Index Cond: (ca_address_sk = ws1.ws_ship_addr_sk)
                                       Filter: ((ca_state)::text = 'GA'::text)
                           ->  Seq Scan on web_site  (cost=0.00..2.52 rows=10 width=4)
                                 Filter: ((web_company_name)::text = 'pri'::text)
                     ->  Index Scan using idx_ws_order_number on web_sales ws2  (cost=0.43..1.84 rows=59 width=8)
                           Index Cond: (ws_order_number = ws1.ws_order_number)
                           Filter: (ws1.ws_warehouse_sk <> ws_warehouse_sk)

The top of the stacktrace is:
#0  errfinish (filename=0x5562dc1a5125 "createplan.c", lineno=6186, funcname=0x5562dc1a54d0 <__func__.14> "prepare_sort_from_pathkeys") at elog.c:514
#1  0x5562dbc2d7de in prepare_sort_from_pathkeys (lefttree=0x5562dc5a2f58, pathkeys=0x5562dc4eabc8, relids=0x0, reqColIdx=0x0, adjust_tlist_in_place=<optimized out>, p_numsortkeys=0x7ffc0b8cda84, p_sortColIdx=0x7ffc0b8cda88, p_sortOperators=0x7ffc0b8cda90, p_collations=0x7ffc0b8cda98, p_nullsFirst=0x7ffc0b8cdaa0) at createplan.c:6186
#2  0x5562dbe8d695 in make_sort_from_pathkeys (lefttree=<optimized out>, pathkeys=<optimized out>, relids=<optimized out>) at createplan.c:6313
#3  0x5562dbe8eba3 in create_sort_plan (flags=1, best_path=0x5562dc548d68, root=0x5562dc508cf8) at createplan.c:2118
#4  create_plan_recurse (root=0x5562dc508cf8, best_path=0x5562dc548d68, flags=1) at createplan.c:489
#5  0x5562dbe8f315 in create_gather_merge_plan (best_path=0x5562dc5782f8, root=0x5562dc508cf8) at createplan.c:1885
#6  create_plan_recurse (root=0x5562dc508cf8, best_path=0x5562dc5782f8, flags=<optimized out>) at createplan.c:541
#7  0x5562dbe8ddad in create_nestloop_plan (best_path=0x5562dc585668, root=0x5562dc508cf8) at createplan.c:4237
#8  create_join_plan (best_path=0x5562dc585668, root=0x5562dc508cf8) at createplan.c:1062
#9  create_plan_recurse (root=0x5562dc508cf8, best_path=0x5562dc585668, flags=<optimized out>) at createplan.c:418
#10 0x5562dbe8ddad in create_nestloop_plan (best_path=0x5562dc5c4428, root=0x5562dc508cf8) at createplan.c:4237
#11 create_join_plan (best_path=0x5562dc5c4428,

Re: Lazy JIT IR code generation to increase JIT speed with partitions

2021-04-12 Thread Luc Vlaming

On 18-01-2021 08:47, Luc Vlaming wrote:

Hi everyone, Andres,

On 03-01-2021 11:05, Luc Vlaming wrote:

On 30-12-2020 14:23, Luc Vlaming wrote:

On 30-12-2020 02:57, Andres Freund wrote:

Hi,

Great to see work in this area!


I would like this topic to somehow progress and was wondering what other 
benchmarks / tests would be needed to have some progress? I've so far 
provided benchmarks for small(ish) queries and some tpch numbers, 
assuming those would be enough.




On 2020-12-28 09:44:26 +0100, Luc Vlaming wrote:
I would like to propose a small patch to the JIT machinery which makes the
IR code generation lazy. The reason for postponing the generation of the IR
code is that with partitions we get an explosion in the number of JIT
functions generated as many child tables are involved, each with their own
JITted functions, especially when e.g. partition-aware joins/aggregates are
enabled. However, only a fraction of those functions is actually executed
because the Parallel Append node distributes the workers among the nodes.
With the attached patch we get a lazy generation which makes that this is no
longer a problem.


I unfortunately don't think this is quite good enough, because it'll
lead to emitting all functions separately, which can also lead to very
substantial increases of the required time (as emitting code is an
expensive step). Obviously that is only relevant in the cases where the
generated functions actually end up being used - which isn't the case in
your example.

If you e.g. look at a query like
   SELECT blub, count(*),sum(zap) FROM foo WHERE blarg = 3 GROUP BY blub;

on a table without indexes, you would end up with functions for

- WHERE clause (including deforming)
- projection (including deforming)
- grouping key
- aggregate transition
- aggregate result projection

with your patch each of these would be emitted separately, instead of
one go. Which IIRC increases the required time by a significant amount,
especially if inlining is done (where each separate code generation ends
up with copies of the inlined code).


As far as I can see you've basically falsified the second part of this
comment (which you moved):


+
+    /*
+     * Don't immediately emit nor actually generate the function.
+     * instead do so the first time the expression is actually evaluated.
+     * That allows to emit a lot of functions together, avoiding a lot of
+     * repeated llvm and memory remapping overhead. It also helps with not
+     * compiling functions that will never be evaluated, as can be the case
+     * if e.g. a parallel append node is distributing workers between its
+     * child nodes.
+     */


-    /*
-     * Don't immediately emit function, instead do so the first time the
-     * expression is actually evaluated. That allows to emit a lot of
-     * functions together, avoiding a lot of repeated llvm and memory
-     * remapping overhead.
-     */


Greetings,

Andres Freund



Hi,

Happy to help out, and thanks for the info and suggestions.
Also, I should have first searched psql-hackers and the like, as I just
found out there are already discussions about this in [1] and [2].
However I think the approach I took can be taken independently, and then
other solutions could be added on top.


Assuming I understood all suggestions correctly, the ideas so far are:
1. Add an LLVMAddMergeFunctionsPass so that duplicate code is removed and
   not optimized several times (see [1]). Requires all code to be emitted
   in the same module.
2. JIT only parts of the plan, based on cost (see [2]).
3. Cache compilation results to avoid recompilation. This would either
   need a shm-capable optimized IR cache or would not work with parallel
   workers.
4. Lazy JITting (this patch).

An idea that might not have been presented in the mailing list yet(?):
5. Only JIT in nodes that process a certain number of rows, assuming
   there is a constant overhead for JITting and the goal is to gain
   runtime.


Going forward I would first try to see if my current approach can work
out. The only idea that would be counterproductive to my solution is
solution 1. Afterwards I'd like to continue with either solution 2, 5,
or 3, in the hopes that we can reduce JIT overhead to a minimum and can
therefore apply it more broadly.


To test out why and where the JIT performance decreased with my solution,
I improved the test script and added various queries to model some of the
cases I think we should care about. I have not (yet) done big-scale
benchmarks as these queries seemed to already show enough problems for
now. There are now 4 queries which test JITting with/without partitions
and with varying amounts of workers and rowcounts. I hope these are
indeed a somewhat representative set of queries.


As pointed out, the current patch does create a performance degradation
for queries that are not partitioned (basically q3 and q4). After looking
into those queries I noticed two things

Re: allow partial union-all and improve parallel subquery costing

2021-04-12 Thread Luc Vlaming

Hi David,

On 15-03-2021 14:09, David Steele wrote:

Hi Luc,

On 12/30/20 8:54 AM, Luc Vlaming wrote:


Created a commitfest entry assuming this is the right thing to do so 
that someone can potentially pick it up during the commitfest.


Providing an updated patch based on latest master.


Looks like you need another rebase:
http://cfbot.cputube.org/patch_32_2787.log. Marked as Waiting for Author.


You may also want to give a more detailed description of what you have 
done here and why it improves execution plans. This may help draw some 
reviewers.


Regards,


Here's an improved and rebased patch. Hope the description helps some 
people. I will resubmit it to the next commitfest.


Regards,
Luc
From e918e7cf8c9fe628c7daba2ccf37ad767691e4c7 Mon Sep 17 00:00:00 2001
From: Luc Vlaming 
Date: Mon, 12 Apr 2021 09:55:30 +0200
Subject: [PATCH v4] Add explicit partial UNION ALL path and improve parallel
 subquery rowcounts and costing.

By adding the partial union-all path we get parallel plans whenever
the flatten_simple_union_all cannot be applied, e.g. whenever
the column types do not exactly match. A simple testcase shows
this in the regression tests.
Also for e.g. tpc-ds query 5 we now get a more parallel plan
for the part that processes the csr CTE.
To make it more likely that the improved path is chosen another
small fix is added which corrects the rowcounts when subquery
nodes are used in parallel plans.
---
 src/backend/optimizer/path/costsize.c | 11 
 src/backend/optimizer/prep/prepunion.c|  4 ++
 .../regress/expected/incremental_sort.out | 10 ++--
 src/test/regress/expected/union.out   | 52 +++
 src/test/regress/sql/union.sql| 37 +
 5 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 8577c7b138..1da6879c6d 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1426,6 +1426,17 @@ cost_subqueryscan(SubqueryScanPath *path, PlannerInfo *root,
 	startup_cost += path->path.pathtarget->cost.startup;
 	run_cost += path->path.pathtarget->cost.per_tuple * path->path.rows;
 
+	/* Adjust costing for parallelism, if used. */
+	if (path->path.parallel_workers > 0)
+	{
		double  parallel_divisor = get_parallel_divisor(&path->path);
+
+		path->path.rows = clamp_row_est(path->path.rows / parallel_divisor);
+
+		/* The CPU cost is divided among all the workers. */
+		run_cost /= parallel_divisor;
+	}
+
 	path->path.startup_cost += startup_cost;
 	path->path.total_cost += startup_cost + run_cost;
 }
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 037dfaacfd..7d4a6a19c2 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -679,6 +679,10 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 			   NIL, NULL,
 			   parallel_workers, enable_parallel_append,
 			   -1);
+
+		if (op->all && enable_parallel_append)
+			add_partial_path(result_rel, ppath);
+
 		ppath = (Path *)
 			create_gather_path(root, result_rel, ppath,
 			   result_rel->reltarget, NULL, NULL);
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index a417b566d9..a0a31ba053 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1487,14 +1487,12 @@ explain (costs off) select * from t union select * from t order by 1,3;
->  Unique
  ->  Sort
Sort Key: t.a, t.b, t.c
-   ->  Append
- ->  Gather
-   Workers Planned: 2
+   ->  Gather
+ Workers Planned: 2
+ ->  Parallel Append
->  Parallel Seq Scan on t
- ->  Gather
-   Workers Planned: 2
->  Parallel Seq Scan on t t_1
-(13 rows)
+(11 rows)
 
 -- Full sort, not just incremental sort can be pushed below a gather merge path
 -- by generate_useful_gather_paths.
diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out
index 75f78db8f5..cf7660f524 100644
--- a/src/test/regress/expected/union.out
+++ b/src/test/regress/expected/union.out
@@ -1420,3 +1420,55 @@ where (x = 0) or (q1 >= q2 and q1 <= q2);
  4567890123456789 |  4567890123456789 | 1
 (6 rows)
 
+-- Test handling of appendrel with different types which disables the path flattening and
+-- forces a subquery node. for the subquery node ensure the rowcounts are correct.
+create function check_estimated_rows(text) returns table (estimated int)
+language plpgsql as
+$$
+declare
+ln text;
+tmp text[];
+first_row bool := true;
+begin
+for 

join plan with unexpected var clauses

2021-02-02 Thread Luc Vlaming

Hi,

At a customer we came across a curious plan (see attached testcase).

Given the testcase we see that the outer semi join tries to join the 
outer with the inner table id columns, even though the middle table id 
column is also there. Is this expected behavior?


The reason I'm asking is twofold:
- the inner hash table is now bigger than I'd expect and has columns that
  you would normally not select on.
- the middle join now projects the inner as its result, which is quite
  surprising and seems invalid from a SQL standpoint.


Plan:
 Finalize Aggregate
   Output: count(*)
   ->  Gather
         Output: (PARTIAL count(*))
         Workers Planned: 4
         ->  Partial Aggregate
               Output: PARTIAL count(*)
               ->  Parallel Hash Semi Join
                     Hash Cond: (_outer.id3 = _inner.id2)
                     ->  Parallel Seq Scan on public._outer
                           Output: _outer.id3, _outer.extra1
                     ->  Parallel Hash
                           Output: middle.id1, _inner.id2
                           ->  Parallel Hash Semi Join
                                 Output: middle.id1, _inner.id2
                                 Hash Cond: (middle.id1 = _inner.id2)
                                 ->  Parallel Seq Scan on public.middle
                                       Output: middle.id1
                                 ->  Parallel Hash
                                       Output: _inner.id2
                                       ->  Parallel Seq Scan on public._inner
                                             Output: _inner.id2

Kind regards,
Luc
Swarm64


testcase.sql
Description: application/sql


Re: New Table Access Methods for Multi and Single Inserts

2021-01-17 Thread Luc Vlaming

On 17-01-2021 00:04, Jeff Davis wrote:



If we agree on removing heap_multi_insert_v2 API and embed that logic
inside heap_insert_v2, then we can do this - pass the required
information and the functions ExecInsertIndexTuples and
ExecARInsertTriggers as callbacks so that, whether or not
heap_insert_v2 chooses single or multi inserts, it can callback these
functions with the required information passed after the flush. We
can
add the callback and required information into TableInsertState. But,
I'm not quite sure, we would make ExecInsertIndexTuples and
ExecARInsertTriggers.


How should the API interact with INSERT INTO ... SELECT? Right now it
doesn't appear to be integrated at all, but that seems like a fairly
important path for bulk inserts.

Regards,
Jeff Davis




Hi,

Do you mean how it could interact, given that the table modification API
uses table_tuple_insert_speculative? Just wondering whether you think it
generally cannot work, or whether you would like to see that path / more
paths integrated into the patch.
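The callback idea quoted above could look roughly like the following. All names here are illustrative, not the real table-AM API; the point is only the shape: the executor hands index/trigger work to the AM as callbacks, and the AM invokes them after each flush regardless of whether it buffered for multi-insert internally.

```c
#include <stddef.h>

/* Hypothetical sketch of a TableInsertState carrying post-flush callbacks. */
typedef void (*post_flush_cb)(void *exec_state, int ntuples);

typedef struct MiniInsertState
{
    int           buffered;        /* tuples buffered but not yet flushed */
    int           flushed_total;   /* tuples flushed so far */
    post_flush_cb insert_indexes;  /* e.g. an ExecInsertIndexTuples wrapper */
    post_flush_cb fire_triggers;   /* e.g. an ExecARInsertTriggers wrapper */
    void         *exec_state;      /* opaque executor context */
} MiniInsertState;

static void
mini_flush(MiniInsertState *s)
{
    if (s->buffered == 0)
        return;
    s->flushed_total += s->buffered;
    /* After the AM has written the batch, let the executor catch up. */
    if (s->insert_indexes)
        s->insert_indexes(s->exec_state, s->buffered);
    if (s->fire_triggers)
        s->fire_triggers(s->exec_state, s->buffered);
    s->buffered = 0;
}

static void
mini_insert(MiniInsertState *s, int flush_threshold)
{
    s->buffered++;
    if (s->buffered >= flush_threshold)
        mini_flush(s);              /* AM decides when to switch to a batch */
}
```

Whether INSERT INTO ... SELECT would route through such a state, as Jeff asks, then becomes a question of which executor paths set up these callbacks.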


Kind regards,
Luc




Re: Lazy JIT IR code generation to increase JIT speed with partitions

2021-01-17 Thread Luc Vlaming

Hi everyone, Andres,

On 03-01-2021 11:05, Luc Vlaming wrote:

On 30-12-2020 14:23, Luc Vlaming wrote:

On 30-12-2020 02:57, Andres Freund wrote:

Hi,

Great to see work in this area!


I would like this topic to somehow progress and was wondering what other 
benchmarks / tests would be needed to have some progress? I've so far 
provided benchmarks for small(ish) queries and some tpch numbers, 
assuming those would be enough.




On 2020-12-28 09:44:26 +0100, Luc Vlaming wrote:
I would like to propose a small patch to the JIT machinery which makes the
IR code generation lazy. The reason for postponing the generation of the IR
code is that with partitions we get an explosion in the number of JIT
functions generated as many child tables are involved, each with their own
JITted functions, especially when e.g. partition-aware joins/aggregates are
enabled. However, only a fraction of those functions is actually executed
because the Parallel Append node distributes the workers among the nodes.
With the attached patch we get a lazy generation which makes that this is no
longer a problem.


I unfortunately don't think this is quite good enough, because it'll
lead to emitting all functions separately, which can also lead to very
substantial increases of the required time (as emitting code is an
expensive step). Obviously that is only relevant in the cases where the
generated functions actually end up being used - which isn't the case in
your example.

If you e.g. look at a query like

   SELECT blub, count(*), sum(zap) FROM foo WHERE blarg = 3 GROUP BY blub;

on a table without indexes, you would end up with functions for

- WHERE clause (including deforming)
- projection (including deforming)
- grouping key
- aggregate transition
- aggregate result projection

with your patch each of these would be emitted separately, instead of
one go. Which IIRC increases the required time by a significant amount,
especially if inlining is done (where each separate code generation ends
up with copies of the inlined code).


As far as I can see you've basically falsified the second part of this
comment (which you moved):


+
+    /*
+     * Don't immediately emit nor actually generate the function.
+     * Instead do so the first time the expression is actually evaluated.
+     * That allows to emit a lot of functions together, avoiding a lot of
+     * repeated llvm and memory remapping overhead. It also helps with not
+     * compiling functions that will never be evaluated, as can be the case
+     * if e.g. a parallel append node is distributing workers between its
+     * child nodes.
+     */



-    /*
-     * Don't immediately emit function, instead do so the first time the
-     * expression is actually evaluated. That allows to emit a lot of
-     * functions together, avoiding a lot of repeated llvm and memory
-     * remapping overhead.
-     */


Greetings,

Andres Freund



Hi,

Happy to help out, and thanks for the info and suggestions.
Also, I should have first searched pgsql-hackers and the like, as I
just found out there are already discussions about this in [1] and [2].
However I think the approach I took can be taken independently, and
other solutions could then be added on top.


Assuming I understood all suggestions correctly, the ideas so far are:
1. Add an LLVMAddMergeFunctionsPass so that duplicate code is removed
and not optimized several times (see [1]). Requires all code to be
emitted in the same module.
2. JIT only parts of the plan, based on cost (see [2]).
3. Cache compilation results to avoid recompilation. This would either
need a shm-capable optimized IR cache or would not work with parallel
workers.
4. Lazy JITting (this patch).

An idea that might not have been presented on the mailing list yet(?):
5. Only JIT nodes that process a certain number of rows, assuming there
is a constant overhead for JITting and the goal is to gain runtime.


Going forward I would first try to see if my current approach can work 
out. The only idea that would be counterproductive to my solution 
would be solution 1. Afterwards I'd like to continue with either 
solution 2, 5, or 3 in the hopes that we can reduce JIT overhead to a 
minimum and can therefore apply it more broadly.


To test out why and where the JIT performance decreased with my 
solution I improved the test script and added various queries to model 
some of the cases I think we should care about. I have not (yet) done 
big scale benchmarks as these queries seemed to already show enough 
problems for now. Now there are 4 queries which test JITting 
with/without partitions, and with varying amounts of workers and 
rowcounts. I hope these are indeed a somewhat representative set of 
queries.


As pointed out the current patch does create a degradation in 
performance wrt queries that are not partitioned (basically q3 and 
q4). After looking into those queries I noticed two things:
- q3 is very noisy wrt JIT timings. This seems

Re: Parallel Inserts in CREATE TABLE AS

2021-01-12 Thread Luc Vlaming

On 06-01-2021 09:32, Bharath Rupireddy wrote:

On Tue, Jan 5, 2021 at 1:25 PM Luc Vlaming  wrote:

wrt v18-0002patch:

It looks like this introduces a state machine that goes like:
- starts at CTAS_PARALLEL_INS_UNDEF
- possibly moves to CTAS_PARALLEL_INS_SELECT
- CTAS_PARALLEL_INS_TUP_COST_CAN_IGN can be added
- if both were added at some stage, we can go to
CTAS_PARALLEL_INS_TUP_COST_IGNORED and ignore the costs

what i'm wondering is why you opted to put logic around
generate_useful_gather_paths and in cost_gather when to me it seems more
logical to put it in create_gather_path? i'm probably missing something
there?


IMO, The reason is we want to make sure we only ignore the cost when Gather is 
the top node.
And it seems the generate_useful_gather_paths called in 
apply_scanjoin_target_to_paths is the right place which can only create top 
node Gather.
So we change the flag in apply_scanjoin_target_to_paths around 
generate_useful_gather_paths to identify the top node.


Right. We wanted to ignore parallel tuple cost for only the upper Gather path.


I was wondering actually if we need the state machine. Reason is that as
AFAICS the code could be placed in create_gather_path, where you can
also check if it is a top gather node, whether the dest receiver is the
right type, etc? To me that seems like a nicer solution as its makes
that all logic that decides whether or not a parallel CTAS is valid is
in a single place instead of distributed over various places.


IMO, we can't determine the fact that we are going to generate the top
Gather path in create_gather_path. To decide on whether or not the top
Gather path generation, I think it's not only required to check the
root->query_level == 1 but we also need to rely on from where
generate_useful_gather_paths gets called. For instance, for
query_level 1, generate_useful_gather_paths gets called from 2 places
in apply_scanjoin_target_to_paths. Likewise, create_gather_path also
gets called from many places. IMO, the current way i.e. setting flag
it in apply_scanjoin_target_to_paths and ignoring based on that in
cost_gather seems safe.

I may be wrong. Thoughts?


So the way I understand it the requirements are:
- it needs to be the top-most gather
- it should not do anything with the rows after the gather node as this
would make the parallel inserts conceptually invalid.


Right.


Right now we're trying to judge what might be added on-top that could
change the rows by inspecting all parts of the root object that would
cause anything to be added, and add a little statemachine to track the
state of that knowledge. To me this has the downside that the list in
HAS_PARENT_PATH_GENERATING_CLAUSE has to be exhaustive, and we need to
make sure it stays up-to-date, which could result in regressions if not
tracked carefully.


Right. Any new clause that will be added which generates an upper path
in grouping_planner after apply_scanjoin_target_to_paths also needs to
be added to HAS_PARENT_PATH_GENERATING_CLAUSE. Otherwise, we might
ignore the parallel tuple cost because of which the parallel plan may
be chosen and we go for parallel inserts only when the top node is
Gather. I don't think any new clause that will be added generates a
new upper Gather node in grouping_planner after
apply_scanjoin_target_to_paths.


Personally I would therefore go for a design which is safe in the sense
that regressions are not as easily introduced. IMHO that could be done
by inspecting the planned query afterwards, and then judging whether or
not the parallel inserts are actually the right thing to do.


The 0001 patch does that. It doesn't have any influence on the planner
for parallel tuple cost calculation, it just looks at the generated
plan and decides on parallel inserts. Having said that, we might miss
parallel plans even though we know that there will not be tuples
transferred from workers to Gather. So, 0002 patch adds the code for
influencing the planner for parallel tuple cost.



Ok. Thanks for the explanation and sorry for the confusion.


Another way to create more safety against regressions would be to add an
assert upon execution of the query that if we do parallel inserts that
only a subset of allowed nodes exists above the gather node.


Yes, we already do this. Please have a look at
SetParallelInsertState() in the 0002 patch. The idea is that if the
planner ignored the tuple cost but we later disallow parallel inserts,
either because the upper node is not a Gather or is a Gather with
projections, the assertion fails. So, in case any new parent path
generating clause is added (apart from the ones that are there in
HAS_PARENT_PATH_GENERATING_CLAUSE) and we ignore the tuple cost, this
Assert will catch it. Currently, I couldn't find any assertion
failures in my debug build with make check and make check-world.



Ok. Seems I missed that assert when reviewing.


+else
+{
+/*
+ * Upper Gather node has projections, so paral

Re: New Table Access Methods for Multi and Single Inserts

2021-01-12 Thread Luc Vlaming

On 06-01-2021 14:06, Bharath Rupireddy wrote:

On Wed, Jan 6, 2021 at 12:56 PM Luc Vlaming  wrote:

The main reason for me for wanting a single API is that I would like the
decision of using single or multi inserts to move to inside the tableam.
For e.g. a heap insert we might want to put the threshold at e.g. 100
rows so that the overhead of buffering the tuples is actually
compensated. For other tableam this logic might also be quite different,
and I think therefore that it shouldn't be e.g. COPY or CTAS deciding
whether or not multi inserts should be used. Because otherwise the thing
we'll get is that there will be tableams that will ignore this flag and
do their own thing anyway. I'd rather have an API that gives all
necessary information to the tableam and then make the tableam do "the
right thing".

Another reason I'm suggesting this API is that I would expect that the
begin is called in a different place in the code for the (multiple)
inserts than the actual insert statement.
To me conceptually the begin and end are like e.g. the executor begin
and end: you prepare the inserts with the knowledge you have at that
point. I assumed (wrongly?) that during the start of the statement one
knows best how many rows are coming; and then the actual insertion of
the row doesn't have to deal anymore with multi/single inserts, choosing
when to buffer or not, because that information has already been given
during the initial phase. One of the reasons this is appealing to me is
that e.g. in [1] there was discussion on when to switch to a multi
insert state, and imo this should be up to the tableam.


Agree that whether to go with the multi or single inserts should be
completely left to tableam implementation, we, as callers of those API
just need to inform whether we expect single or multiple rows, and it
should be left to tableam implementation whether to actually go with
buffering or single inserts. ISTM that it's an elegant way of making
the API generic and abstracting everything from the callers. What I
wonder is how can we know in advance the expected row count that we
need to pass in to heap_insert_begin()? IIUC, we can not estimate the
upcoming rows in COPY, Insert Into Select, or Refresh Mat View or some
other insert queries?  Of course, we can look at the planner's
estimated row count for the selects in COPY, Insert Into Select or
Refresh Mat View after the planning, but to me that's not something we
can depend on and pass in the row count to the insert APIs.

When we don't know the expected row count, why can't we(as callers of
the APIs) tell the APIs something like, "I'm intending to perform
multi inserts, so if possible and if you have a mechanism to buffer
the slots, do it, otherwise insert the tuples one by one, or else do
whatever you want to do with the tuples I give it you". So, in case of
COPY we can ask the API for multi inserts and call heap_insert_begin()
and heap_insert_v2().



I thought that when it is available (because of planning) it would be
nice to pass it in. If you don't know, you could pass in a 1 for doing
single inserts, and e.g. -1 or max-int for streaming. The reason I
proposed it is so that tableams have as much knowledge as possible to
do the right thing. is_multi does also work of course, but is just
somewhat less informative.


What to me seemed somewhat counterintuitive is that with the proposed 
API it is possible to say is_multi=true and then still call 
heap_insert_v2 to do a single insert.



Given the above explanation, I still feel bool is_multi would suffice.

Thoughts?

On dynamically switching from single to multi inserts: this can be
done by heap_insert_v2 itself. The way I think it's possible is that,
say, we have some threshold row count of 1000 (can be a macro); after
inserting that many tuples, heap_insert_v2 can switch to buffering
mode.


For that I thought it'd be good to use the expected row count, but yeah 
dynamically switching also works and might work better if the expected 
row counts are usually off.




Thoughts?


Which would make the code something like:

void
heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
{
    TupleTableSlot *batchslot;
    HeapMultiInsertState *mistate = (HeapMultiInsertState *) state->mistate;
    Size        sz;

    Assert(mistate && mistate->slots);

    if (mistate->slots[mistate->cur_slots] == NULL)
        mistate->slots[mistate->cur_slots] =
            table_slot_create(state->rel, NULL);

    batchslot = mistate->slots[mistate->cur_slots];

    ExecClearTuple(batchslot);
    ExecCopySlot(batchslot, slot);

    /*
     * Calculate the tuple size after the original slot is copied, because
     * the copied slot type and the tuple size may change.
     */
    sz = GetTupleSize(batchslot, mistate->max_siz

Re: New Table Access Methods for Multi and Single Inserts

2021-01-05 Thread Luc Vlaming

On 05-01-2021 11:06, Bharath Rupireddy wrote:
On Mon, Jan 4, 2021 at 1:29 PM Luc Vlaming <l...@swarm64.com> wrote:

 >  > table AM patch [2] be reviewed further?
 > As to the patches themselves:
 >
 > I think the API is a huge step forward! I assume that we want to have a
 > single-insert API like heap_insert_v2 so that we can encode the
 > knowledge that there will just be a single insert coming and likely a
 > commit afterwards?
 >
 > Reason I'm asking is that I quite liked the heap_insert_begin parameter
 > is_multi, which could even be turned into a "expected_rowcount" of the
 > amount of rows expected to be commited in the transaction (e.g. single,
 > several, thousands/stream).
 > If we were to make the API based on expected rowcounts, the whole
 > heap_insert_v2, heap_insert and heap_multi_insert could be turned into a
 > single function heap_insert, as the knowledge about buffering of the
 > slots is then already stored in the TableInsertState, creating an API 
like:

 >
 > // expectedRows: -1 = streaming, otherwise expected rowcount.
 > TableInsertState* heap_insert_begin(Relation rel, CommandId cid, int
 > options, int expectedRows);
 > heap_insert(TableInsertState *state, TupleTableSlot *slot);
 >
 > Do you think that's a good idea?

IIUC, your suggestion is to use expectedRows and move the multi-insert
implementation from heap_multi_insert_v2 into heap_insert_v2. If that's
correct, heap_insert_v2 will look something like this:


heap_insert_v2()
{
    if (single_insert)
        // do single insertion work; the code in the existing
        // heap_insert_v2 comes here
    else
        // do multi insertion work; the code in the existing
        // heap_multi_insert_v2 comes here
}

I don't see any problem in combining single and multi insert APIs into 
one. Having said that, will the APIs be cleaner then? Isn't it going to 
be confusing if a single heap_insert_v2 API does both the works? With 
the existing separate APIs, for single insertion, the sequence of the 
API can be like begin, insert_v2, end and for multi inserts it's like 
begin, multi_insert_v2, flush, end. I prefer to have a separate multi 
insert API so that it will make the code look readable.


Thoughts?


The main reason for me for wanting a single API is that I would like the 
decision of using single or multi inserts to move to inside the tableam.
For e.g. a heap insert we might want to put the threshold at e.g. 100 
rows so that the overhead of buffering the tuples is actually 
compensated. For other tableam this logic might also be quite different, 
and I think therefore that it shouldn't be e.g. COPY or CTAS deciding 
whether or not multi inserts should be used. Because otherwise the thing 
we'll get is that there will be tableams that will ignore this flag and 
do their own thing anyway. I'd rather have an API that gives all 
necessary information to the tableam and then make the tableam do "the 
right thing".


Another reason I'm suggesting this API is that I would expect that the 
begin is called in a different place in the code for the (multiple) 
inserts than the actual insert statement.
To me conceptually the begin and end are like e.g. the executor begin 
and end: you prepare the inserts with the knowledge you have at that 
point. I assumed (wrongly?) that during the start of the statement one 
knows best how many rows are coming; and then the actual insertion of 
the row doesn't have to deal anymore with multi/single inserts, choosing 
when to buffer or not, because that information has already been given 
during the initial phase. One of the reasons this is appealing to me is 
that e.g. in [1] there was discussion on when to switch to a multi 
insert state, and imo this should be up to the tableam.




 > Two smaller things I'm wondering:
 > - the clear_mi_slots; why is this not in the HeapMultiInsertState? the
 > slots themselves are declared there?

Firstly, we sometimes need the buffered slots outside the multi-insert
API (please have a look at the comments in the TableInsertState
structure). And we need to clear the previously flushed slots before we
start buffering in heap_multi_insert_v2(). I can remove the
clear_mi_slots flag altogether and do as follows: I will not set
mistate->cur_slots to 0 in heap_multi_insert_flush after the flush, I
will only set state->flushed to true. In heap_multi_insert_v2:


void
heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
{
    TupleTableSlot *batchslot;
    HeapMultiInsertState *mistate = (HeapMultiInsertState *) state->mistate;
    Size        sz;

    Assert(mistate && mistate->slots);

    /*
     * If the slots were flushed previously, then clear them off before
     * using them again.
     */
    if (state->flushed)
    {
        int         i;

        for (i = 0; i < mistate->cur_slots; i++)
            ExecClearTuple(mistate

Re: New Table Access Methods for Multi and Single Inserts

2021-01-05 Thread Luc Vlaming

On 05-01-2021 22:28, Jeff Davis wrote:

On Mon, 2021-01-04 at 08:59 +0100, Luc Vlaming wrote:

Reason I'm asking is that I quite liked the heap_insert_begin
parameter
is_multi, which could even be turned into a "expected_rowcount" of
the
amount of rows expected to be commited in the transaction (e.g.
single,
several, thousands/stream).


Do you mean "written by the statement" instead of "committed in the
transaction"? It doesn't look like the TableInsertState state will
survive across statement boundaries.

Though that is an important question to consider. If the premise is
that a given custom AM may be much more efficient at bulk inserts than
retail inserts (which is reasonable), then it makes sense to handle the
case of a transaction with many single-tuple inserts. But keeping
insert state across statement boundaries also raises a few potential
problems.

Regards,
Jeff Davis




I did actually mean until the end of the transaction. I know this is
not possible with the current design, but I think it would be cool to
start going that way (even if slightly). Creating some more freedom in
how a tableam optimizes inserts, when it syncs to disk, etc. would be
good imo. It would allow one to create e.g. a tableam that would not
have as high an overhead when doing single-statement inserts.


Kind regards,
Luc




Re: Parallel Inserts in CREATE TABLE AS

2021-01-05 Thread Luc Vlaming

On 05-01-2021 13:57, Bharath Rupireddy wrote:

On Tue, Jan 5, 2021 at 1:00 PM Luc Vlaming  wrote:

Reviewing further v20-0001:

I would still opt for moving the code for the parallel worker into a
separate function, and then setting rStartup of the dest receiver to
that function in ExecParallelGetInsReceiver, as its completely
independent code. Just a matter of style I guess.


If we were to have an intorel_startup_worker and assign it to
self->pub.rStartup: 1) we can do it in CreateIntoRelDestReceiver, but
then we have to pass a parameter to CreateIntoRelDestReceiver as an
indication of a parallel worker, which requires code changes wherever
CreateIntoRelDestReceiver is used; 2) we can also assign
intorel_startup_worker after CreateIntoRelDestReceiver in
ExecParallelGetInsReceiver, but that doesn't look good to me; 3) we
can duplicate CreateIntoRelDestReceiver and have a
CreateIntoRelParallelDestReceiver with the only change being that
self->pub.rStartup = intorel_startup_worker.

IMHO, the way it is currently looks good. Anyways, I'm open to
changing that if we agree on any of the above 3 ways.


The current way is good enough; it was a suggestion, as personally I find
it hard to read to have two completely separate code paths in the same
function. If anything I would opt for something like 3), where there's a
CreateIntoRelParallelDestReceiver which calls CreateIntoRelDestReceiver
and then overrides rStartup to intorel_startup_worker. Then no call sites
have to change except the ones that are for parallel workers.


If we were to do any of the above, then we might have to do the same
thing for other commands Refresh Materialized View or Copy To where we
can parallelize.

Thoughts?


Maybe I'm not completely following why but afaics we want parallel
inserts in various scenarios, not just CTAS? I'm asking because code like
+   if (fpes->ins_cmd_type == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)
+       pg_atomic_add_fetch_u64(&fpes->processed,
+                               queryDesc->estate->es_processed);
seems very specific to CTAS. For now that seems fine but I suppose that
would be generalized soon after? Basically I would have expected the if
to compare against PARALLEL_INSERT_CMD_UNDEF.


After this patch is reviewed and goes for commit, then the next thing
I plan to do is to allow parallel inserts in Refresh Materialized View
and it can be used for that. I think the processed variable can also
be used for parallel inserts in INSERT INTO SELECT [1] as well.
Currently, I'm keeping it for CTAS, maybe later (after this is
committed) it can be generalized.

Thoughts?


Sounds good



[1] - 
https://www.postgresql.org/message-id/CAA4eK1LMmz58ej5BgVLJ8VsUGd%3D%2BKcaA8X%3DkStORhxpfpODOxg%40mail.gmail.com


Apart from these small things v20-0001 looks (very) good to me.
v20-0003 and v20-0004:
looks good to me.


Thanks.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Kind regards,
Luc




Re: Parallel Inserts in CREATE TABLE AS

2021-01-05 Thread Luc Vlaming

On 05-01-2021 11:32, Dilip Kumar wrote:

On Tue, Jan 5, 2021 at 12:43 PM Luc Vlaming  wrote:


On 04-01-2021 14:32, Bharath Rupireddy wrote:

On Mon, Jan 4, 2021 at 4:22 PM Luc Vlaming <l...@swarm64.com> wrote:
  > Sorry it took so long to get back to reviewing this.

Thanks for the comments.

  > wrt v18-0001patch:
  >
  > +   /*
  > +* If the worker is for parallel insert in CTAS, then
use the proper
  > +* dest receiver.
  > +*/
  > +   intoclause = (IntoClause *) stringToNode(intoclausestr);
  > +   receiver = CreateIntoRelDestReceiver(intoclause);
  > +   ((DR_intorel *)receiver)->is_parallel_worker = true;
  > +   ((DR_intorel *)receiver)->object_id = fpes->objectid;
  > I would move this into a function called e.g.
  > GetCTASParallelWorkerReceiver so that the details wrt CTAS can be put in
  > createas.c.
  > I would then also split up intorel_startup into intorel_leader_startup
  > and intorel_worker_startup, and in GetCTASParallelWorkerReceiver set
  > self->pub.rStartup to intorel_worker_startup.

My intention was to not add any new APIs to the dest receiver. I simply
made the changes in intorel_startup, in which for workers it just does
the minimalistic work and exit from it. In the leader most of the table
creation and sanity check is kept untouched. Please have a look at the
v19 patch posted upthread [1].



Looks much better, really nicely abstracted away in the v20 patch.


  > +   volatile pg_atomic_uint64   *processed;
  > why is it volatile?

Intention is to always read from the actual memory location. I took it
from the way pg_atomic_fetch_add_u64_impl,
pg_atomic_compare_exchange_u64_impl, pg_atomic_init_u64_impl and their
u32 counterparts pass the parameter as volatile pg_atomic_uint64 *ptr.


But in your case, I do not understand the intention that where do you
think that the compiler can optimize it and read the old value?



It cannot and should not. I had so far only seen C++ atomic variables,
not a (Postgres-specific?) C atomic variable which apparently requires
the volatile keyword. My stupidity ;)


Cheers,
Luc




Re: Parallel Inserts in CREATE TABLE AS

2021-01-04 Thread Luc Vlaming

On 04-01-2021 14:53, Bharath Rupireddy wrote:

On Mon, Jan 4, 2021 at 5:44 PM Luc Vlaming  wrote:

On 04-01-2021 12:16, Hou, Zhijie wrote:


wrt v18-0002patch:

It looks like this introduces a state machine that goes like:
- starts at CTAS_PARALLEL_INS_UNDEF
- possibly moves to CTAS_PARALLEL_INS_SELECT
- CTAS_PARALLEL_INS_TUP_COST_CAN_IGN can be added
- if both were added at some stage, we can go to
CTAS_PARALLEL_INS_TUP_COST_IGNORED and ignore the costs

what i'm wondering is why you opted to put logic around
generate_useful_gather_paths and in cost_gather when to me it seems more
logical to put it in create_gather_path? i'm probably missing something
there?


IMO, The reason is we want to make sure we only ignore the cost when Gather is 
the top node.
And it seems the generate_useful_gather_paths called in 
apply_scanjoin_target_to_paths is the right place which can only create top 
node Gather.
So we change the flag in apply_scanjoin_target_to_paths around 
generate_useful_gather_paths to identify the top node.


Right. We wanted to ignore parallel tuple cost for only the upper Gather path.


I was wondering actually if we need the state machine. Reason is that as
AFAICS the code could be placed in create_gather_path, where you can
also check if it is a top gather node, whether the dest receiver is the
right type, etc? To me that seems like a nicer solution as its makes
that all logic that decides whether or not a parallel CTAS is valid is
in a single place instead of distributed over various places.


IMO, we can't determine the fact that we are going to generate the top
Gather path in create_gather_path. To decide on whether or not the top
Gather path generation, I think it's not only required to check the
root->query_level == 1 but we also need to rely on from where
generate_useful_gather_paths gets called. For instance, for
query_level 1, generate_useful_gather_paths gets called from 2 places
in apply_scanjoin_target_to_paths. Likewise, create_gather_path also
gets called from many places. IMO, the current way i.e. setting flag
it in apply_scanjoin_target_to_paths and ignoring based on that in
cost_gather seems safe.

I may be wrong. Thoughts?

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



So the way I understand it the requirements are:
- it needs to be the top-most gather
- it should not do anything with the rows after the gather node as this 
would make the parallel inserts conceptually invalid.


Right now we're trying to judge what might be added on-top that could 
change the rows by inspecting all parts of the root object that would 
cause anything to be added, and add a little statemachine to track the 
state of that knowledge. To me this has the downside that the list in 
HAS_PARENT_PATH_GENERATING_CLAUSE has to be exhaustive, and we need to 
make sure it stays up-to-date, which could result in regressions if not 
tracked carefully.


Personally I would therefore go for a design which is safe in the sense 
that regressions are not as easily introduced. IMHO that could be done 
by inspecting the planned query afterwards, and then judging whether or 
not the parallel inserts are actually the right thing to do.


Another way to create more safety against regressions would be to add an 
assert upon execution of the query that if we do parallel inserts that 
only a subset of allowed nodes exists above the gather node.


Some (not extremely fact checked) approaches as food for thought:
1. Plan the query as normal, and then afterwards look at the resulting 
plan to see if there are only nodes that are ok between the gather node 
and the top node, which afaics would only be things like append nodes.

Which would mean two things:
- at the end of subquery_planner before the final_rel is fetched, we add 
another pass like the grouping_planner called e.g. 
parallel_modify_planner or so, which traverses the query plan and checks 
if the inserts would indeed be executed parallel, and if so sets the 
cost of the gather to 0.
- we always keep around the best gathered partial path, or the partial 
path itself.


2. Generate both gather paths: one with zero cost for the inserts and 
one with costs. the one with zero costs would however be kept separately 
and added as prime candidate for the final rel. then we can check in the 
subquery_planner if the final candidate is different and then choose.


Kind regards,
Luc




Re: Parallel Inserts in CREATE TABLE AS

2021-01-04 Thread Luc Vlaming

On 05-01-2021 04:59, Bharath Rupireddy wrote:

On Mon, Jan 4, 2021 at 7:02 PM Bharath Rupireddy
 wrote:



+   if (IS_PARALLEL_CTAS_DEST(gstate->dest) &&
+       ((DR_intorel *) gstate->dest)->into->rel &&
+       ((DR_intorel *) gstate->dest)->into->rel->relname)
why would rel and relname not be there? if no rows have been inserted?
because it seems from the intorel_startup function that that would be
set as soon as startup was done, which i assume (wrongly?) is always done?


Actually, that into clause rel variable is always being set in the gram.y for 
CTAS, Create Materialized View and SELECT INTO (because qualified_name 
non-terminal is not optional). My bad. I just added it as a sanity check. 
Actually, it's not required.

create_as_target:
 qualified_name opt_column_list table_access_method_clause
 OptWith OnCommitOption OptTableSpace
 {
 $$ = makeNode(IntoClause);
 $$->rel = $1;
create_mv_target:
 qualified_name opt_column_list table_access_method_clause 
opt_reloptions OptTableSpace
 {
 $$ = makeNode(IntoClause);
 $$->rel = $1;
into_clause:
 INTO OptTempTableName
 {
 $$ = makeNode(IntoClause);
$$->rel = $2;

I will change the below code:
+    if (GetParallelInsertCmdType(gstate->dest) ==
+        PARALLEL_INSERT_CMD_CREATE_TABLE_AS &&
+        ((DR_intorel *) gstate->dest)->into &&
+        ((DR_intorel *) gstate->dest)->into->rel &&
+        ((DR_intorel *) gstate->dest)->into->rel->relname)
+    {

to:
+    if (GetParallelInsertCmdType(gstate->dest) ==
+        PARALLEL_INSERT_CMD_CREATE_TABLE_AS)
+    {

I will update this in the next version of the patch set.


Attaching v20 patch set that has above change in 0001 patch, note that
0002 to 0004 patches have no changes from v19. Please consider the v20
patch set for further review.


With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Hi,

Reviewing further v20-0001:

I would still opt for moving the code for the parallel worker into a 
separate function, and then setting rStartup of the dest receiver to 
that function in ExecParallelGetInsReceiver, as it's completely 
independent code. Just a matter of style I guess.


Maybe I'm not completely following why but afaics we want parallel 
inserts in various scenarios, not just CTAS? I'm asking because code like

+   if (fpes->ins_cmd_type == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)
+       pg_atomic_add_fetch_u64(&fpes->processed, queryDesc->estate->es_processed);
seems very specific to CTAS. For now that seems fine but I suppose that 
would be generalized soon after? Basically I would have expected the if 
to compare against PARALLEL_INSERT_CMD_UNDEF.


Apart from these small things v20-0001 looks (very) good to me.

v20-0002:
will reply on the specific mail-thread about the state machine

v20-0003 and v20-0004:
looks good to me.

Kind regards,
Luc




Re: Parallel Inserts in CREATE TABLE AS

2021-01-04 Thread Luc Vlaming

On 04-01-2021 14:32, Bharath Rupireddy wrote:
On Mon, Jan 4, 2021 at 4:22 PM Luc Vlaming  wrote:

 > Sorry it took so long to get back to reviewing this.

Thanks for the comments.

 > wrt v18-0001patch:
 >
 > +               /*
 > +                * If the worker is for parallel insert in CTAS, then 
use the proper

 > +                * dest receiver.
 > +                */
 > +               intoclause = (IntoClause *) stringToNode(intoclausestr);
 > +               receiver = CreateIntoRelDestReceiver(intoclause);
 > +               ((DR_intorel *)receiver)->is_parallel_worker = true;
 > +               ((DR_intorel *)receiver)->object_id = fpes->objectid;
 > I would move this into a function called e.g.
 > GetCTASParallelWorkerReceiver so that the details wrt CTAS can be put in
 > createas.c.
 > I would then also split up intorel_startup into intorel_leader_startup
 > and intorel_worker_startup, and in GetCTASParallelWorkerReceiver set
 > self->pub.rStartup to intorel_worker_startup.

My intention was to not add any new APIs to the dest receiver. I simply 
made the changes in intorel_startup, in which for workers it just does 
the minimalistic work and exit from it. In the leader most of the table 
creation and sanity check is kept untouched. Please have a look at the 
v19 patch posted upthread [1].




Looks much better, really nicely abstracted away in the v20 patch.


 > +       volatile pg_atomic_uint64       *processed;
 > why is it volatile?

Intention is to always read from the actual memory location. I took it 
from the way pg_atomic_fetch_add_u64_impl, 
pg_atomic_compare_exchange_u64_impl, pg_atomic_init_u64_impl and their 
u32 counterparts pass the parameter as volatile pg_atomic_uint64 *ptr.




Okay, I had not seen this syntax for atomics with the volatile keyword 
before, but it's apparently how the atomics abstraction works in PostgreSQL.



 > +                       if (isctas)
 > +                       {
 > +                               intoclause = ((DR_intorel *) 
node->dest)->into;
 > +                               objectid = ((DR_intorel *) 
node->dest)->object_id;

 > +                       }
 > Given that you extract them each once and then pass them directly into
 > the parallel-worker, can't you instead pass in the destreceiver and
 > leave that logic to ExecInitParallelPlan?

That's changed entirely in the v19 patch set posted upthread [1]. Please 
have a look. I didn't pass the dest receiver, to keep the API generic, I 
passed parallel insert command type and a void * ptr which points to 
insertion command because the information we pass to workers depends on 
the insertion command (for instance, the information needed by workers 
is for CTAS into clause and object id and for Refresh Mat View object id).


 >
 >  +       if (IS_PARALLEL_CTAS_DEST(gstate->dest) &&
 >  +           ((DR_intorel *) gstate->dest)->into->rel &&
 >  +           ((DR_intorel *) gstate->dest)->into->rel->relname)

 > why would rel and relname not be there? if no rows have been inserted?
 > because it seems from the intorel_startup function that that would be
 > set as soon as startup was done, which i assume (wrongly?) is always 
done?


Actually, that into clause rel variable is always being set in the 
gram.y for CTAS, Create Materialized View and SELECT INTO (because 
qualified_name non-terminal is not optional). My bad. I just added it as 
a sanity check. Actually, it's not required.


create_as_target:
    qualified_name opt_column_list table_access_method_clause
            OptWith OnCommitOption OptTableSpace
                {
                    $$ = makeNode(IntoClause);
                    $$->rel = $1;
create_mv_target:
    qualified_name opt_column_list table_access_method_clause
            opt_reloptions OptTableSpace
                {
                    $$ = makeNode(IntoClause);
                    $$->rel = $1;
into_clause:
    INTO OptTempTableName
        {
            $$ = makeNode(IntoClause);
            $$->rel = $2;

I will change the below code:
+                    if (GetParallelInsertCmdType(gstate->dest) ==
+                        PARALLEL_INSERT_CMD_CREATE_TABLE_AS &&
+                        ((DR_intorel *) gstate->dest)->into &&
+                        ((DR_intorel *) gstate->dest)->into->rel &&
+                        ((DR_intorel *) gstate->dest)->into->rel->relname)
+                    {

to:
+                    if (GetParallelInsertCmdType(gstate->dest) ==
+                        PARALLEL_INSERT_CMD_CREATE_TABLE_AS)
+                    {

I will update this in the next version of the patch set.

Re: Parallel Inserts in CREATE TABLE AS

2021-01-04 Thread Luc Vlaming

On 04-01-2021 12:16, Hou, Zhijie wrote:

Hi



wrt v18-0002patch:

It looks like this introduces a state machine that goes like:
- starts at CTAS_PARALLEL_INS_UNDEF
- possibly moves to CTAS_PARALLEL_INS_SELECT
- CTAS_PARALLEL_INS_TUP_COST_CAN_IGN can be added
- if both were added at some stage, we can go to
CTAS_PARALLEL_INS_TUP_COST_IGNORED and ignore the costs

what i'm wondering is why you opted to put logic around
generate_useful_gather_paths and in cost_gather when to me it seems more
logical to put it in create_gather_path? i'm probably missing something
there?


IMO, the reason is that we want to make sure we only ignore the cost when 
Gather is the top node.
And it seems that generate_useful_gather_paths, as called in 
apply_scanjoin_target_to_paths, is the right place, since it can only create 
a top-node Gather.
So we change the flag in apply_scanjoin_target_to_paths around 
generate_useful_gather_paths to identify the top node.


Best regards,
houzj




Hi,

I was wondering actually if we need the state machine. Reason is that 
AFAICS the code could be placed in create_gather_path, where you can 
also check whether it is a top gather node, whether the dest receiver is 
the right type, etc. To me that seems like a nicer solution, as it puts 
all the logic that decides whether or not a parallel CTAS is valid in a 
single place instead of distributing it over various places.


Kind regards,
Luc




Re: Parallel Inserts in CREATE TABLE AS

2021-01-04 Thread Luc Vlaming

On 30-12-2020 04:55, Bharath Rupireddy wrote:

On Wed, Dec 30, 2020 at 5:22 AM Zhihong Yu  wrote:

w.r.t. v17-0004-Enable-CTAS-Parallel-Inserts-For-Append.patch

+ * Push the dest receiver to Gather node when it is either at the top of the
+ * plan or under top Append node unless it does not have any projections to do.

I think the 'unless' should be 'if'. As can be seen from the body of the method:

+   if (!ps->ps_ProjInfo)
+   {
+   GatherState *gstate = (GatherState *) ps;
+
+   parallel = true;


Thanks. Modified it in the 0004 patch. Attaching v18 patch set. Note
that no change in 0001 to 0003 patches from v17.

Please consider v18 patch set for further review.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Hi,

Sorry it took so long to get back to reviewing this.

wrt v18-0001patch:

+   /*
+* If the worker is for parallel insert in CTAS, then use the 
proper
+* dest receiver.
+*/
+   intoclause = (IntoClause *) stringToNode(intoclausestr);
+   receiver = CreateIntoRelDestReceiver(intoclause);
+   ((DR_intorel *)receiver)->is_parallel_worker = true;
+   ((DR_intorel *)receiver)->object_id = fpes->objectid;
I would move this into a function called e.g. 
GetCTASParallelWorkerReceiver so that the details wrt CTAS can be put in 
createas.c.

I would then also split up intorel_startup into intorel_leader_startup
and intorel_worker_startup, and in GetCTASParallelWorkerReceiver set 
self->pub.rStartup to intorel_worker_startup.



+   volatile pg_atomic_uint64   *processed;
why is it volatile?


+   if (isctas)
+   {
+   intoclause = ((DR_intorel *) node->dest)->into;
+   objectid = ((DR_intorel *) 
node->dest)->object_id;
+   }
Given that you extract them each once and then pass them directly into 
the parallel-worker, can't you instead pass in the destreceiver and 
leave that logic to ExecInitParallelPlan?



+   if (IS_PARALLEL_CTAS_DEST(gstate->dest) &&
+       ((DR_intorel *) gstate->dest)->into->rel &&
+       ((DR_intorel *) gstate->dest)->into->rel->relname)
why would rel and relname not be there? if no rows have been inserted? 
because it seems from the intorel_startup function that that would be 
set as soon as startup was done, which i assume (wrongly?) is always done?



+* In case if no workers were launched, allow the leader to insert 
entire
+* tuples.
what does "entire tuples" mean? should it maybe be "all tuples"?



wrt v18-0002patch:

It looks like this introduces a state machine that goes like:
- starts at CTAS_PARALLEL_INS_UNDEF
- possibly moves to CTAS_PARALLEL_INS_SELECT
- CTAS_PARALLEL_INS_TUP_COST_CAN_IGN can be added
- if both were added at some stage, we can go to 
CTAS_PARALLEL_INS_TUP_COST_IGNORED and ignore the costs


what i'm wondering is why you opted to put logic around 
generate_useful_gather_paths and in cost_gather when to me it seems more 
logical to put it in create_gather_path? i'm probably missing something 
there?




wrt v18-0003patch:

not sure if it is needed, but i was wondering if we would want more 
tests with multiple gather nodes existing? caused e.g. by using CTEs, 
valid subqueries (like the one test you have, but without the group 
by/having)?



Kind regards,
Luc




Re: Consider Parallelism While Planning For REFRESH MATERIALIZED VIEW

2021-01-04 Thread Luc Vlaming
The following review has been posted through the commitfest application:
make installcheck-world:  tested, passed
Implements feature:   tested, passed
Spec compliant:   not tested
Documentation:not tested

passes according to http://cfbot.cputube.org/

The new status of this patch is: Ready for Committer


Re: Consider Parallelism While Planning For REFRESH MATERIALIZED VIEW

2021-01-04 Thread Luc Vlaming

On 30-12-2020 04:49, Bharath Rupireddy wrote:

On Wed, Dec 30, 2020 at 8:03 AM Hou, Zhijie  wrote:

Yeah without explain analyze we can not show whether the parallelism is
picked in the test cases. What we could do is that we can add a plain RMV
test case in write_parallel.sql after CMV so that at least we can be ensured
that the parallelism will be picked because of the enforcement there. We
can always see the parallelism for the select part of explain analyze CMV
in write_parallel.sql and the same select query gets planned even in RMV
cases.

IMO, the patch in this thread can go with test case addition to
write_parallel.sql. since it is very small.

Thoughts?


Yes, agreed.


Thanks. Added the test case. Attaching v2 patch. Please have a look.


With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Hi,

Looks good to me and a nice simple improvement.

Passes everything according to http://cfbot.cputube.org/, so I have 
marked it as ready for committer.


Cheers,
Luc




Re: New Table Access Methods for Multi and Single Inserts

2021-01-03 Thread Luc Vlaming

On 28-12-2020 13:48, Bharath Rupireddy wrote:

On Fri, Dec 25, 2020 at 8:10 AM Justin Pryzby  wrote:

On Thu, Dec 24, 2020 at 05:48:42AM +0530, Bharath Rupireddy wrote:

I'm not posting the updated 0002 to 0004 patches, I plan to do so
after a couple of reviews happen on the design of the APIs in 0001.

Thoughts?


Are you familiar with this work ?

https://commitfest.postgresql.org/31/2717/
Reloptions for table access methods

It seems like that can be relevant for your patch, and I think some of what
your patch needs might be provided by AM opts.

It's difficult to generalize AMs when we have only one, but your use-case might
be a concrete example which would help to answer some questions on the other
thread.

@Jeff: https://commitfest.postgresql.org/31/2871/


Note that I have not gone through the entire thread at [1]. On some
initial study, that patch is proposing to allow different table AMs to
have custom rel options.

In the v2 patch that I sent upthread [2] for new table AMs has heap AM
multi insert code moved inside the new heap AM implementation and I
don't see any need of having rel options. In case, any other AMs want
to have the control for their multi insert API implementation via rel
options, I think the proposal at [1] can be useful.


Thoughts?

[1] - https://commitfest.postgresql.org/31/2717/
[2] - 
https://www.postgresql.org/message-id/CALj2ACWMnZZCu%3DG0PJkEeYYicKeuJ-X%3DSU19i6vQ1%2B%3DuXz8u0Q%40mail.gmail.com

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com


Hi,

> IIUC, there's no dependency or anything as such for the new table AM
> patch with the rel options thread [1]. If I'm right, can this new
> table AM patch [2] be reviewed further?

To me this seems good enough. Reason is that I anticipate that there 
would not necessarily be per-table options for now but rather global 
options, if any. Moreover, if we want to make these kinds of tradeoffs 
user-controllable I would argue this should be done in a different 
patch set either way, because there are parameters in heap already 
that are computed / hardcoded as well (see e.g. 
RelationAddExtraBlocks).


===

As to the patches themselves:

I think the API is a huge step forward! I assume that we want to have a 
single-insert API like heap_insert_v2 so that we can encode the 
knowledge that there will just be a single insert coming and likely a 
commit afterwards?


Reason I'm asking is that I quite liked the heap_insert_begin parameter 
is_multi, which could even be turned into an "expected_rowcount": the 
number of rows expected to be committed in the transaction (e.g. single, 
several, thousands/stream).
If we were to make the API based on expected rowcounts, the whole 
heap_insert_v2, heap_insert and heap_multi_insert could be turned into a 
single function heap_insert, as the knowledge about buffering of the 
slots is then already stored in the TableInsertState, creating an API like:


// expectedRows: -1 = streaming, otherwise expected rowcount.
TableInsertState* heap_insert_begin(Relation rel, CommandId cid, int 
options, int expectedRows);

heap_insert(TableInsertState *state, TupleTableSlot *slot);

Do you think that's a good idea?

Two smaller things I'm wondering:
- the clear_mi_slots: why is this not in the HeapMultiInsertState, where 
the slots themselves are declared? Also, the boolean itself is somewhat 
problematic I think, because it would only work if you specified 
is_multi=true, which would depend on the actual table AM to implement 
this in a way that copy/ctas/etc. can also use the slot properly, which 
I think would severely limit their freedom to store the slots more 
efficiently. Also, why do we want to do ExecClearTuple() anyway? Isn't 
it good enough that the next call to ExecCopySlot will effectively clear 
it out?
- flushed -> why is this a stored boolean? isn't this indirectly encoded 
by cur_slots/cur_size == 0?


For patches 02-04 I quickly skimmed through them as I assume we first 
want the API agreed upon. Generally they look nice and like a big step 
forward. What I'm just wondering about is the usage of the 
implementation details like mistate->slots[X]. It makes a lot of sense 
to do so but also makes for a difficult compromise, because now the 
tableam has to guarantee a copy of the slot, and hopefully even one in a 
somewhat efficient form.


Kind regards,
Luc




Re: Lazy JIT IR code generation to increase JIT speed with partitions

2021-01-03 Thread Luc Vlaming

On 30-12-2020 14:23, Luc Vlaming wrote:

On 30-12-2020 02:57, Andres Freund wrote:

Hi,

Great to see work in this area!

On 2020-12-28 09:44:26 +0100, Luc Vlaming wrote:
I would like to propose a small patch to the JIT machinery which makes the
IR code generation lazy. The reason for postponing the generation of the IR
code is that with partitions we get an explosion in the number of JIT
functions generated, as many child tables are involved, each with their own
JITted functions, especially when e.g. partition-aware joins/aggregates are
enabled. However, only a fraction of those functions is actually executed
because the Parallel Append node distributes the workers among the nodes.
With the attached patch we get a lazy generation which makes that this is no
longer a problem.


I unfortunately don't think this is quite good enough, because it'll
lead to emitting all functions separately, which can also lead to very
substantial increases of the required time (as emitting code is an
expensive step). Obviously that is only relevant in the cases where the
generated functions actually end up being used - which isn't the case in
your example.

If you e.g. look at a query like
   SELECT blub, count(*),sum(zap) FROM foo WHERE blarg = 3 GROUP BY blub;
on a table without indexes, you would end up with functions for

- WHERE clause (including deforming)
- projection (including deforming)
- grouping key
- aggregate transition
- aggregate result projection

with your patch each of these would be emitted separately, instead of
one go. Which IIRC increases the required time by a significant amount,
especially if inlining is done (where each separate code generation ends
up with copies of the inlined code).


As far as I can see you've basically falsified the second part of this
comment (which you moved):


+
+	/*
+	 * Don't immediately emit nor actually generate the function.
+	 * instead do so the first time the expression is actually evaluated.
+	 * That allows to emit a lot of functions together, avoiding a lot of
+	 * repeated llvm and memory remapping overhead. It also helps with not
+	 * compiling functions that will never be evaluated, as can be the case
+	 * if e.g. a parallel append node is distributing workers between its
+	 * child nodes.
+	 */

-	/*
-	 * Don't immediately emit function, instead do so the first time the
-	 * expression is actually evaluated. That allows to emit a lot of
-	 * functions together, avoiding a lot of repeated llvm and memory
-	 * remapping overhead.
-	 */


Greetings,

Andres Freund



Hi,

Happy to help out, and thanks for the info and suggestions.
Also, I should have first searched pgsql-hackers and the like, as I just 
found out there are already discussions about this in [1] and [2].
However I think the approach I took can be taken independently and then 
other solutions could be added on top.


Assuming I understood all suggestions correctly, the ideas so far are:
1. add a LLVMAddMergeFunctionsPass so that duplicate code is removed and 
not optimized several times (see [1]). Requires all code to be emitted 
in the same module.

2. JIT only parts of the plan, based on cost (see [2]).
3. Cache compilation results to avoid recompilation. this would either 
need a shm capable optimized IR cache or would not work with parallel 
workers.

4. Lazily jitting (this patch)

An idea that might not have been presented in the mailing list yet(?):
5. Only JIT in nodes that process a certain amount of rows. Assuming 
there is a constant overhead for JITting and the goal is to gain runtime.


Going forward I would first try to see if my current approach can work 
out. The only idea that would be counterproductive to my solution would 
be solution 1. Afterwards I'd like to continue with either solution 2, 
5, or 3 in the hopes that we can reduce JIT overhead to a minimum and 
can therefore apply it more broadly.


To test out why and where the JIT performance decreased with my solution 
I improved the test script and added various queries to model some of 
the cases I think we should care about. I have not (yet) done big scale 
benchmarks as these queries seemed to already show enough problems for 
now. Now there are 4 queries which test JITting with/without partitions, 
and with varying amounts of workers and rowcounts. I hope these are 
indeed a somewhat representative set of queries.


As pointed out the current patch does create a degradation in 
performance wrt queries that are not partitioned (basically q3 and q4). 
After looking into those queries I noticed two things:
- q3 is very noisy wrt JIT timings. This seems to be the result of 
something wrt parallel workers starting up the JITting and creating very 
high amounts of noise (e.g. inlining timings varying between 3.8s and 6.2s)

- q4 seems very stable with JIT timings (after the first run).
I'm wondering if this could mean that with parallel workers quite a lot 
of time

Re: faster ETL / bulk data load for heap tables

2021-01-02 Thread Luc Vlaming

On 01-01-2021 19:55, Zhihong Yu wrote:

Hi, Luc:
Happy New Year.

Looking at BufferAllocExtend() 
in v1-0002-WIP-buffer-alloc-specialized-for-relation-extensi.patch. it 
seems there is duplicate code with the existing BufferAlloc().


It would be good if some refactoring is done by extracting common code 
into a helper function.


Thanks



Hi,

Thanks! Happy new year to you too!

Thanks for your suggestion. I would wait a bit and first get some 
feedback on the design/approach of my patches before doing the 
refactoring. The current code is very much a WIP where I just copied 
functions to be able to make specialized variants of them aimed at bulk 
extension and then benchmark those. If the refactoring needs to be done 
before I can get feedback on the design / approach then let me know.


Kind regards,
Luc




Re: faster ETL / bulk data load for heap tables

2021-01-02 Thread Luc Vlaming

On 02-01-2021 08:36, Amit Kapila wrote:

On Fri, Jan 1, 2021 at 7:37 PM Luc Vlaming  wrote:


Hi,

In an effort to speed up bulk data loading/transforming I noticed that
considerable time is spent in the relation extension lock.



We already do extend the relation in bulk when there is a contention
on relation extension lock via RelationAddExtraBlocks. I wonder why is
that not able to address this kind of workload. On a quick look at
your patch, it seems you are always trying to extend the relation by
128 blocks for copy operation after acquiring the lock whereas the
current mechanism has some smarts where it decides based on the number
of waiters. Now, it is possible that we should extend by a larger
number of blocks as compared to what we are doing now but using some
ad-hoc number might lead to space wastage. Have you tried to fiddle
with the current scheme of bulk-extension to see if that addresses
some of the gains you are seeing? I see that you have made quite a few
other changes that might be helping here but still, it is better to
see how much bottleneck is for relation extension lock and if that can
be addressed with the current mechanism rather than changing the
things in a different way.



Hi,

Thanks for looking at the patch!

Yes I tried that. I guess I should have also shared all other things I 
have tried before I ended up with these patches.
I've tried to improve RelationAddExtraBlocks to more aggressively 
allocate blocks, to not put them in the FSM immediately (as this also 
seemed like a point of contention), to extend ReadBufferBI to allocate 
several pages in a loop, and a few more variants like this.
To be sure, I just tried again a few variants where I made e.g. 
extraBlocks=128, 256, etc., and disabled e.g. the FSM code. None of 
those yield very big performance gains, and some actually make it 
slower, suggesting the code is already parameterized quite well for the 
current design.


The main problem is that the relation extension lock is taken to extend 
one block at a time, whilst doing (expensive) syscalls like pwrite(). 
Even though we then put these blocks immediately in the FSM and such, 
the bottleneck stays the extension of the file itself, no matter how 
many blocks we allocate in a loop in RelationAddExtraBlocks.


Therefore what I've set out to do is:
- make the relation extension lock taken as short as possible.
- reduce the time spent on syscalls as much as possible.

This resulted in a design which hands out blocks to a specific backend 
so that everything after the file extension can be done safely without 
locks. Given that the current API did not allow this to be specified 
properly, for now I added extra functions whose sole purpose is to 
extend the relation, so that they could be purpose-built for this. I do 
not want to suggest, however, that this is the best API there could be. 
The current state of this patch is in that sense still very crude. I 
just wrote the code I needed to be able to do performance testing. So it 
is, as you rightfully pointed out, quite ad-hoc and not very generic, 
nor the right design, and has quite some code duplication, etc.


I was at this point mostly looking for feedback on the design/approach. 
If this requires a (much) cleaner and more productified patch then I can 
arrange that. However I thought to first find out if this approach makes 
sense at all before doing more testing to make this more generic.


Kind regards,
Luc




faster ETL / bulk data load for heap tables

2021-01-01 Thread Luc Vlaming
#49fe9f2ffcc9916cc5ed3a712aa5f28f
[2] 
https://www.postgresql.org/message-id/flat/CALj2ACWjymmyTvvhU20Er-LPLaBjzBQxMJwr4nzO7XWmOkxhsg%40mail.gmail.com#be34b5b7861876fc0fd7edb621c067fa
[3] 
https://www.postgresql.org/message-id/flat/CALj2ACXg-4hNKJC6nFnepRHYT4t5jJVstYvri%2BtKQHy7ydcr8A%40mail.gmail.com
[4] 
https://www.postgresql.org/message-id/flat/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
[5] 
https://www.postgresql.org/message-id/flat/CALDaNm3GaZyYPpGu-PpF0SEkJg-eaW3TboHxpxJ-2criv2j_eA%40mail.gmail.com#07292ce654ef58fae7f257a4e36afc41
[6] 
https://www.postgresql.org/message-id/flat/20200203132319.x7my43whtefeznz7%40alap3.anarazel.de#85a2a0ab915cdf079862d70505abe3db
[7] 
https://www.postgresql.org/message-id/flat/20201208040227.7rlzpfpdxoau4pvd%40alap3.anarazel.de#b8ea4a3b7f37e88ddfe121c4b3075e7b
[8] 
https://www.postgresql.org/message-id/flat/CAD21AoA9oK1VOoUuKW-jEO%3DY2nt5kCQKKFgeQwwRUMoh6BE-ow%40mail.gmail.com#0475248a5ff7aed735be41fd4034ae36
[9] 
https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BHf_R_ih1pkBMTWn%3DSTyKMOM2Ks47Y_UqqfU1wRc1VvA%40mail.gmail.com#7a53ad72331e423ba3c6a50e6dc1259f
>From b08074c5141967a8e6e805fd9c15c1a0450af2f7 Mon Sep 17 00:00:00 2001
From: Luc Vlaming 
Date: Thu, 31 Dec 2020 09:11:54 +0100
Subject: [PATCH v1 1/2] WIP: local bulk allocation

---
 src/backend/access/heap/heapam.c  |  12 ++
 src/backend/access/heap/hio.c | 178 ++
 src/backend/access/heap/rewriteheap.c |   2 +-
 src/backend/storage/buffer/bufmgr.c   |  60 -
 src/backend/storage/file/buffile.c|   3 +-
 src/backend/storage/file/fd.c |  16 ++-
 src/backend/storage/smgr/md.c |  56 
 src/backend/storage/smgr/smgr.c   |  15 ++-
 src/include/access/hio.h  |   4 +
 src/include/storage/bufmgr.h  |   2 +
 src/include/storage/fd.h  |   2 +-
 src/include/storage/md.h  |   2 +-
 src/include/storage/smgr.h|   2 +
 13 files changed, 183 insertions(+), 171 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 26c2006f23..0a923c1ffd 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1810,6 +1810,10 @@ GetBulkInsertState(void)
 	bistate = (BulkInsertState) palloc(sizeof(BulkInsertStateData));
 	bistate->strategy = GetAccessStrategy(BAS_BULKWRITE);
 	bistate->current_buf = InvalidBuffer;
+	bistate->local_buffers_idx = BULK_INSERT_BATCH_SIZE;
+	for (int i = 0; i < BULK_INSERT_BATCH_SIZE; i++)
+		bistate->local_buffers[i] = InvalidBuffer;
+	bistate->empty_buffer = palloc0(BULK_INSERT_BATCH_SIZE * BLCKSZ);
 	return bistate;
 }
 
@@ -1821,7 +1825,15 @@ FreeBulkInsertState(BulkInsertState bistate)
 {
 	if (bistate->current_buf != InvalidBuffer)
 		ReleaseBuffer(bistate->current_buf);
+	for (int i = bistate->local_buffers_idx; i < BULK_INSERT_BATCH_SIZE; i++)
+		if (bistate->local_buffers[i] != InvalidBuffer)
+		{
+			// FSM?
+			//LockBuffer(bistate->local_buffers[i], BUFFER_LOCK_UNLOCK);
+			ReleaseBuffer(bistate->local_buffers[i]);
+		}
 	FreeAccessStrategy(bistate->strategy);
+	pfree(bistate->empty_buffer);
 	pfree(bistate);
 }
 
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index ca357410a2..9111badd2f 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -24,6 +24,7 @@
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 
+#include "miscadmin.h"
 
 /*
  * RelationPutHeapTuple - place tuple at specified page
@@ -118,9 +119,19 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
 		bistate->current_buf = InvalidBuffer;
 	}
 
-	/* Perform a read using the buffer strategy */
-	buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
-mode, bistate->strategy);
+	if (targetBlock == P_NEW && mode == RBM_ZERO_AND_LOCK && bistate->local_buffers_idx < BULK_INSERT_BATCH_SIZE)
+	{
+		/* If we have a local buffer remaining, use that */
+		buffer = bistate->local_buffers[bistate->local_buffers_idx++];
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		Assert(buffer != InvalidBuffer);
+	}
+	else
+	{
+		/* Perform a read using the buffer strategy */
+		buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
+	mode, bistate->strategy);
+	}
 
 	/* Save the selected block as target for future inserts */
 	IncrBufferRefCount(buffer);
@@ -187,90 +198,6 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
 	}
 }
 
-/*
- * Extend a relation by multiple blocks to avoid future contention on the
- * relation extension lock.  Our goal is to pre-extend the relation by an
- * amount which ramps up as the degree of contention ramps up, but limiting
- * the result to some sane overall value.
- */
-static void
-RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
-{
-	BlockNumber blockNum,
-firstBlock = InvalidBlockNumber;
-	int			extraBlocks;
-	int			lockWaiters;
-
-	/* Use the length 

Re: allow partial union-all and improve parallel subquery costing

2020-12-30 Thread Luc Vlaming

On 23-10-2020 07:51, Luc Vlaming wrote:

On 14.10.20 09:38, Luc Vlaming wrote:

Hi,

It seems I ran the wrong make checks to verify everything is correct 
(make check instead
of make installcheck-world) and this uncovered another regress test 
change. I also noticed
the statistics are sometimes giving different row count results so I 
increased the row
statistics target to make sure the regress output is stable. Updated 
patch attached which

now successfully runs installcheck-world for v13 and master.

Kind regards,
Luc


From: Luc Vlaming 
Sent: Tuesday, October 13, 2020 10:57 AM
To: pgsql-hackers
Subject: allow partial union-all and improve parallel subquery costing

Hi,

While developing some improvements for TPC-DS queries I found out that 
with
UNION ALL partial paths are not emitted. Whilst fixing that I also 
came across
the subquery costing which does not seem to consider parallelism when 
doing

the costing.

I added a simplified testcase in pg-regress to show this goes wrong, and
attached also a before and after explain output of tpc-ds SF100 query 5
based on version 12.4.

I hope I followed all etiquette and these kind of improvements are 
welcome.


Kind regards,
Luc
Swarm64



Hi,

Created a commitfest entry assuming this is the right thing to do so 
that someone can potentially pick it up during the commitfest.


Kind regards,
Luc
Swarm64


Hi,

Providing an updated patch based on latest master.

Cheers,
Luc
From 032c48b51ee0d436c91d9db81ce800d91168bd01 Mon Sep 17 00:00:00 2001
From: Luc Vlaming 
Date: Wed, 30 Dec 2020 14:49:48 +0100
Subject: [PATCH v3] Allow partial UNION ALL; improve parallel subquery costing

---
 src/backend/optimizer/path/costsize.c | 11 
 src/backend/optimizer/prep/prepunion.c|  4 ++
 .../regress/expected/incremental_sort.out | 10 ++--
 src/test/regress/expected/union.out   | 52 +++
 src/test/regress/sql/union.sql| 37 +
 5 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 22d6935824..1c3b04c4d7 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1328,6 +1328,17 @@ cost_subqueryscan(SubqueryScanPath *path, PlannerInfo *root,
 	startup_cost += path->path.pathtarget->cost.startup;
 	run_cost += path->path.pathtarget->cost.per_tuple * path->path.rows;
 
+	/* Adjust costing for parallelism, if used. */
+	if (path->path.parallel_workers > 0)
+	{
+		double  parallel_divisor = get_parallel_divisor(&path->path);
+
+		path->path.rows = clamp_row_est(path->path.rows / parallel_divisor);
+
+		/* The CPU cost is divided among all the workers. */
+		run_cost /= parallel_divisor;
+	}
+
 	path->path.startup_cost += startup_cost;
 	path->path.total_cost += startup_cost + run_cost;
 }
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 745f443e5c..4001eb87f3 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -678,6 +678,10 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 			   NIL, NULL,
 			   parallel_workers, enable_parallel_append,
 			   NIL, -1);
+
+		if (op->all && enable_parallel_append)
+			add_partial_path(result_rel, ppath);
+
 		ppath = (Path *)
 			create_gather_path(root, result_rel, ppath,
 			   result_rel->reltarget, NULL, NULL);
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index a8cbfd9f5f..40265674c4 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1459,14 +1459,12 @@ explain (costs off) select * from t union select * from t order by 1,3;
->  Unique
  ->  Sort
Sort Key: t.a, t.b, t.c
-   ->  Append
- ->  Gather
-   Workers Planned: 2
+   ->  Gather
+ Workers Planned: 2
+ ->  Parallel Append
->  Parallel Seq Scan on t
- ->  Gather
-   Workers Planned: 2
->  Parallel Seq Scan on t t_1
-(13 rows)
+(11 rows)
 
 -- Full sort, not just incremental sort can be pushed below a gather merge path
 -- by generate_useful_gather_paths.
diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out
index 75f78db8f5..cf7660f524 100644
--- a/src/test/regress/expected/union.out
+++ b/src/test/regress/expected/union.out
@@ -1420,3 +1420,55 @@ where (x = 0) or (q1 >= q2 and q1 <= q2);
  4567890123456789 |  4567890123456789 | 1
 (6 rows)
 
+-- Test handling of appendrel with different types which disables the path flattening 

Re: Lazy JIT IR code generation to increase JIT speed with partitions

2020-12-30 Thread Luc Vlaming

On 30-12-2020 02:57, Andres Freund wrote:

Hi,

Great to see work in this area!

On 2020-12-28 09:44:26 +0100, Luc Vlaming wrote:

I would like to propose a small patch to the JIT machinery which makes the
IR code generation lazy. The reason for postponing the generation of the IR
code is that with partitions we get an explosion in the number of JIT
functions generated as many child tables are involved, each with their own
JITted functions, especially when e.g. partition-aware joins/aggregates are
enabled. However, only a fraction of those functions is actually executed
because the Parallel Append node distributes the workers among the nodes.
With the attached patch we get a lazy generation which makes that this is no
longer a problem.


I unfortunately don't think this is quite good enough, because it'll
lead to emitting all functions separately, which can also lead to very
substantial increases of the required time (as emitting code is an
expensive step). Obviously that is only relevant in the cases where the
generated functions actually end up being used - which isn't the case in
your example.

If you e.g. look at a query like
   SELECT blub, count(*),sum(zap) FROM foo WHERE blarg = 3 GROUP BY blub;
on a table without indexes, you would end up with functions for

- WHERE clause (including deforming)
- projection (including deforming)
- grouping key
- aggregate transition
- aggregate result projection

with your patch each of these would be emitted separately, instead of
one go. Which IIRC increases the required time by a significant amount,
especially if inlining is done (where each separate code generation ends
up with copies of the inlined code).


As far as I can see you've basically falsified the second part of this
comment (which you moved):


+
+   /*
+* Don't immediately emit nor actually generate the function.
+* instead do so the first time the expression is actually evaluated.
+* That allows to emit a lot of functions together, avoiding a lot of
+* repeated llvm and memory remapping overhead. It also helps with not
+* compiling functions that will never be evaluated, as can be the case
+* if e.g. a parallel append node is distributing workers between its
+* child nodes.
+*/



-   /*
-* Don't immediately emit function, instead do so the first time the
-* expression is actually evaluated. That allows to emit a lot of
-* functions together, avoiding a lot of repeated llvm and memory
-* remapping overhead.
-*/


Greetings,

Andres Freund



Hi,

Happy to help out, and thanks for the info and suggestions.
Also, I should have first searched pgsql-hackers and the like, as I just 
found out there are already discussions about this in [1] and [2].
However I think the approach I took can be taken independently and then 
other solutions could be added on top.


Assuming I understood all suggestions correctly, the ideas so far are:
1. add a LLVMAddMergeFunctionsPass so that duplicate code is removed and 
not optimized several times (see [1]). Requires all code to be emitted 
in the same module.

2. JIT only parts of the plan, based on cost (see [2]).
3. Cache compilation results to avoid recompilation. This would either 
need a shm-capable optimized IR cache or would not work with parallel 
workers.

4. Lazily jitting (this patch)

An idea that might not have been presented in the mailing list yet(?):
5. Only JIT nodes that process a certain number of rows, assuming 
there is a constant overhead for JITting and the goal is a net runtime gain.
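A rough illustration of that last idea, with all numbers invented rather than measured costs: JIT only pays off when the per-tuple saving times the expected row count exceeds the (roughly constant) compilation overhead.

```c
#include <assert.h>

/*
 * Toy cost cutoff for idea 5: returns nonzero when the expected
 * saving over interpreted execution outweighs the JIT overhead.
 * The parameters are hypothetical, not real planner costs.
 */
static int
worth_jitting(double est_rows, double per_tuple_saving, double jit_overhead)
{
	return est_rows * per_tuple_saving > jit_overhead;
}
```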


Going forward I would first try to see if my current approach can work 
out. The only idea that would be counterproductive to my solution would 
be solution 1. Afterwards I'd like to continue with either solution 2, 
5, or 3 in the hopes that we can reduce JIT overhead to a minimum and 
can therefore apply it more broadly.


To test out why and where the JIT performance decreased with my solution 
I improved the test script and added various queries to model some of 
the cases I think we should care about. I have not (yet) done big scale 
benchmarks as these queries seemed to already show enough problems for 
now. Now there are 4 queries which test JITting with/without partitions, 
and with varying amounts of workers and rowcounts. I hope these are 
indeed a somewhat representative set of queries.


As pointed out the current patch does create a degradation in 
performance wrt queries that are not partitioned (basically q3 and q4). 
After looking into those queries I noticed two things:
- q3 is very noisy wrt JIT timings. This seems to be the result of 
something wrt parallel workers starting up the JITting and creating very 
high amounts of noise (e.g. inlining timings varying between 3.8s and 6.2s)

- q4 seems very stable with JIT timings (after the first run).
I'm wondering if this could mean that with parallel workers quite a lot 
of time is spent

Lazy JIT IR code generation to increase JIT speed with partitions

2020-12-28 Thread Luc Vlaming

Hi,

I would like to propose a small patch to the JIT machinery which makes 
the IR code generation lazy. The reason for postponing the generation of 
the IR code is that with partitions we get an explosion in the number of 
JIT functions generated as many child tables are involved, each with 
their own JITted functions, especially when e.g. partition-aware 
joins/aggregates are enabled. However, only a fraction of those 
functions is actually executed because the Parallel Append node 
distributes the workers among the nodes. With the attached patch we get 
a lazy generation which makes that this is no longer a problem.
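A minimal standalone sketch of the mechanism, assuming a simplified ExprState (the real PostgreSQL structures differ): the initially installed evaluator does the expensive code generation on first use, swaps itself out for the compiled function, and forwards the call, so later evaluations pay nothing extra and never-evaluated expressions are never compiled.

```c
#include <assert.h>

typedef struct ExprStateSketch ExprStateSketch;
typedef int (*eval_fn) (ExprStateSketch *state, int arg);

struct ExprStateSketch
{
	eval_fn		evalfunc;
	int			compile_count;	/* how often we "compiled" */
};

/* Stands in for the JITted expression function. */
static int
compiled_eval(ExprStateSketch *state, int arg)
{
	(void) state;
	return arg * 2;
}

/*
 * Trampoline installed up front; mirrors the role of ExecCompileExpr
 * in the patch: compile, replace the pointer, then evaluate.
 */
static int
compile_on_first_call(ExprStateSketch *state, int arg)
{
	state->compile_count++;		/* expensive codegen would happen here */
	state->evalfunc = compiled_eval;
	return state->evalfunc(state, arg);
}

static void
init_expr(ExprStateSketch *state)
{
	state->evalfunc = compile_on_first_call;
	state->compile_count = 0;
}
```

With a Parallel Append distributing workers between child nodes, children whose evalfunc is never called never reach the compile step, which is exactly the saving the benchmarks below show.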


For benchmarks I have in TPC-H and TPC-DS like queries with partitioning 
by hash seen query runtimes increase by 20+ seconds even on the simpler 
queries. Also I created a small benchmark to reproduce the case easily 
(see attached sql file):


without patch, using 7 launched workers:
- without jit: ~220ms
- with jit: ~1880ms
without patch, using 50 launched workers:
- without jit: ~280ms
- with jit: ~3400ms

with patch, using 7 launched workers:
- without jit: ~220ms
- with jit: ~590ms

with patch, using 50 launched workers:
- without jit: ~280ms
- with jit: ~530ms

Thoughts?

With Regards,
Luc Vlaming
Swarm64


jit_partitions.sql
Description: application/sql
From 5dd5df7e29bb3262b8f7f02c4fd3896bb34c3133 Mon Sep 17 00:00:00 2001
From: Luc Vlaming 
Date: Mon, 28 Dec 2020 09:01:32 +0100
Subject: [PATCH v1] generate JIT IR code lazily

---
 src/backend/jit/llvm/llvmjit_expr.c | 98 +
 1 file changed, 59 insertions(+), 39 deletions(-)

diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c
index 3aa08a9743..2ac79b7571 100644
--- a/src/backend/jit/llvm/llvmjit_expr.c
+++ b/src/backend/jit/llvm/llvmjit_expr.c
@@ -52,6 +52,7 @@ typedef struct CompiledExprState
 } CompiledExprState;
 
 
+static Datum ExecCompileExpr(ExprState *state, ExprContext *econtext, bool *isNull);
 static Datum ExecRunCompiledExpr(ExprState *state, ExprContext *econtext, bool *isNull);
 
 static LLVMValueRef BuildV1Call(LLVMJitContext *context, LLVMBuilderRef b,
@@ -70,18 +71,66 @@ static LLVMValueRef create_LifetimeEnd(LLVMModuleRef mod);
 	   lengthof(((LLVMValueRef[]){__VA_ARGS__})), \
 	   ((LLVMValueRef[]){__VA_ARGS__}))
 
-
 /*
- * JIT compile expression.
+ * Prepare the JIT compile expression.
  */
 bool
 llvm_compile_expr(ExprState *state)
 {
 	PlanState  *parent = state->parent;
-	char	   *funcname;
-
 	LLVMJitContext *context = NULL;
 
+
+	/*
+	 * Right now we don't support compiling expressions without a parent, as
+	 * we need access to the EState.
+	 */
+	Assert(parent);
+
+	llvm_enter_fatal_on_oom();
+
+	/* get or create JIT context */
+	if (parent->state->es_jit)
+		context = (LLVMJitContext *) parent->state->es_jit;
+	else
+	{
+		context = llvm_create_context(parent->state->es_jit_flags);
+		parent->state->es_jit = &context->base;
+	}
+
+	/*
+	 * Don't immediately emit nor actually generate the function.
+	 * instead do so the first time the expression is actually evaluated.
+	 * That allows to emit a lot of functions together, avoiding a lot of
+	 * repeated llvm and memory remapping overhead. It also helps with not
+	 * compiling functions that will never be evaluated, as can be the case
+	 * if e.g. a parallel append node is distributing workers between its
+	 * child nodes.
+	 */
+	{
+
+		CompiledExprState *cstate = palloc0(sizeof(CompiledExprState));
+
+		cstate->context = context;
+
+		state->evalfunc = ExecCompileExpr;
+		state->evalfunc_private = cstate;
+	}
+
+	llvm_leave_fatal_on_oom();
+
+	return true;
+}
+
+/*
+ * JIT compile expression.
+ */
+static Datum
+ExecCompileExpr(ExprState *state, ExprContext *econtext, bool *isNull)
+{
+	CompiledExprState *cstate = state->evalfunc_private;
+	LLVMJitContext *context = cstate->context;
+
 	LLVMBuilderRef b;
 	LLVMModuleRef mod;
 	LLVMValueRef eval_fn;
@@ -125,31 +174,16 @@ llvm_compile_expr(ExprState *state)
 
 	llvm_enter_fatal_on_oom();
 
-	/*
-	 * Right now we don't support compiling expressions without a parent, as
-	 * we need access to the EState.
-	 */
-	Assert(parent);
-
-	/* get or create JIT context */
-	if (parent->state->es_jit)
-		context = (LLVMJitContext *) parent->state->es_jit;
-	else
-	{
-		context = llvm_create_context(parent->state->es_jit_flags);
-		parent->state->es_jit = &context->base;
-	}
-
 	INSTR_TIME_SET_CURRENT(starttime);
 
 	mod = llvm_mutable_module(context);
 
 	b = LLVMCreateBuilder();
 
-	funcname = llvm_expand_funcname(context, "evalexpr");
+	cstate->funcname = llvm_expand_funcname(context, "evalexpr");
 
 	/* create function */
-	eval_fn = LLVMAddFunction(mod, funcname,
+	eval_fn = LLVMAddFunction(mod, cstate->funcname,
 			  llvm_pg_var_func_type("TypeExprStateEvalFunc"));
 	LLVMSetLinkage(eval_fn, LLVMExternalLinkage);
 	LLVMSetVisibility(eval_f

Re: Parallel Inserts in CREATE TABLE AS

2020-11-26 Thread Luc Vlaming

On 25-11-2020 03:40, Bharath Rupireddy wrote:

On Tue, Nov 24, 2020 at 4:43 PM Hou, Zhijie  wrote:


I'm very interested in this feature,
and I'm looking at the patch, here are some comments.



Thanks for the review.



How about the following style:

 if (TupIsNull(outerTupleSlot))
 break;

 (void) node->ps.dest->receiveSlot(outerTupleSlot, 
node->ps.dest);
 node->ps.state->es_processed++;

Which looks cleaner.



Done.



The check can be replaced by ISCTAS(into).



Done.



'inerst' looks like a typo (insert).



Corrected.



The code here call strlen(intoclausestr) for two times,
After checking the existing code in ExecInitParallelPlan,
It used to store the strlen in a variable.

So how about the following style:

 intoclause_len = strlen(intoclausestr);
 ...
 /* Store serialized intoclause. */
 intoclause_space = shm_toc_allocate(pcxt->toc, intoclause_len + 1);
 memcpy(shmptr, intoclausestr, intoclause_len + 1);
 shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space);



Done.



The two check about intoclausestr seems can be combined like:

if (intoclausestr != NULL)
{
...
}
else
{
...
}



Done.

Attaching v5 patch. Please consider it for further review.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Disclaimer: I have by no means thoroughly reviewed all the involved parts 
and am probably missing quite a bit of context so if I understood parts 
wrong or they have been discussed before then I'm sorry. Most notably 
the whole situation about the command-id is still elusive for me and I 
can really not judge yet anything related to that.


IMHO The patch makes that we now have the gather do most of the CTAS 
work, which seems unwanted. For the non-ctas insert/update case it seems 
that a modifytable node exists to actually do the work. What I'm 
wondering is if it is maybe not better to introduce a CreateTable node 
as well?

This would have several merits:
- the rowcount of that node would be 0 for the parallel case, and 
non-zero for the serial case. Then the gather node and the Query struct 
don't have to know about CTAS for the most part, removing e.g. the case 
distinctions in cost_gather.
- the inserted rows can now be accounted in this new node instead of the 
parallel executor state, and this node can also do its own DSM 
initializations
- the generation of a partial variants of the CreateTable node can now 
be done in the optimizer instead of the ExecCreateTableAs which IMHO is 
a more logical place to make these kind of decisions. which then also 
makes it potentially play nicer with costs and the like.
- the explain code can now be in its own place instead of part of the 
gather node
- IIUC it would allow the removal of the code to only launch parallel 
workers if its not CTAS, which IMHO would be quite a big benefit.


Thoughts?

Some small things I noticed while going through the patch:
- Typo for the comment about "inintorel_startup" which should be 
intorel_startup
-   if (node->nworkers_launched == 0 && !node->need_to_scan_locally)
  can be changed into
  if (node->nworkers_launched == 0)
  because either way it'll be true.

Regards,
Luc
Swarm64




Re: Parallel plans and "union all" subquery

2020-11-26 Thread Luc Vlaming

On 27-11-2020 04:14, Greg Nancarrow wrote:

On Thu, Nov 26, 2020 at 6:11 PM Luc Vlaming  wrote:


If interesting I can make a draft of what this would look like if this
makes it easier to discuss?



Sure, that would help clarify it.
Okay. I will try to build an example but this will take a few weeks as 
vacations and such are coming up too.




I did debug this a bit, but it seems my gut feeling was wrong, even
though it knows a type coercion is required and can be done, the
parse/analyze code doesn't actually modify the nodes in place "for
fear of changing the semantics", so when the types don't exactly match
it's all left up to the planner, but for this parse tree it fails to
produce a parallel plan.



Yes. However I think here also lies an opportunity, because to me it 
seems much more appealing to have the planner being able to deal 
correctly with all the situations rather than having things like 
flatten_simple_union_all() that provide a solution for the ideal case.



Regards,
Greg Nancarrow
Fujitsu Australia



Regards,
Luc
Swarm64




Re: Multi Inserts in CREATE TABLE AS - revived patch

2020-11-26 Thread Luc Vlaming

On 26-11-2020 14:45, Bharath Rupireddy wrote:

On Thu, Nov 26, 2020 at 5:34 PM Luc Vlaming  wrote:


On 26-11-2020 12:36, Bharath Rupireddy wrote:

Few things:

IIUC Andres mentioned similar kinds of APIs earlier in [1].

[1] -
https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3fxxx%40alap3.anarazel.de

I would like to add some more info to one of the API:

typedef struct MultiInsertStateData
{
  MemoryContext micontext; /* A temporary memory context for
multi insert. */
  BulkInsertStateData *bistate;   /* Bulk insert state. */
  TupleTableSlot  **mislots; /* Array of buffered slots. */
  uint32  nslots; /* Total number of buffered slots. */
  int64  nbytes; /* Flush buffers if the total tuple size
  >= nbytes. */
  int32  nused; /* Number of current buffered slots for a
multi insert batch. */
  int64  nsize; /* Total tuple size for a multi insert
batch. */
} MultiInsertStateData;

/* Creates a temporary memory context, allocates the
MultiInsertStateData, BulkInsertStateData and initializes other members. */
  void(*begin_multi_insert) (Relation rel,
MultiInsertStateData **mistate, uint32 nslots, uint64 nbytes);

/* Buffers the input slot into mistate slots, computes the size of the
tuple, and adds it total buffer tuple size, if this size crosses
mistate->nbytes, flush the buffered tuples into table. For heapam,
existing heap_multi_insert can be used. Once the buffer is flushed, then
the micontext can be reset and buffered slots can be cleared. *If nbytes
i.e. total tuple size of the batch is not given, tuple size is not
calculated, tuples are buffered until all the nslots are filled and then
flushed.* */
  void(*do_multi_insert) (Relation rel, MultiInsertStateData
*mistate, struct TupleTableSlot *slot, CommandId cid, int options);

/* Flush the buffered tuples if any. For heapam, existing
heap_multi_insert can be used. Deletes temporary memory context and
deallocates mistate. */
  void(*end_multi_insert) (Relation rel, MultiInsertStateData
*mistate, CommandId cid, int options);


Looks all good to me, except for the nbytes part.
Could you explain to me what use case that supports? IMHO the tableam
can best decide itself that its time to flush, based on its
implementation that e.g. considers how many pages to flush at a time and
such, etc? This means also that most of the fields of
MultiInsertStateData can be private as each tableam would return a
derivative of that struct (like with the destreceivers).



nbytes is basically to support the following case, say the number of
tuples to buffer is 1000, and if all the tuples are toasted with size
in few hundred MB or even GB, then do we want to wait until 1000
tuples are buffered in which case we occupy for one query 1000*toasted
tuple size in GB. So, if we have a memory limit, then it will give
flexibility. Whether to use it or not is up to the table AM
implementation. And also that existing copy code(since it can know the
tuple size after parsing input data) uses this mechanism to decide
when to flush.

If the nbytes is not used in a table am, then the multi insert can
wait until the total tuples, how much ever large memory they occupy,
are buffered.

IMO, we can retain nbytes for now to decide on when to flush. Thoughts?
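A simplified sketch of that flush policy (hypothetical struct and names, not the proposed tableam API itself): tuples are buffered until either the slot capacity or the nbytes cap is reached, so a batch of large toasted tuples cannot pin gigabytes of memory before a flush happens.

```c
#include <assert.h>

typedef struct MultiInsertSketch
{
	int			nslots;			/* capacity in tuples */
	long		nbytes;			/* flush threshold in bytes; 0 = no cap */
	int			nused;			/* currently buffered tuples */
	long		nsize;			/* currently buffered bytes */
	int			flushes;		/* number of flushes performed */
} MultiInsertSketch;

/* Stands in for writing the buffered tuples into the table. */
static void
flush_buffer(MultiInsertSketch *s)
{
	s->flushes++;
	s->nused = 0;
	s->nsize = 0;
}

/* Core of a do_multi_insert()-style entry point: buffer, then
 * flush when either limit is hit. */
static void
buffer_tuple(MultiInsertSketch *s, long tuple_size)
{
	s->nused++;
	s->nsize += tuple_size;
	if (s->nused >= s->nslots ||
		(s->nbytes > 0 && s->nsize >= s->nbytes))
		flush_buffer(s);
}
```

Leaving nbytes at 0 recovers the slots-only behaviour, which is the "up to the table AM" flexibility described above.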


I'm very sorry I had not realized at all that the toasted data would be 
kept in memory until written out. I guess I'm not familiar enough with 
that part yet. I assumed this would be toasted beforehand and be tableam 
agnostic, and that any decision from the tableam to flush would happen 
way before a lot memory would have accumulated, which is a bit naive in 
hindsight.




I wonder, how can the do_multi_insert() API decide on when to flush, I
mean, based on the number of pages to flush? Do we need to pass the
maximum number of pages the buffered tuples can occupy and track the
pages currently buffered tuples occupy to decide when to flush? Or is
it something that the existing table AM infrastructure already
supports? If we use the number of pages to decide on when to flush,
how well it works with parallel inserts?



I was assuming each tableam to use its own logic, based on its needs and 
the tradeoffs a storage engine might want to provide. This does not mean 
it should not consider outside parameters, like the aforementioned 
memory usage.
I think it would imply that each tableam implements its own tracking 
mechanism for how much has accumulated, how, and when to flush, because 
they might track different statistics. IMHO given that each tableam 
anyway would want to implement its own logic on how to store a slot into 
a page, tracking the logic for tracking these statistics seemed minor to 
me. Maybe I missed some parts that should be extracted out to a generic 
interface however?


S

Re: Multi Inserts in CREATE TABLE AS - revived patch

2020-11-26 Thread Luc Vlaming

On 26-11-2020 12:36, Bharath Rupireddy wrote:

Few things:

IIUC Andres mentioned similar kinds of APIs earlier in [1].

[1] - 
https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3fxxx%40alap3.anarazel.de 



I would like to add some more info to one of the API:

typedef struct MultiInsertStateData
{
     MemoryContext         micontext; /* A temporary memory context for 
multi insert. */

     BulkInsertStateData *bistate;   /* Bulk insert state. */
     TupleTableSlot      **mislots; /* Array of buffered slots. */
     uint32              nslots; /* Total number of buffered slots. */
     int64              nbytes; /* Flush buffers if the total tuple size 
 >= nbytes. */
     int32              nused; /* Number of current buffered slots for a 
multi insert batch. */
     int64              nsize; /* Total tuple size for a multi insert 
batch. */

} MultiInsertStateData;

/* Creates a temporary memory context, allocates the 
MultiInsertStateData, BulkInsertStateData and initializes other members. */
     void        (*begin_multi_insert) (Relation rel, 
MultiInsertStateData **mistate, uint32 nslots, uint64 nbytes);


/* Buffers the input slot into mistate slots, computes the size of the 
tuple, and adds it total buffer tuple size, if this size crosses 
mistate->nbytes, flush the buffered tuples into table. For heapam, 
existing heap_multi_insert can be used. Once the buffer is flushed, then 
the micontext can be reset and buffered slots can be cleared. *If nbytes 
i.e. total tuple size of the batch is not given, tuple size is not 
calculated, tuples are buffered until all the nslots are filled and then 
flushed.* */
     void        (*do_multi_insert) (Relation rel, MultiInsertStateData 
*mistate, struct TupleTableSlot *slot, CommandId cid, int options);


/* Flush the buffered tuples if any. For heapam, existing 
heap_multi_insert can be used. Deletes temporary memory context and 
deallocates mistate. */
     void        (*end_multi_insert) (Relation rel, MultiInsertStateData 
*mistate, CommandId cid, int options);


With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com 


Looks all good to me, except for the nbytes part.
Could you explain to me what use case that supports? IMHO the tableam 
can best decide itself that its time to flush, based on its 
implementation that e.g. considers how many pages to flush at a time and 
such, etc? This means also that most of the fields of 
MultiInsertStateData can be private as each tableam would return a 
derivative of that struct (like with the destreceivers).


One thing I'm wondering is in which memory context the slots end up 
being allocated. I'd assume we would want to keep the slots around 
between flushes. If they are in the temporary context this might prove 
problematic however?


Regards,
Luc




Re: Parallel plans and "union all" subquery

2020-11-25 Thread Luc Vlaming

On 25-11-2020 14:54, Greg Nancarrow wrote:



On Wed, Nov 25, 2020 at 6:43 PM Luc Vlaming  wrote:

 >
 >
 > You're completely right, sorry for my error. I was too quick on assuming
 > my patch would work for this specific case too; I should have tested
 > that before replying. It looked very similar but turns out to not work
 > because of the upper rel not being considered parallel.
 >
 > I would like to extend my patch to support this, or create a second
 > patch. This would however be significantly more involved because it
 > would require that we (always?) consider two paths whenever we process a
 > subquery: the best parallel plan and the best serial plan. Before I
 > embark on such a journey I would like some input on whether this would
 > be a very bad idea. Thoughts?
 >

Hi,

I must admit, your intended approach isn't what immediately came to mind 
when I saw this issue. Have you analyzed and debugged this to know 
exactly what is going on?
I haven't had time to debug this and see exactly where the code paths 
diverge for the use of "values(1)" verses "values(1::numeric)" in this 
case, but that would be one of the first steps.


What I wondered (and I may well be wrong) was how come the documented 
type resolution algorithm 
(https://www.postgresql.org/docs/13/typeconv-union-case.html) doesn't 
seem to be working quite right here, at least to the point of creating 
the same/similar parse tree as when I change "values(1)" to 
"values(1::numeric)" or even just "values(1.)"? So shouldn't then  the 
use of "values(1)" in this case (a constant, convertible to numeric - 
the preferred type ) result in the same (parallel) plan as when 
"values(1::numeric)" is used? Perhaps this isn't happening because the 
code is treating these as generalised expressions when their types 
aren't the same, and this then affects parsing/planning?
My natural thought was that there seems to be a minor issue in the code, 
which should be reasonably easy to fix, at least for this fairly simple 
case.


However, I claim no expertise in the area of parser/analyzer/planner, I 
only know certain areas of that code, but enough to appreciate it is 
complex and intricate, and easily broken.
Perhaps one of the major contributors to this area of the code, who 
probably know this code very well, like maybe Tom Lane or Robert Haas 
(to name two) might like to comment on whether what we're looking at is 
indeed a bug/deficiency and worth fixing, and whether Luc is correct in 
his expressed approach on what would be required to fix it?


Regards,
Greg Nancarrow
Fujitsu Australia


So from what I recall from building the patch is that the difference is 
that when all types are identical, then flatten_simple_union_all simply 
flattens all union-all operations into an append relation.
If you don't have identical types then the situation has to be handled 
by the code in prepunion.c which doesn't always keep a parallel path 
around. The patch I had posted fixes this for a relatively simple issue 
and not the case described here.
If interesting I can make a draft of what this would look like if this 
makes it easier to discuss?


Regards,
Luc
Swarm64




Re: Multi Inserts in CREATE TABLE AS - revived patch

2020-11-25 Thread Luc Vlaming

On 26-11-2020 07:31, Bharath Rupireddy wrote:

On Thu, Nov 26, 2020 at 9:55 AM Michael Paquier  wrote:


+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+   Size sz = 0;
+   HeapTuple tuple = NULL;
+
+   if (TTS_IS_HEAPTUPLE(slot))
+   tuple = ((HeapTupleTableSlot *) slot)->tuple;
+   else if(TTS_IS_BUFFERTUPLE(slot))
+   tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+   else if(TTS_IS_MINIMALTUPLE(slot))
+   tuple = ((MinimalTupleTableSlot *) slot)->tuple;

There have been various talks about the methods we could use to
evaluate the threshold in bytes when evaluating that a flush can
happen, including the use of memory contexts, or even estimate the
size of the number of tuples.  This one looks promising because it
seems exact, however for virtual slots I don't like much the fact that
you basically just extracted the parts of tts_virtual_materialize()
and stuck them in this routine.  That's a recipe for future bugs if
the materialization logic changes.  In short, I am surprised that this
calculation is not directly part of TupleTableSlotOps.  What we'd want
is to get this information depending on the slot type dealt with, and
with your patch you would miss to handle any new slot type
introduced.



Yes for virtual slots, I reused the code from
tts_virtual_materialize() in GetTupleSize(). I can think of below
options:

1) Make the size calculation code for virtual slots, a macro or a
static inline function and use that in tts_virtual_materialize() and
GetTupleSize().
2) Add comments in both the places, such as "if any code is changed
here, consider changing it in tts_virtual_materialize() /
GetTupleSize()"
3) Add a size variable to TupleTableSlotOps structure.
4) Add a new API to TupleTableSlotOps structure say get_slot_size().
5) For new slot types, maybe we can have comments in tuptable.h to
consider having equivalent change in GetTupleSize().

If we go with 3 and 4, will it be acceptable to add the extra code in
generic structure which gets used in most of the code base and use
that new code only in limited places (for multi inserts in CTAS and
Refresh Mat View)? I think we can go ahead with 2 and 5. Thoughts?
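For options 3/4, a minimal sketch of what a per-slot-type size callback could look like: each slot's ops vtable carries its own sizing function, so GetTupleSize() dispatches instead of switch-ing on slot kinds, and a newly introduced slot type cannot silently be missed. All names here are hypothetical; the real TupleTableSlotOps differs.

```c
#include <assert.h>
#include <stddef.h>

typedef struct SlotSketch SlotSketch;

typedef struct SlotOpsSketch
{
	size_t		(*get_slot_size) (SlotSketch *slot);
} SlotOpsSketch;

struct SlotSketch
{
	const SlotOpsSketch *ops;
	size_t		datalen;		/* stand-in for the materialized tuple */
};

static size_t
heap_slot_size(SlotSketch *slot)
{
	return slot->datalen;		/* heap tuples know their length */
}

static size_t
virtual_slot_size(SlotSketch *slot)
{
	/* a virtual slot would sum per-attribute sizes; fake a header here */
	return slot->datalen + 16;
}

static const SlotOpsSketch heap_ops = {heap_slot_size};
static const SlotOpsSketch virtual_ops = {virtual_slot_size};

/* GetTupleSize() reduced to a single dispatch. */
static size_t
get_tuple_size(SlotSketch *slot)
{
	return slot->ops->get_slot_size(slot);
}
```

The virtual-slot callback would then own the sizing logic currently duplicated from tts_virtual_materialize(), addressing Michael's concern about the two copies drifting apart.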

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



What I'm wondering about is the reason for wanting a cap on data volume. 
When doing some local (highly concurrent) ingest speed tests a few weeks 
ago it seemed to mostly matter how many pages were being written and the 
resulting pressure on locks, etc. and not necessarily so much the actual 
memory usage. I didn't collect proof on that though (yet). There was 
however a very clearly observable contention point where with bigger 
buffers the performance would not only stagnate but actually drop.


So what I'm kinda wondering is if we should worry more about the amount 
of pages that are going to be written and maybe not so much about the 
memory usage?


If this were to be the case then maybe we can consider improving the 
current design, potentially in a follow-up patch? The problem I see is 
that generically each tableam will have different choices to make on how 
to buffer and flush multiple rows, given that a storage engine might 
have more or less write amplification, a different way of extending a 
relation, fsm use, etc.

Assuming we indeed want a per-tableam implementation, we could either:
- make multi_insert buffer the tuples itself and add a flush_multi_insert.
- add a new function called create_multi_insert which returns something 
like a MultiInsertState, which, like a destreceiver, has a set of 
callbacks to start, shutdown and insert.


With both solutions one part that to me seems appealing is that we 
buffer the data in something that likely resembles the disk format very 
much. Thoughts?
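
As a sketch of the second option, the state-with-callbacks variant could have roughly the following shape. All names here are illustrative, not actual PostgreSQL declarations; a toy table AM that buffers a handful of tuples per flush stands in for a real storage engine:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical shape of the proposed per-tableam multi-insert API.
 * Like a DestReceiver, the table AM hands back a state object with
 * callbacks, so each AM can choose its own buffering/flush policy. */
typedef struct MultiInsertState MultiInsertState;

typedef struct MultiInsertCallbacks
{
	void	(*startup)(MultiInsertState *state);
	void	(*insert)(MultiInsertState *state, void *slot);
	void	(*shutdown)(MultiInsertState *state);
} MultiInsertCallbacks;

struct MultiInsertState
{
	const MultiInsertCallbacks *cb;
	void	   *am_private;		/* AM-chosen buffer, e.g. near-disk-format pages */
};

/* Toy "AM": buffers 4 tuples, then "flushes" them. */
typedef struct ToyAmState
{
	int		buffered;
	int		flushed;
} ToyAmState;

static void
toy_startup(MultiInsertState *state)
{
	ToyAmState *am = state->am_private;

	am->buffered = am->flushed = 0;
}

static void
toy_insert(MultiInsertState *state, void *slot)
{
	ToyAmState *am = state->am_private;

	(void) slot;				/* a real AM would copy the tuple here */
	if (++am->buffered == 4)
	{
		am->flushed += am->buffered;	/* write out the buffered page(s) */
		am->buffered = 0;
	}
}

static void
toy_shutdown(MultiInsertState *state)
{
	ToyAmState *am = state->am_private;

	am->flushed += am->buffered;		/* flush the remainder */
	am->buffered = 0;
}

static const MultiInsertCallbacks toy_callbacks = {
	toy_startup, toy_insert, toy_shutdown
};
```

The appeal is that the flush policy (how many tuples, how many pages) lives entirely inside the AM.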


Regards,
Luc
Swarm64




Re: Multi Inserts in CREATE TABLE AS - revived patch

2020-11-25 Thread Luc Vlaming

On 23-11-2020 11:23, Bharath Rupireddy wrote:

On Mon, Nov 23, 2020 at 3:26 PM Heikki Linnakangas  wrote:


On 23/11/2020 11:15, Bharath Rupireddy wrote:

Attaching v2 patch, rebased on the latest master 17958972.


I just broke this again with commit c532d15ddd to split up copy.c.
Here's another rebased version.



Thanks! I noticed that and am about to post a new patch. Anyways,
thanks for the rebased v3 patch. Attaching here v3 again for
visibility.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Hi,

Thanks for reviving the patch! I did unfortunately have to shift my 
priorities somewhat and did not find much time to work on open source 
things the last week(s).


I'm wondering about the use of the GetTupleSize function. As far as I 
understand the idea is to limit the amount of buffered data, presumably 
to not write too much data at once for intorel_flush_multi_insert.
If I understood correctly how it all works, the table slot can however 
be of a different type than the source slot, which means that the call to 
CopySlot() potentially stores a different amount of data than computed 
by GetTupleSize(). Not sure if this is a big problem, as an estimate 
might be good enough?


Some other solutions/implementations would be:
- compute the size after doing CopySlot. Maybe the relation never wants 
a virtual tuple and then you can also simplify GetTupleSize?
- after CopySlot ask for the memory consumed in the slot using 
MemoryContextMemAllocated.
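
A toy model of that second alternative (TrackedContext and ctx_alloc stand in for a backend MemoryContext and MemoryContextMemAllocated(); none of this is actual PostgreSQL API):

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Copy first, then ask the destination memory context how much it
 * allocated.  The accounting is then exact for any slot type, because
 * it measures the copy itself rather than pre-computing a size. */
typedef struct TrackedContext
{
	size_t	mem_allocated;
} TrackedContext;

static void *
ctx_alloc(TrackedContext *ctx, size_t sz)
{
	ctx->mem_allocated += sz;
	return malloc(sz);
}

/* Returns how many bytes the "materialized copy" actually consumed. */
static size_t
copy_and_measure(TrackedContext *ctx, const char *data, size_t len)
{
	size_t	before = ctx->mem_allocated;
	char   *copy = ctx_alloc(ctx, len);

	memcpy(copy, data, len);
	free(copy);					/* the real code keeps the tuple buffered */
	return ctx->mem_allocated - before;
}
```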


Some small things to maybe change are:
===
+   if (myState->mi_slots[myState->mi_slots_num] == NULL)
+   {
+   batchslot = table_slot_create(myState->rel, NULL);
+   myState->mi_slots[myState->mi_slots_num] = batchslot;
+   }
+   else
+   batchslot = myState->mi_slots[myState->mi_slots_num];

Alternative:
+   if (myState->mi_slots[myState->mi_slots_num] == NULL)
+			myState->mi_slots[myState->mi_slots_num] = 
table_slot_create(myState->rel, NULL);

+   batchslot = myState->mi_slots[myState->mi_slots_num];

==

+   sz = att_align_nominal(sz, att->attalign);
This could be moved out of the if statement?

==

Regards,
Luc
Swarm64




Re: Parallel plans and "union all" subquery

2020-11-24 Thread Luc Vlaming

On 24-11-2020 01:44, Greg Nancarrow wrote:

On Tue, Nov 24, 2020 at 2:34 AM Luc Vlaming  wrote:


Hi,

For this problem there is a patch I created, which is registered under
https://commitfest.postgresql.org/30/2787/ that should fix this without
any workarounds. Maybe someone can take a look at it?



I tried your patch with the latest PG source code (24/11), but
unfortunately a non-parallel plan was still produced in this case.

test=# explain
select count(*)
from (select
n1
from drop_me
union all
values(1)) ua;
QUERY PLAN

  Aggregate  (cost=1889383.54..1889383.55 rows=1 width=8)
->  Append  (cost=0.00..1362834.03 rows=42123961 width=32)
  ->  Seq Scan on drop_me  (cost=0.00..730974.60 rows=42123960 width=32)
  ->  Subquery Scan on "*SELECT* 2"  (cost=0.00..0.02 rows=1 width=32)
->  Result  (cost=0.00..0.01 rows=1 width=4)
(5 rows)


That's not to say your patch doesn't have merit - but maybe just not a
fix for this particular case.

As before, if the SQL is tweaked to align the types for the UNION, you
get a parallel plan:

test=# explain
select count(*)
from (select
n1
from drop_me
union all
values(1::numeric)) ua;
  QUERY PLAN

  Finalize Aggregate  (cost=821152.71..821152.72 rows=1 width=8)
->  Gather  (cost=821152.50..821152.71 rows=2 width=8)
  Workers Planned: 2
  ->  Partial Aggregate  (cost=820152.50..820152.51 rows=1 width=8)
->  Parallel Append  (cost=0.00..747235.71 rows=29166714 
width=0)
  ->  Result  (cost=0.00..0.01 rows=1 width=0)
  ->  Parallel Seq Scan on drop_me
(cost=0.00..601402.13 rows=29166713 width=0)
(7 rows)


Regards,
Greg Nancarrow
Fujitsu Australia



Hi,

You're completely right, sorry for my error. I was too quick to assume 
my patch would work for this specific case too; I should have tested 
that before replying. It looked very similar but turns out not to work 
because the upper rel is not being considered parallel.


I would like to extend my patch to support this, or create a second 
patch. This would however be significantly more involved, because it 
would require that we (always?) consider two paths whenever we process a 
subquery: the best parallel plan and the best serial plan. Before I 
embark on such a journey I would like some input on whether this would 
be a very bad idea. Thoughts?


Regards,
Luc
Swarm64




Re: Parallel plans and "union all" subquery

2020-11-23 Thread Luc Vlaming

On 23-11-2020 13:17, Phil Florent wrote:

Hi Greg,

The implicit conversion was the cause of the non parallel plan, thanks 
for the explanation and the workarounds. It can cause a huge difference 
in terms of performance, I will give the information to our developers.


Regards,

Phil




*De :* Greg Nancarrow 
*Envoyé :* lundi 23 novembre 2020 06:04
*À :* Phil Florent 
*Cc :* pgsql-hackers@lists.postgresql.org 


*Objet :* Re: Parallel plans and "union all" subquery
On Sun, Nov 22, 2020 at 11:51 PM Phil Florent  
wrote:



Hi,


I have a question about parallel plans. I also posted it on the general list, 
but perhaps it's a question for hackers. Here is my test case:


explain
select count(*)
from (select
n1
from drop_me
union all
values(1)) ua;


QUERY PLAN

Aggregate (cost=2934739.24..2934739.25 rows=1 width=8)
-> Append (cost=0.00..2059737.83 rows=7113 width=32)
-> Seq Scan on drop_me (cost=0.00..1009736.12 rows=7112 width=6)
-> Subquery Scan on "*SELECT* 2" (cost=0.00..0.02 rows=1 width=32)
-> Result (cost=0.00..0.01 rows=1 width=4)
JIT:
Functions: 4
Options: Inlining true, Optimization true, Expressions true, Deforming true


No parallel plan, 2s6


I read the documentation but I don't get the reason of the "noparallel" seq 
scan of drop_me in the last case ?



Without debugging this, it looks to me that the UNION type resolution
isn't working as well as it possibly could in this case, for the
generation of a parallel plan. I found that with a minor tweak to your
SQL, either for the table creation or query, it will produce a
parallel plan.

Noting that currently you're creating the drop_me table with a
"numeric" column, you can either:

(1) Change the table creation

FROM:
     create unlogged table drop_me as select generate_series(1,7e7) n1;
TO:
     create unlogged table drop_me as select generate_series(1,7e7)::int n1;


OR


(2) Change the query

FROM:
     explain
     select count(*)
     from (select
     n1
     from drop_me
     union all
     values(1)) ua;

TO:

     explain
     select count(*)
     from (select
     n1
     from drop_me
     union all
     values(1::numeric)) ua;


     QUERY PLAN

  Finalize Aggregate  (cost=821152.71..821152.72 rows=1 width=8)
    ->  Gather  (cost=821152.50..821152.71 rows=2 width=8)
  Workers Planned: 2
  ->  Partial Aggregate  (cost=820152.50..820152.51 rows=1 width=8)
    ->  Parallel Append  (cost=0.00..747235.71 rows=29166714 
width=0)

  ->  Result  (cost=0.00..0.01 rows=1 width=0)
  ->  Parallel Seq Scan on drop_me
(cost=0.00..601402.13 rows=29166713 width=0)
(7 rows)


Regards,
Greg Nancarrow
Fujitsu Australia


Hi,

For this problem there is a patch I created, which is registered under 
https://commitfest.postgresql.org/30/2787/ that should fix this without 
any workarounds. Maybe someone can take a look at it?


Regards,
Luc
Swarm64




Re: should INSERT SELECT use a BulkInsertState?

2020-11-01 Thread Luc Vlaming

On 30.10.20 05:51, Justin Pryzby wrote:

On Thu, Oct 22, 2020 at 01:29:53PM +0100, Simon Riggs wrote:

On Fri, 16 Oct 2020 at 22:05, Justin Pryzby  wrote:


I made this conditional on BEGIN BULK/SET bulk, so I'll solicit comments on 
that.


I think it would be better if this was self-tuning. So that we don't
allocate a bulkinsert state until we've done say 100 (?) rows
inserted.


I made it an optional, non-default behavior in response to the legitimate
concern for performance regression for the cases where a loader needs to be as
fast as possible - as compared with our case, where we want instead to optimize
for our reports by making the loaders responsible for their own writes, rather
than leaving behind many dirty pages, and clobbering the cache, too.

Also, INSERT SELECT doesn't immediately help us (telsasoft), since we use
INSERT .. VALUES () .. ON CONFLICT.  This would handle that case, which is
great, even though that wasn't a design goal.  It could also be an integer GUC
to allow configuring the size of the ring buffer.


You should also use table_multi_insert() since that will give further
performance gains by reducing block access overheads. Switching from
single row to multi-row should also only happen once we've loaded a
few rows, so we don't introduce overheads for smaller SQL statements.


Good idea...multi_insert (which reduces the overhead of individual inserts) is
mostly independent from BulkInsert state (which uses a ring-buffer to avoid
dirtying the cache).  I made this 0002.

This makes INSERT SELECT several times faster, and not clobber the cache too.

Time: 4700.606 ms (00:04.701)
123 |  1
 37 |  2
 20 |  3
 11 |  4
   4537 |  5
  11656 |

Time: 1125.302 ms (00:01.125)
   2171 |  1
 37 |  2
 20 |  3
 11 |  4
111 |  5
  14034 |

When enabled, this passes nearly all regression tests, and all but 2 of the
changes are easily understood.  The 2nd patch still needs work.



Hi,

Came across this thread because I'm working on an improvement for 
relation extension to improve the speed of bulk inserts themselves in 
(highly) parallel cases, and I would like to make sure that our approaches 
work nicely together.


Given what I've seen and tried so far with various benchmarks I would 
also really like to see a different approach here. The "BEGIN BULK" can 
be problematic for example if you mix small amounts of inserts and big 
amounts in the same transaction, or if your application possibly does a 
bulk insert but otherwise mostly OLTP transactions.


To me the idea from Simon sounds good to only use a bulk insert state 
after inserting e.g. a 1000 rows, and this also seems more applicable to 
most applications compared to requiring a change to any application that 
wishes to have faster ingest.
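
A standalone model of that self-tuning switch might look like the following; BULK_THRESHOLD and InsertState are illustrative, not PostgreSQL code, and the backend equivalent would call GetBulkInsertState() at the switch point:

```c
#include <assert.h>
#include <stdbool.h>

/* Stay on the regular insert path for small statements and only set up
 * a ring-buffered BulkInsertState once a threshold of rows has been
 * inserted in the current statement. */
#define BULK_THRESHOLD 1000

typedef struct InsertState
{
	long	ntuples;
	bool	bulk_mode;		/* stands in for holding a BulkInsertState */
} InsertState;

static void
insert_one_row(InsertState *state)
{
	state->ntuples++;
	if (!state->bulk_mode && state->ntuples >= BULK_THRESHOLD)
		state->bulk_mode = true;	/* allocate the ring buffer here */
	/* ... perform the actual insert with or without the bulk state ... */
}
```

Small OLTP statements then never pay for the ring buffer, while genuine bulk loads switch over automatically.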


Another approach could be to combine this, for example, with a few extra 
requirements to limit the amount of regressions and first learn more how 
this behaves in the field.

We could, for example (just throwing out some ideas), require that:
- the relation has a certain size
- a BufferStrategy of at most a certain size is used
- there is a certain number of lock waiters on relation extension (like 
we do with bulk extend)
- we have extended the relation by at least e.g. 4 MB without using the 
FSM, thereby proving that we are doing bulk operations instead of 
random small extensions everywhere into the relation that use the FSM.


Another option is that we first improve the bulk operation 
facilities in general and then have another shot at this. I'm not sure if 
there is a benchmark / query that shows where such a 10x slowdown 
would appear, but that might be worth a look as well.


Regards,
Luc




Re: allow partial union-all and improve parallel subquery costing

2020-10-22 Thread Luc Vlaming

On 14.10.20 09:38, Luc Vlaming wrote:

Hi,

It seems I ran the wrong make checks to verify everything is correct (make 
check instead
of make installcheck-world) and this uncovered another regress test change. I 
also noticed
the statistics are sometimes giving different row count results so I increased 
the row
statistics target to make sure the regress output is stable. Updated patch 
attached which
now successfully runs installcheck-world for v13 and master.

Kind regards,
Luc


From: Luc Vlaming 
Sent: Tuesday, October 13, 2020 10:57 AM
To: pgsql-hackers
Subject: allow partial union-all and improve parallel subquery costing

Hi,

While developing some improvements for TPC-DS queries I found out that with
UNION ALL partial paths are not emitted. Whilst fixing that I also came across
the subquery costing which does not seem to consider parallelism when doing
the costing.
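
For illustration, here is a standalone model of that adjustment. The leader-contribution heuristic mirrors my reading of the planner's get_parallel_divisor(); treat the constants as illustrative:

```c
#include <assert.h>

/* The patch divides a subquery scan's row count and CPU run cost by a
 * parallel divisor, as cost_seqscan() already does for sequential
 * scans.  The divisor counts the workers plus a partial contribution
 * from the leader, which shrinks as more workers are planned. */
static double
parallel_divisor(int workers)
{
	double	divisor = workers;
	double	leader_contribution = 1.0 - 0.3 * workers;

	if (leader_contribution > 0)
		divisor += leader_contribution;
	return divisor;
}

/* The CPU cost is divided among all the workers (the planner would
 * additionally clamp the per-worker row estimate). */
static double
adjusted_run_cost(double run_cost, int workers)
{
	return run_cost / parallel_divisor(workers);
}
```

With 2 planned workers the divisor comes out around 2.4, so per-worker rows and run cost drop to well under half of the serial estimate.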

I added a simplified testcase in pg-regress to show this goes wrong, and
attached also a before and after explain output of tpc-ds SF100 query 5
based on version 12.4.

I hope I followed all etiquette and these kind of improvements are welcome.

Kind regards,
Luc
Swarm64



Hi,

Created a commitfest entry assuming this is the right thing to do so 
that someone can potentially pick it up during the commitfest.


Kind regards,
Luc
Swarm64




Re: Parallel Inserts in CREATE TABLE AS

2020-10-16 Thread Luc Vlaming

On 16.10.20 08:23, Bharath Rupireddy wrote:

On Fri, Oct 16, 2020 at 11:33 AM Luc Vlaming  wrote:


Really looking forward to this ending up in postgres as I think it's a
very nice improvement.

Whilst reviewing your patch I was wondering: is there a reason you did
not introduce a batch insert in the destreceiver for the CTAS? For me
this makes a huge difference in ingest speed, as otherwise the inserts do
not really scale so well and lock contention starts to be a big problem.
If you like I can make a patch to introduce this on top?



Thanks for your interest. You are right, we can get maximum
improvement if we have multi inserts in destreceiver for the CTAS on
the similar lines to COPY FROM command. I specified this point in my
first mail [1]. You may want to take a look at an already existing
patch [2] for multi inserts, I think there are some review comments to
be addressed in that patch. I would love to see the multi insert patch
getting revived.

[1] - 
https://www.postgresql.org/message-id/CALj2ACWFq6Z4_jd9RPByURB8-Y8wccQWzLf%2B0-Jg%2BKYT7ZO-Ug%40mail.gmail.com
[2] - 
https://www.postgresql.org/message-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1%3DN1AC-Fw%40mail.gmail.com

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Sorry had not seen that pointer in your first email.

I'll first finish some other patches I'm working on and then I'll try to 
revive that patch. Thanks for the pointers.


Kind regards,
Luc
Swarm64




Re: Parallel Inserts in CREATE TABLE AS

2020-10-16 Thread Luc Vlaming

On 14.10.20 11:16, Bharath Rupireddy wrote:

On Tue, Oct 6, 2020 at 10:58 AM Amit Kapila  wrote:



Yes we do a bunch of catalog changes related to the created new table.
We will have both the txn id and command id assigned when catalogue
changes are being made. But, right after the table is created in the
leader, the command id is incremented (CommandCounterIncrement() is
called from create_ctas_internal()) whereas the txn id remains the
same. The new command id is marked as GetCurrentCommandId(true); in
intorel_startup, then the parallel mode is entered. The txn id and
command id are serialized into parallel DSM, they are then available
to all parallel workers. This is discussed in [1].

Few changes I have to make in the parallel worker code: set
currentCommandIdUsed = true;, may be via a common API
SetCurrentCommandIdUsedForWorker() proposed in [1] and remove the
extra command id sharing from the leader to workers.

I will add a few comments in the upcoming patch related to the above info.



Yes, that would be good.



Added comments.




But how does that work for SELECT INTO? Are you prohibiting
that? ...



In case of SELECT INTO, a new table gets created and I'm not
prohibiting the parallel inserts and I think we don't need to.



So, in this case, also do we ensure that table is created before we
launch the workers. If so, I think you can explain in comments about
it and what you need to do that to ensure the same.



For SELECT INTO, the table gets created by the leader in
create_ctas_internal(), then ExecInitParallelPlan() gets called which
launches the workers and then the leader(if asked to do so) and the
workers insert the rows. So, we don't need to do any extra work to
ensure the table gets created before the workers start inserting
tuples.



While skimming through the patch, a small thing I noticed:
+ /*
+ * SELECT part of the CTAS is parallelizable, so we can make
+ * each parallel worker insert the tuples that result from
+ * its execution into the target table.
+ */
+ if (!is_matview &&
+ IsA(plan->planTree, Gather))
+ ((DR_intorel *) dest)->is_parallel = true;
+

I am not sure at this stage if this is the best way to make CTAS as
parallel but if so, then probably you can expand the comments a bit to
say why you consider only Gather node (and that too when it is the
top-most node) and why not another parallel node like GatherMerge?



If somebody expects the order of the tuples coming from a GatherMerge
node of the select part in CTAS or SELECT INTO to be preserved while
inserting, then with parallelism allowed that may no longer hold, i.e.
the order of insertion of tuples may vary. I'm not quite sure if
someone would use ORDER BY in the select part of CTAS or SELECT INTO
in a real-world use case. Thoughts?



Right, for now, I think you can simply remove that check from the code
instead of just commenting it. We will see if there is a better
check/Assert we can add there.



Done.

I also worked on some of the open points I listed earlier in my mail.



3. Need to restrict parallel inserts, if CTAS tries to create temp/global 
tables as the workers will not have access to those tables.



Done.



Need to analyze whether to allow parallelism if CTAS has prepared statements or 
with no data.



For prepared statements, parallelism will not be picked, and so neither
will parallel insertion.
For the CTAS with-no-data case, the select part is not even planned,
so parallelism will not be picked either.



4. Need to stop unnecessary parallel shared state such as tuple queue being 
created and shared to workers.



Done.

I'm listing the things that are still pending.

1. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having
some textual info along with the Gather node? I'm not quite sure on
this point, any suggestions are welcome.
2. Addition of new test cases. Testing with more scenarios and
different data sets, sizes, tablespaces, select into. Analysis on the
2 mismatches in write_parallel.sql regression test.

Attaching v2 patch, thoughts and comments are welcome.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com



Hi,

Really looking forward to this ending up in postgres as I think it's a 
very nice improvement.


Whilst reviewing your patch I was wondering: is there a reason you did 
not introduce a batch insert in the destreceiver for the CTAS? For me 
this makes a huge difference in ingest speed, as otherwise the inserts do 
not really scale so well and lock contention starts to be a big problem. 
If you like I can make a patch to introduce this on top?


Kind regards,
Luc
Swarm64




Re: allow partial union-all and improve parallel subquery costing

2020-10-14 Thread Luc Vlaming
Hi,

It seems I ran the wrong make checks to verify everything is correct (make 
check instead
of make installcheck-world) and this uncovered another regress test change. I 
also noticed
the statistics are sometimes giving different row count results so I increased 
the row
statistics target to make sure the regress output is stable. Updated patch 
attached which
now successfully runs installcheck-world for v13 and master.

Kind regards,
Luc


From: Luc Vlaming 
Sent: Tuesday, October 13, 2020 10:57 AM
To: pgsql-hackers
Subject: allow partial union-all and improve parallel subquery costing

Hi,

While developing some improvements for TPC-DS queries I found out that with
UNION ALL partial paths are not emitted. Whilst fixing that I also came across
the subquery costing which does not seem to consider parallelism when doing
the costing.

I added a simplified testcase in pg-regress to show this goes wrong, and
attached also a before and after explain output of tpc-ds SF100 query 5
based on version 12.4.

I hope I followed all etiquette and these kind of improvements are welcome.

Kind regards,
Luc
Swarm64
From 622903bff2108cb15d8fcf92b327f5ef00d41daa Mon Sep 17 00:00:00 2001
From: Luc Vlaming 
Date: Wed, 14 Oct 2020 09:22:46 +0200
Subject: [PATCH v2] Allow partial UNION ALL; improve parallel subquery costing

---
 src/backend/optimizer/path/costsize.c     | 11 +++++++++++
 src/backend/optimizer/prep/prepunion.c    |  4 ++++
 .../regress/expected/incremental_sort.out | 10 ++++------
 src/test/regress/expected/union.out       | 52 ++++++++++++++++++++
 src/test/regress/sql/union.sql            | 37 ++++++++++++++
 5 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index f39e6a9f80..c2018b3401 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1318,6 +1318,17 @@ cost_subqueryscan(SubqueryScanPath *path, PlannerInfo *root,
 	startup_cost += path->path.pathtarget->cost.startup;
 	run_cost += path->path.pathtarget->cost.per_tuple * path->path.rows;
 
+	/* Adjust costing for parallelism, if used. */
+	if (path->path.parallel_workers > 0)
+	{
+		double		parallel_divisor = get_parallel_divisor(&path->path);
+
+		path->path.rows = clamp_row_est(path->path.rows / parallel_divisor);
+
+		/* The CPU cost is divided among all the workers. */
+		run_cost /= parallel_divisor;
+	}
+
 	path->path.startup_cost += startup_cost;
 	path->path.total_cost += startup_cost + run_cost;
 }
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 745f443e5c..4001eb87f3 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -678,6 +678,10 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 			   NIL, NULL,
 			   parallel_workers, enable_parallel_append,
 			   NIL, -1);
+
+		if (op->all && enable_parallel_append)
+			add_partial_path(result_rel, ppath);
+
 		ppath = (Path *)
 			create_gather_path(root, result_rel, ppath,
 			   result_rel->reltarget, NULL, NULL);
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index e376ea1276..c9596b8e12 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1459,13 +1459,11 @@ explain (costs off) select * from t union select * from t order by 1,3;
->  Unique
  ->  Sort
Sort Key: t.a, t.b, t.c
-   ->  Append
- ->  Gather
-   Workers Planned: 2
+   ->  Gather
+ Workers Planned: 2
+ ->  Parallel Append
->  Parallel Seq Scan on t
- ->  Gather
-   Workers Planned: 2
->  Parallel Seq Scan on t t_1
-(13 rows)
+(11 rows)
 
 drop table t;
diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out
index 6e72e92d80..cd553c9c0c 100644
--- a/src/test/regress/expected/union.out
+++ b/src/test/regress/expected/union.out
@@ -1052,3 +1052,55 @@ where (x = 0) or (q1 >= q2 and q1 <= q2);
  4567890123456789 |  4567890123456789 | 1
 (6 rows)
 
+-- Test handling of appendrel with different types which disables the path flattening and
+-- forces a subquery node. for the subquery node ensure the rowcounts are correct.
+create function check_estimated_rows(text) returns table (estimated int)
+language plpgsql as
+$$
+declare
+ln text;
+tmp text[];
+first_row bool := true;
+begin
+for ln in
+execute format('explain %s', $1)
+loop
+tmp := regexp_match(ln, 'rows=(\d*)');
+return query select tmp[1]::int;
+end loop;
+

Re: Use list_delete_xxxcell O(1) instead of list_delete_ptr O(N) in some places

2020-10-14 Thread Luc Vlaming
The following review has been posted through the commitfest application:
make installcheck-world:  tested, passed
Implements feature:   not tested
Spec compliant:   not tested
Documentation:not tested

Patch applies cleanly on master & 13 and installcheck-world runs on 13 & 
master. Seem to follow the new style of using more the expressive macro's for 
the list interface, so looks good to me.

The new status of this patch is: Ready for Committer


allow partial union-all and improve parallel subquery costing

2020-10-13 Thread Luc Vlaming
Hi,

While developing some improvements for TPC-DS queries I found out that with
UNION ALL partial paths are not emitted. Whilst fixing that I also came across
the subquery costing which does not seem to consider parallelism when doing
the costing.

I added a simplified testcase in pg-regress to show this goes wrong, and
attached also a before and after explain output of tpc-ds SF100 query 5
based on version 12.4.

I hope I followed all etiquette and these kind of improvements are welcome.

Kind regards,
Luc
Swarm64
From 651999e07735f7dca887b4b672a008620515e857 Mon Sep 17 00:00:00 2001
From: Luc Vlaming 
Date: Tue, 13 Oct 2020 09:35:33 +0200
Subject: [PATCH v1] Allow partial UNION ALL; improve parallel subquery costing

---
 src/backend/optimizer/path/costsize.c  | 11 +++++++++++
 src/backend/optimizer/prep/prepunion.c |  4 ++++
 src/test/regress/expected/union.out    | 50 ++++++++++++++++++++
 src/test/regress/sql/union.sql         | 35 ++++++++++++++
 4 files changed, 100 insertions(+)

diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index a2a9b1f7be..b05efb8bd5 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1316,6 +1316,17 @@ cost_subqueryscan(SubqueryScanPath *path, PlannerInfo *root,
 	startup_cost += path->path.pathtarget->cost.startup;
 	run_cost += path->path.pathtarget->cost.per_tuple * path->path.rows;
 
+	/* Adjust costing for parallelism, if used. */
+	if (path->path.parallel_workers > 0)
+	{
+		double		parallel_divisor = get_parallel_divisor(&path->path);
+
+		path->path.rows = clamp_row_est(path->path.rows / parallel_divisor);
+
+		/* The CPU cost is divided among all the workers. */
+		run_cost /= parallel_divisor;
+	}
+
 	path->path.startup_cost += startup_cost;
 	path->path.total_cost += startup_cost + run_cost;
 }
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index cd9d49c1f7..99da4297e6 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -675,6 +675,10 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root,
 			   NIL, NULL,
 			   parallel_workers, enable_parallel_append,
 			   NIL, -1);
+
+		if (op->all && enable_parallel_append)
+			add_partial_path(result_rel, ppath);
+
 		ppath = (Path *)
 			create_gather_path(root, result_rel, ppath,
 			   result_rel->reltarget, NULL, NULL);
diff --git a/src/test/regress/expected/union.out b/src/test/regress/expected/union.out
index 7189f5bd3d..77a7e5e759 100644
--- a/src/test/regress/expected/union.out
+++ b/src/test/regress/expected/union.out
@@ -1052,3 +1052,53 @@ where (x = 0) or (q1 >= q2 and q1 <= q2);
  4567890123456789 |  4567890123456789 | 1
 (6 rows)
 
+-- Test handling of appendrel with different types which disables the path flattening and
+-- forces a subquery node. for the subquery node ensure the rowcounts are correct.
+create function check_estimated_rows(text) returns table (estimated int)
+language plpgsql as
+$$
+declare
+ln text;
+tmp text[];
+first_row bool := true;
+begin
+for ln in
+execute format('explain %s', $1)
+loop
+tmp := regexp_match(ln, 'rows=(\d*)');
+return query select tmp[1]::int;
+end loop;
+end;
+$$;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+explain (costs off)
+select *, 0::int from tenk1 a
+union all
+select *, 1::bigint from tenk1 b;
+   QUERY PLAN   
+
+ Gather
+   Workers Planned: 2
+   ->  Parallel Append
+ ->  Subquery Scan on "*SELECT* 1"
+   ->  Parallel Seq Scan on tenk1 a
+ ->  Parallel Seq Scan on tenk1 b
+(6 rows)
+
+select check_estimated_rows('select *, 0::int from tenk1 a union all select *, 1::bigint from tenk1 b;');
+ check_estimated_rows 
+--
+19990
+ 
+ 8330
+ 4165
+ 4165
+ 4165
+(6 rows)
+
+reset parallel_setup_cost;
+reset parallel_tuple_cost;
+reset min_parallel_table_scan_size;
+drop function check_estimated_rows(text);
diff --git a/src/test/regress/sql/union.sql b/src/test/regress/sql/union.sql
index 5f4881d594..93a2c7b329 100644
--- a/src/test/regress/sql/union.sql
+++ b/src/test/regress/sql/union.sql
@@ -440,3 +440,38 @@ select * from
union all
select *, 1 as x from int8_tbl b) ss
 where (x = 0) or (q1 >= q2 and q1 <= q2);
+
+-- Test handling of appendrel with different types which disables the path flattening and
+-- forces a subquery node. for the subquery node ensure the rowcounts are correct.
+create function check_estimated_rows(text) returns table (estimated int)
+language plpgsql as
+$$
+declare
+ln text;
+tmp text[];
+first_row boo