Hi,

Here's a review of v24

+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+delete from bigger_than_it_looks where id <= 19000;
+vacuum bigger_than_it_looks;
+analyze bigger_than_it_looks;
+insert into bigger_than_it_looks
+  select generate_series(1, 19000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';

It seems kinda easier to just manipulate ndistinct and reltuples...


+set max_parallel_workers_per_gather = 0;
+set work_mem = '4MB';

I hope there's a fair amount of slop here - with different
architectures you're going to see quite noticeable size differences.

+-- The "good" case: batches required, but we plan the right number; we
+-- plan for 16 batches, and we stick to that number, and peak memory
+-- usage stays within our work_mem budget
+-- non-parallel
+set max_parallel_workers_per_gather = 0;
+set work_mem = '128kB';

So how do we know that's actually the case we're testing, rather than
something arbitrarily different? There are IIRC tests somewhere that
just filter the json explain output down to the relevant parts...





+/*
+ * Build the name for a given segment of a given BufFile.
+ */
+static void
+MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
+{
+       snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
+}

Not a fan of this name - you're not "making" a filename here (as in
allocating or such). I think I'd just remove the Make prefix.



+/*
+ * Open a file that was previously created in another backend with
+ * BufFileCreateShared in the same SharedFileSet using the same name.  The
+ * backend that created the file must have called BufFileClose() or
+ * BufFileExport() to make sure that it is ready to be opened by other
+ * backends and render it read-only.
+ */

Is it actually guaranteed that it's another backend / do we rely on
that?

+BufFile *
+BufFileOpenShared(SharedFileSet *fileset, const char *name)
+{

+       /*
+        * If we didn't find any files at all, then no BufFile exists with this
+        * tag.
+        */
+       if (nfiles == 0)
+               return NULL;

s/tag/name/?


+/*
+ * Delete a BufFile that was created by BufFileCreateShared in the given
+ * SharedFileSet using the given name.
+ *
+ * It is not necessary to delete files explicitly with this function.  It is
+ * provided only as a way to delete files proactively, rather than waiting for
+ * the SharedFileSet to be cleaned up.
+ *
+ * Only one backend should attempt to delete a given name, and should know
+ * that it exists and has been exported or closed.
+ */
+void
+BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+{
+       char            segment_name[MAXPGPATH];
+       int                     segment = 0;
+       bool            found = false;
+
+       /*
+        * We don't know how many segments the file has.  We'll keep deleting
+        * until we run out.  If we don't manage to find even an initial segment,
+        * raise an error.
+        */
+       for (;;)
+       {
+               MakeSharedSegmentName(segment_name, name, segment);
+               if (!SharedFileSetDelete(fileset, segment_name, true))
+                       break;
+               found = true;
+               ++segment;
+       }

Hm. Do we properly delete all the files via the resowner mechanism if
this fails midway, i.e. if the leading segments are missing? I also
wonder whether this doesn't need a CFI check.
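
Re the CFI, I'd just put one at the top of the loop, i.e. something
like:

        for (;;)
        {
                CHECK_FOR_INTERRUPTS();

                MakeSharedSegmentName(segment_name, name, segment);
                if (!SharedFileSetDelete(fileset, segment_name, true))
                        break;
                found = true;
                ++segment;
        }

so deleting a file with a lot of segments stays interruptible.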

+void
+PathNameCreateTemporaryDir(const char *basedir, const char *directory)
+{
+       if (mkdir(directory, S_IRWXU) < 0)
+       {
+               if (errno == EEXIST)
+                       return;
+
+               /*
+                * Failed.  Try to create basedir first in case it's missing.  Tolerate
+                * ENOENT to close a race against another process following the same
+                * algorithm.
+                */
+               if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
+                       elog(ERROR, "cannot create temporary directory \"%s\": 
%m",
+                                basedir);

ENOENT or EEXIST?
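
I.e. shouldn't that be

        if (mkdir(basedir, S_IRWXU) < 0 && errno != EEXIST)
                elog(ERROR, "cannot create temporary directory \"%s\": %m",
                         basedir);

given that EEXIST is what the loser of that race would actually see?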



+File
+PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
+{
+       File            file;
+
+       /*
+        * Open the file.  Note: we don't use O_EXCL, in case there is an orphaned
+        * temp file that can be reused.
+        */
+       file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
+       if (file <= 0)
+       {
+               if (error_on_failure)
+                       elog(ERROR, "could not create temporary file \"%s\": 
%m", path);
+               else
+                       return file;
+       }
+
+       /* Mark it for temp_file_limit accounting. */
+       VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
+
+       /*
+        * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
+        * want to make sure they get closed at end of xact.
+        */
+       ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+       ResourceOwnerRememberFile(CurrentResourceOwner, file);
+       VfdCache[file].resowner = CurrentResourceOwner;

So maybe I'm being pedantic here, but wouldn't the right order be to do
ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
allocating operation, so it can fail, which'd leak the file.
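
I.e. roughly (just sketching the reordering I mean):

        ResourceOwnerEnlargeFiles(CurrentResourceOwner);

        file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
        ...
        VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
        ResourceOwnerRememberFile(CurrentResourceOwner, file);
        VfdCache[file].resowner = CurrentResourceOwner;

so the only allocating call happens while there's nothing to leak yet.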

+/*
+ * Open a file that was created with PathNameCreateTemporaryFile, possibly in
+ * another backend.  Files opened this way don't count agains the

s/agains/against/

+ * temp_file_limit of the caller, are read-only and are automatically closed
+ * at the end of the transaction but are not deleted on close.
+ */
+File
+PathNameOpenTemporaryFile(const char *path)
+{
+       File            file;
+
+       /* We open the file read-only. */
+       file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+       /* If no such file, then we don't raise an error. */
+       if (file <= 0 && errno != ENOENT)
+               elog(ERROR, "could not open temporary file \"%s\": %m", path);
+
+       if (file > 0)
+       {
+               /*
+                * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
+                * still want to make sure they get closed at end of xact.
+                */
+               ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+               ResourceOwnerRememberFile(CurrentResourceOwner, file);
+               VfdCache[file].resowner = CurrentResourceOwner;

Same complaint as above, ResourceOwnerEnlargeFiles() should be done
earlier.


+/*
+ * Delete a file by pathname.  Return true if the file existed, false if
+ * it didn't.
+ */
+bool
+PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
+{
+       struct stat filestats;
+       int                     stat_errno;
+
+       /* Get the final size for pgstat reporting. */
+       if (stat(path, &filestats) != 0)
+               stat_errno = errno;
+       else
+               stat_errno = 0;
+
+       /*
+        * Unlike FileClose's automatic file deletion code, we tolerate
+        * non-existence to support BufFileDeleteShared which doesn't know how
+        * many segments it has to delete until it runs out.
+        */
+       if (stat_errno == ENOENT)
+               return false;
+
+       if (unlink(path) < 0)
+       {
+               if (errno != ENOENT)
+                       elog(error_on_failure ? ERROR : LOG,
+                                "cannot unlink temporary file \"%s\": %m", 
path);
+               return false;
+       }
+
+       if (stat_errno == 0)
+               ReportTemporaryFileUsage(path, filestats.st_size);
+       else
+       {
+               errno = stat_errno;
+               elog(LOG, "could not stat file \"%s\": %m", path);
+       }

All these messages are "not expected to ever happen" ones, right?

+       return true;
+}
+
 /*
  * close a file when done with it
  */
@@ -1537,10 +1747,17 @@ FileClose(File file)
                Delete(file);
        }
 
+       if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
+       {
+               /* Subtract its size from current usage (do first in case of error) */
+               temporary_files_size -= vfdP->fileSize;
+               vfdP->fileSize = 0;
+       }

So, is it right to do so unconditionally and without regard for errors?
If the file isn't deleted, its size shouldn't be subtracted from
temporary_files_size. I guess you're managing that through the flag,
but that's not entirely obvious.

diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
new file mode 100644
index 00000000000..6da80838b37
--- /dev/null
+++ b/src/backend/storage/file/sharedfileset.c
@@ -0,0 +1,240 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedfileset.c
+ *       Shared temporary file management.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/storage/file/sharedfileset.c
+ *
+ *-------------------------------------------------------------------------
+ */

A slightly bigger comment wouldn't hurt.



+/*
+ * Attach to a set of directories that was created with SharedFileSetInit.
+ */
+void
+SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
+{
+       bool            success;
+
+       SpinLockAcquire(&fileset->mutex);
+       if (fileset->refcnt == 0)
+               success = false;

I've not finished reading through this, but is this safe? If the
segment's gone, is the spinlock guaranteed to still be a spinlock? I
suspect this isn't a problem because just the underlying data is
removed, while the SharedFileSet itself stays alive?

+static void
+GetSharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
+{
+       char            tempdirpath[MAXPGPATH];
+       
+       GetTempTablespacePath(tempdirpath, tablespace);
+       snprintf(path, MAXPGPATH, "%s/%s%d.%d.sharedfileset" 
PG_TEMP_SUBDIR_SUFFIX,
+                        tempdirpath, PG_TEMP_FILE_PREFIX,
+                        fileset->creator_pid, fileset->number);
+}

+/*
+ * Sorting hat to determine which tablespace a given shared temporary file
+ * belongs in.
+ */
+static Oid
+ChooseTablespace(const SharedFileSet *fileset, const char *name)
+{
+       uint32          hash = hash_any((const unsigned char *) name, strlen(name));
+
+       return fileset->tablespaces[hash % fileset->ntablespaces];
+}

Hm. I wonder if just round-robining through these wouldn't be a better approach.


+/*
+ * Compute the full path of a file in a SharedFileSet.
+ */
+static void
+GetSharedFilePath(char *path, SharedFileSet *fileset, const char *name)
+{
+       char            dirpath[MAXPGPATH];
+
+       GetSharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
+       snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s", dirpath, 
name);
+}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 4c35ccf65eb..8b91d5a6ebe 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                                PrintRelCacheLeakWarning(res);
                        RelationClose(res);
                }
-
-               /* Ditto for dynamic shared memory segments */
-               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
-               {
-                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
-
-                       if (isCommit)
-                               PrintDSMLeakWarning(res);
-                       dsm_detach(res);
-               }
        }
        else if (phase == RESOURCE_RELEASE_LOCKS)
        {
@@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                                PrintFileLeakWarning(res);
                        FileClose(res);
                }
+
+               /* Ditto for dynamic shared memory segments */
+               while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
+               {
+                       dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
+
+                       if (isCommit)
+                               PrintDSMLeakWarning(res);
+                       dsm_detach(res);
+               }
        }

Is that entirely unproblematic? Are there any DSM callbacks that rely on
locks still being held? Please split this part into a separate commit
with such analysis.



+/* The initial size of chunks in pages. */
+#define STS_MIN_CHUNK_PAGES 4

Could use a quick description of how you've arrived at that specific
value.


+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+       int                     npages;                 /* Size of this chunk in BLCKSZ pages. */
+       int                     ntuples;                /* Number of tuples in this chunk. */
+       char            data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;
+
+/* Per-participant shared state. */
+typedef struct SharedTuplestoreParticipant
+{
+       slock_t         mutex;
+       BlockNumber     read_page;              /* Page number for next read. */
+       BlockNumber     npages;                 /* Number of pages written. */
+       bool            writing;                /* Used only for assertions. */
+
+       /*
+        * We need variable sized chunks, because we might be asked to store
+        * gigantic tuples.  To avoid the locking contention that would come from
+        * reading chunk sizes from disk, we store the chunk size for ranges of
+        * the file in a compact format in memory.  chunk_pages starts out at
+        * STS_MIN_CHUNK_PAGES and then doubles each time we reach a page listed
+        * in chunk_expansion_log.
+        */
+       BlockNumber     chunk_expansion_log[sizeof(BlockNumber) * CHAR_BIT];
+       int                     chunk_expansions;
+       int                     chunk_expansion;
+       int                     chunk_pages;

This needs more explanation.

+/*
+ * Initialize a SharedTuplestore in existing shared memory.  There must be
+ * space for sts_estimate(participants) bytes.  If flags is set to the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).

s/is set to the value/includes the value/ - otherwise it's not really
a flags argument.
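
(I.e. presumably the implementation tests it along the lines of

        if ((flags & SHARED_TUPLESTORE_SINGLE_PASS) != 0)
                ...

so the comment shouldn't suggest that flags can only ever hold that
one value.)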


+ * Tuples that are stored may optionally carry a piece of fixed sized
+ * meta-data which will be retrieved along with the tuple.  This is useful for
+ * the hash codes used for multi-batch hash joins, but could have other
+ * applications.

"hash codes"?



+/*
+ * Prepare to rescan.  Only one participant should call this.  After it returns,
+ * all participants should call sts_begin_parallel_scan() and then loop over
+ * sts_parallel_scan_next().
+ */

s/should/may/?  Also maybe document what happens with in-progress reads
(or rather that they're not allowed to exist)?


+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+                        MinimalTuple tuple)
+{

+       /* Do we have space? */
+       size = accessor->sts->meta_data_size + tuple->t_len;
+       if (accessor->write_pointer + size >= accessor->write_end)
+       {
+               /* Try flushing to see if that creates enough space. */
+               if (accessor->write_chunk != NULL)
+                       sts_flush_chunk(accessor);
+
+               /*
+                * It may still not be enough in the case of a gigantic tuple, or if
+                * we haven't created a chunk buffer at all yet.
+                */
+               if (accessor->write_pointer + size >= accessor->write_end)
+               {
+                       SharedTuplestoreParticipant *participant;
+                       size_t  space_needed;
+                       int             pages_needed;
+
+                       /* How many pages to hold this data and the chunk header? */
+                       space_needed = offsetof(SharedTuplestoreChunk, data) + size;
+                       pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
+                       pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
+
+                       /*
+                        * Double the chunk size until it's big enough, and record that
+                        * fact in the shared expansion log so that readers know about it.
+                        */
+                       participant = &accessor->sts->participants[accessor->participant];
+                       while (accessor->write_pages < pages_needed)
+                       {
+                               accessor->write_pages *= 2;
+                               participant->chunk_expansion_log[participant->chunk_expansions++] =
+                                       accessor->write_page;
+                       }

Hm. Isn't that going to be pretty unfunny if you have one large tuple
and then a lot of small ones? IIUC the chunk size never shrinks again
afterwards.


+                       /* Create the output buffer. */
+                       if (accessor->write_chunk != NULL)
+                               pfree(accessor->write_chunk);
+                       accessor->write_chunk = (SharedTuplestoreChunk *)
+                               palloc0(accessor->write_pages * BLCKSZ);

Are we guaranteed to be in a long-lived memory context here?


+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+       SharedTuplestoreParticipant *p;
+       BlockNumber     read_page;
+       int                     chunk_pages;
+       bool            eof;
+
+       for (;;)
+       {
+               /* Can we read more tuples from the current chunk? */
+               if (likely(accessor->read_ntuples < accessor->read_ntuples_available))
+                       return sts_read_tuple(accessor, meta_data);

I'm not convinced this is a good use of likely/unlikely (not biased and
not performance critical enough).

+               /* Find the location of a new chunk to read. */
+               p = &accessor->sts->participants[accessor->read_participant];
+
+               SpinLockAcquire(&p->mutex);
+               eof = p->read_page >= p->npages;
+               if (!eof)
+               {
+                       /*
+                        * Figure out how big this chunk is.  It will almost always be the
+                        * same as the last chunk loaded, but if there is one or more
+                        * entry in the chunk expansion log for this page then we know
+                        * that it doubled that number of times.  This avoids the need to
+                        * do IO to adjust the read head, so we don't need to hold up
+                        * concurrent readers.  (An alternative to this extremely rarely
+                        * run loop would be to use more space storing the new size in the
+                        * log so we'd have 'if' instead of 'while'.)
+                        */
+                       read_page = p->read_page;
+                       while (p->chunk_expansion < p->chunk_expansions &&
+                                  p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
+                       {
+                               p->chunk_pages *= 2;
+                               p->chunk_expansion++;
+                       }
+                       chunk_pages = p->chunk_pages;
+
+                       /* The next reader will start after this chunk. */
+                       p->read_page += chunk_pages;
+               }
+               SpinLockRelease(&p->mutex);

This looks more like the job of an lwlock rather than a spinlock.
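
I.e. something like (only a sketch, the lock field and its tranche
would obviously need to be added):

        LWLockAcquire(&p->lock, LW_EXCLUSIVE);
        eof = p->read_page >= p->npages;
        if (!eof)
        {
                ...
        }
        LWLockRelease(&p->lock);

Spinlocks really shouldn't protect anything that loops.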



+/*
+ * Create the name used for our shared BufFiles.
+ */
+static void
+make_name(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+       snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}

Name's a bit generic. And it's still not really "making" anything ;)


Going to buy some groceries and then look at the next patches.

- Andres

