On Tue, Apr 19, 2016 at 6:11 PM, Kevin Grittner <kgri...@gmail.com> wrote:
> On Tue, Apr 19, 2016 at 9:57 AM, Amit Kapila <amit.kapil...@gmail.com> wrote:
>> On Sun, Apr 17, 2016 at 2:26 AM, Andres Freund <and...@anarazel.de> wrote:
>>>
>>> On 2016-04-16 16:44:52 -0400, Noah Misch wrote:
>>> > That is more controversial than the potential ~2% regression for
>>> > old_snapshot_threshold=-1.  Alvaro[2] and Robert[3] are okay releasing
>>> > that way, and Andres[4] is not.
>>>
>>> FWIW, I could be kinda convinced that it's temporarily OK if there were
>>> a clear proposal on the table for how to solve the scalability issue
>>> around MaintainOldSnapshotTimeMapping().
>>
>> It seems that for read-only workloads, MaintainOldSnapshotTimeMapping()
>> takes an EXCLUSIVE LWLock, which seems to be a probable reason for the
>> performance regression.  Now, the question is: do we need to acquire
>> that lock at all if xmin has not changed since the last time
>> oldSnapshotControl->latest_xmin was updated, or if xmin is less than or
>> equal to oldSnapshotControl->latest_xmin?  If we don't need it in those
>> cases, I think that could address the performance regression for
>> read-only workloads to a good degree when the feature is enabled.
>
> Thanks, Amit -- I think something along those lines is the right
> solution to the scaling issues when the feature is enabled.  For now
> I'm focusing on the back-patching issues and the performance regression
> when the feature is disabled, but I'll shift focus to this once the
> "killer" issues are in hand.
I had an idea I wanted to test out. The gist of it is to effectively store the last slot of the timestamp-to-xid map in the latest_xmin field, and to update the mapping only when a slot boundary is crossed. See the attached WIP patch for details. This way the exclusive lock needs to be acquired only once per minute; the common case takes just a spinlock, which could be replaced with atomics later (see the sketch after the patch). It also seems to me that mutex_threshold, taken in TestForOldSnapshot(), can get pretty hot under some workloads, so that may need some tweaking as well.

I think a better approach would be to base the whole mechanism on a periodically updated counter instead of timestamps. The autovacuum launcher looks like a good candidate to play the clock keeper; without it the feature has little point anyway. AFAICS only the clock keeper needs the timestamp-to-xid mapping; the other processes can make do with a couple of periodically updated values (a toy model is sketched in the postscript below). I haven't worked this out in detail, but it feels like the resulting code would be simpler. That was a larger change than I felt comfortable trying out, though, so I went with the simple change first.

However, while checking whether my proof-of-concept patch actually works, I hit another issue: I couldn't get the feature itself to behave as documented. The test script I used is attached. Basically, I have a table with 1000 rows, one high-throughput worker deleting old rows and inserting new ones, one long query that acquires a snapshot and sleeps for 30 minutes, and one worker that holds a repeatable read snapshot and periodically does count(*) on the table. Based on the documentation I would expect the following:

* The interfering query gets cancelled.
* The long-running query gets to run.
* Old rows start to be cleaned up once the threshold expires.

However, testing on commit 9c75e1a36b6b2f3ad9f76ae661f42586c92c6f7c, I'm seeing that the old rows never get cleaned up, and that the interfering query is only cancelled when old_snapshot_threshold = 0; larger values do not result in cancellation. Am I doing something wrong, or is the feature just not working at all?

Regards,
Ants Aasma
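P.S. To make the clock keeper idea above a little more concrete, below is a standalone toy model of the synchronization pattern I have in mind. This is not PostgreSQL code: every name in it is made up, the one-minute tick is shrunk to one second, and the xid side of the mapping is left out entirely. The point is only that a single writer advances two shared values, and everyone else gets away with plain atomic reads on the hot path.

/*
 * Toy model of the counter-based design (hypothetical, compile with
 * "cc -std=c11 -pthread").  One "clock keeper" thread, standing in for
 * the autovacuum launcher, advances an epoch counter once per tick and
 * derives the "too old" threshold from it.  No other thread ever takes
 * a lock or touches a shared map.
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

#define OLD_SNAPSHOT_THRESHOLD	3		/* in ticks ("minutes") */

static atomic_uint_fast64_t current_epoch;	/* advanced by clock keeper only */
static atomic_uint_fast64_t threshold_epoch;	/* snapshots below this are old */

/* The sole writer; everyone else only reads the two values above. */
static void *
clock_keeper(void *arg)
{
	(void) arg;
	for (;;)
	{
		sleep(1);				/* stand-in for a one-minute tick */
		uint64_t now = atomic_fetch_add(&current_epoch, 1) + 1;

		if (now > OLD_SNAPSHOT_THRESHOLD)
			atomic_store(&threshold_epoch, now - OLD_SNAPSHOT_THRESHOLD);
	}
	return NULL;
}

/* Taking a snapshot is a single atomic read... */
static uint64_t
snapshot_epoch(void)
{
	return atomic_load(&current_epoch);
}

/* ...and so is the check a scan would make instead of maintaining a map. */
static int
snapshot_too_old(uint64_t snap_epoch)
{
	return snap_epoch < atomic_load(&threshold_epoch);
}

int
main(void)
{
	pthread_t	tid;
	uint64_t	snap;

	pthread_create(&tid, NULL, clock_keeper, NULL);
	snap = snapshot_epoch();

	for (int i = 0; i < 6; i++)
	{
		sleep(1);
		printf("tick %d: snapshot from epoch %d is %s\n",
			   (int) atomic_load(&current_epoch), (int) snap,
			   snapshot_too_old(snap) ? "too old" : "still usable");
	}
	return 0;
}

In the real thing the clock keeper would also record the xmin horizon for each epoch, but it would be the only process ever touching that map.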
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 8aa1f49..dc00d91 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -80,8 +80,11 @@ typedef struct OldSnapshotControlData
 	 */
 	slock_t		mutex_current;	/* protect current timestamp */
 	int64		current_timestamp;	/* latest snapshot timestamp */
-	slock_t		mutex_latest_xmin;	/* protect latest snapshot xmin */
+	slock_t		mutex_latest_xmin;	/* protect latest snapshot xmin
+									 * and next_map_update
+									 */
 	TransactionId latest_xmin;	/* latest snapshot xmin */
+	int64		next_map_update;	/* latest snapshot valid up to */
 	slock_t		mutex_threshold;	/* protect threshold fields */
 	int64		threshold_timestamp;	/* earlier snapshot is old */
 	TransactionId threshold_xid;	/* earlier xid may be gone */
@@ -95,7 +98,9 @@ typedef struct OldSnapshotControlData
 	 * count_used value of old_snapshot_threshold means that the buffer is
 	 * full and the head must be advanced to add new entries.  Use timestamps
 	 * aligned to minute boundaries, since that seems less surprising than
-	 * aligning based on the first usage timestamp.
+	 * aligning based on the first usage timestamp.  The latest bucket is
+	 * effectively stored within latest_xmin.  The circular buffer is updated
+	 * when we get a new xmin value that doesn't fall into the same interval.
 	 *
 	 * It is OK if the xid for a given time slot is from earlier than
 	 * calculated by adding the number of minutes corresponding to the
@@ -269,6 +274,7 @@ SnapMgrInit(void)
 		oldSnapshotControl->current_timestamp = 0;
 		SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
 		oldSnapshotControl->latest_xmin = InvalidTransactionId;
+		oldSnapshotControl->next_map_update = 0;
 		SpinLockInit(&oldSnapshotControl->mutex_threshold);
 		oldSnapshotControl->threshold_timestamp = 0;
 		oldSnapshotControl->threshold_xid = InvalidTransactionId;
@@ -1594,9 +1600,15 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
 	{
 		int64		ts = GetSnapshotCurrentTimestamp();
 		TransactionId xlimit = recentXmin;
-		TransactionId latest_xmin = oldSnapshotControl->latest_xmin;
+		TransactionId latest_xmin;
+		int64		update_ts;
 		bool		same_ts_as_threshold = false;
 
+		SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+		latest_xmin = oldSnapshotControl->latest_xmin;
+		update_ts = oldSnapshotControl->next_map_update;
+		SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
+
 		/*
 		 * Zero threshold always overrides to latest xmin, if valid.  Without
 		 * some heuristic it will find its own snapshot too old on, for
@@ -1631,6 +1643,14 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
 
 		if (!same_ts_as_threshold)
 		{
+			if (ts == update_ts)
+			{
+				xlimit = latest_xmin;
+				if (NormalTransactionIdFollows(xlimit, recentXmin))
+					SetOldSnapshotThresholdTimestamp(ts, xlimit);
+			}
+			else
+			{
 			LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
 
 			if (oldSnapshotControl->count_used > 0
@@ -1651,6 +1671,7 @@ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
 			}
 
 			LWLockRelease(OldSnapshotTimeMapLock);
+			}
 		}
 
 		/*
@@ -1680,17 +1701,36 @@ void
 MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
 {
 	int64		ts;
+	TransactionId latest_xmin;
+	int64		update_ts;
+	bool		map_update_required = false;
 
 	/* Fast exit when old_snapshot_threshold is not used. */
 	if (old_snapshot_threshold < 0)
 		return;
 
-	/* Keep track of the latest xmin seen by any process. */
+	ts = AlignTimestampToMinuteBoundary(whenTaken);
+
+	/*
+	 * Keep track of the latest xmin seen by any process.  Update mapping
+	 * with a new value when we have crossed a bucket boundary.
+	 */
 	SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
-	if (TransactionIdFollows(xmin, oldSnapshotControl->latest_xmin))
+	latest_xmin = oldSnapshotControl->latest_xmin;
+	update_ts = oldSnapshotControl->next_map_update;
+	if (ts > update_ts)
+	{
+		oldSnapshotControl->next_map_update = ts;
+		map_update_required = true;
+	}
+	if (TransactionIdFollows(xmin, latest_xmin))
 		oldSnapshotControl->latest_xmin = xmin;
 	SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
 
+	/* We only needed to update the most recent xmin value. */
+	if (!map_update_required)
+		return;
+
 	/* No further tracking needed for 0 (used for testing). */
 	if (old_snapshot_threshold == 0)
 		return;
@@ -1716,8 +1756,6 @@ MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
 		return;
 	}
 
-	ts = AlignTimestampToMinuteBoundary(whenTaken);
-
 	LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
 
 	Assert(oldSnapshotControl->head_offset >= 0);
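As an aside on the "replaced with atomics later" remark above, the latest_xmin half of the update could in principle become a lock-free compare-and-swap loop. The sketch below is hypothetical standalone C11 (stdatomic.h) rather than PostgreSQL's port/atomics.h, and it deliberately ignores next_map_update, which the patch keeps consistent under the same spinlock; a single-word CAS does not cover that part.

#include <stdatomic.h>
#include <stdint.h>

static atomic_uint_fast32_t latest_xmin;

/*
 * Modulo-2^32 ordering, the same trick TransactionIdFollows() uses,
 * minus the special-casing of permanent xids.
 */
static int
xid_follows(uint32_t a, uint32_t b)
{
	return (int32_t) (a - b) > 0;
}

/* Advance latest_xmin to xmin if it is newer, without taking any lock. */
static void
advance_latest_xmin(uint32_t xmin)
{
	uint_fast32_t old = atomic_load(&latest_xmin);

	while (xid_follows(xmin, (uint32_t) old))
	{
		/* On failure, old is refreshed to the current value; retry. */
		if (atomic_compare_exchange_weak(&latest_xmin, &old, xmin))
			break;
	}
}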
# Test script for old_snapshot_threshold (Python 2, requires psycopg2).
# Runs three workers against the same database:
#   1. a high-throughput writer churning rows in a small table,
#   2. a long-running query holding a snapshot open for 30 minutes,
#   3. an "interfering" repeatable read transaction that should get
#      cancelled once the threshold passes.
import psycopg2
import threading
from time import time, sleep
from datetime import datetime
import sys

TBL_SIZE = 1000            # number of live rows in the churned table
PADDING_SIZE = 100         # bytes of padding per row
DISPLAY_INTERVAL = 10      # seconds between progress reports

CONNSTRING = sys.argv[1] if len(sys.argv) > 1 else ""

def connect(name):
    conn = psycopg2.connect("%s application_name=%s" % (CONNSTRING, name))
    return conn

def init_high_tp(cur):
    cur.execute("DROP TABLE IF EXISTS high_throughput")
    cur.execute("CREATE TABLE high_throughput (id int4 primary key, padding text)")
    cur.execute("INSERT INTO high_throughput SELECT x, repeat(' ', %d) FROM generate_series(1,%d) x" % (PADDING_SIZE, TBL_SIZE))

def show(msg):
    print datetime.now().strftime("[%H:%M:%S] "), msg

def high_tp_thread():
    # Writer: delete the oldest row and insert a new one, keeping the
    # live row count constant while generating dead tuples to clean up.
    conn = connect("write-workload")
    cur = conn.cursor()
    init_high_tp(cur)
    cur.execute("SHOW old_snapshot_threshold")
    row = cur.fetchone()
    show("old_snapshot_threshold = %s" % (row[0],))
    conn.commit()

    last_display = 0
    i = 1
    start = time()
    while True:
        cur_time = time()
        if cur_time - last_display > DISPLAY_INTERVAL:
            last_display = cur_time
            cur.execute("SELECT pg_table_size('high_throughput'), clock_timestamp() - last_autovacuum FROM pg_stat_user_tables WHERE relname = 'high_throughput'")
            row = cur.fetchone()
            show("High throughput table size @ %5ds. Size %6dkB Last vacuum %s ago" % (int(cur_time - start), row[0]/1024, row[1],))
        cur.execute("DELETE FROM high_throughput WHERE id = %s", (i,))
        cur.execute("INSERT INTO high_throughput VALUES (%s, REPEAT(' ',%s))", (i+TBL_SIZE, PADDING_SIZE))
        conn.commit()
        i += 1
    conn.close()

def long_ss_thread(interval=1800):
    # Long-running query on an unrelated table; it should be allowed to run.
    conn = connect("long-unrelated-query")
    cur = conn.cursor()
    while True:
        show("Starting %ds long query" % interval)
        try:
            cur.execute("SELECT NOW(), pg_sleep(%s)", (interval,))
            conn.commit()
        except psycopg2.Error, e:
            show("Long query canceled due to %s" % (e,))
            break

def long_ss_error_thread():
    # Repeatable read transaction scanning the churned table; it should be
    # cancelled with "snapshot too old" once the threshold expires.
    sleep(1)
    conn = connect("interfering-query")
    cur = conn.cursor()
    while True:
        try:
            cur.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
            while True:
                cur.execute("SELECT COUNT(*), MAX(id) FROM high_throughput")
                row = cur.fetchone()
                show("Counted %d rows with max %d in high_throughput table" % (row[0], row[1],))
                sleep(DISPLAY_INTERVAL)
        except psycopg2.Error, e:
            show("Interfering query got error %s" % (e,))
            try:
                conn.rollback()
            except:
                return
        show("Waiting 3min to restart interfering query")
        sleep(180)

threads = []
for parallel_func in [high_tp_thread, long_ss_thread, long_ss_error_thread]:
    t = threading.Thread(target=parallel_func)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
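For what it's worth, the script takes an optional libpq-style connection string as its single argument, so an invocation would look something like "python snapshot_test.py "host=localhost dbname=postgres"" (the file name here is just whatever you saved the attachment as). With old_snapshot_threshold set in postgresql.conf, the writer and the interfering worker report progress every DISPLAY_INTERVAL seconds, which is how I observed the behavior described above.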