On 14/09/14 18:55, Tom Lane wrote:
> Are you watching the state in a loop inside a single plpgsql function?
> If so, I wonder whether the problem is that the plpgsql function's
> snapshot isn't changing. From memory, marking the function VOLATILE
> would help if that's the issue.
The function is VOLATILE. I have attached two versions of it. fn-old.sql does
not work: once a slave has disconnected, it drops out of pg_stat_replication
and does not come back, even after it reconnects. fn.sql uses dblink to work
around the problem, but it consumes two database connections.
The function is meant to be called between operations that may cause
slaves to fall behind. If the lag is below a certain limit (the high-water
mark), it simply returns. Otherwise, it waits until the lag drops below a
second, lower limit (the low-water mark).
If it were a VOLATILE problem, the function would not be able to see a
slave dropping out, nor changes in the data. But it does see these
changes. Only a slave that comes back online is not seen within the
current transaction.
Torsten
-- wait_for_streaming_lag: block until streaming-replication lag is acceptable.
--
-- Intended to be called between operations that may make replicas fall behind.
-- The first check compares the lag against high_water_mark; if the lag is at or
-- below it, the function returns at once. Otherwise it re-checks once per
-- second until the lag is at or below low_water_mark.
--
-- Parameters:
--   low_water_mark  - lag in WAL bytes to wait for once waiting has started
--   high_water_mark - lag in WAL bytes tolerated without waiting at all
--   tmout           - maximum time to wait, measured from the start of THIS call
-- Returns: the largest replica lag (WAL bytes) seen on the last check; 0 if no
--          replica is known.
-- Raises:  SQLSTATE TF001 if tmout elapses before the lag drops far enough.
--
-- Replicas are remembered in the session-local temp table "lag" for up to five
-- minutes after they were last seen. A replica that disconnects therefore still
-- counts with its last known flush position.
--
-- NOTE(review): pg_stat_replication is read in the caller's transaction here.
-- It was reported (9.3) that a replica which reconnects is not seen again until
-- that transaction ends.
CREATE OR REPLACE FUNCTION wait_for_streaming_lag(low_water_mark BIGINT DEFAULT 1000000, high_water_mark BIGINT DEFAULT 20000000, tmout INTERVAL DEFAULT '4h')
RETURNS BIGINT
AS $def$
DECLARE
    res            RECORD;
    water_mark     BIGINT;
    -- The timeout is measured from here, not from now() (which is the start
    -- of the caller's transaction and may be long past).
    started_at     TIMESTAMPTZ := clock_timestamp();
    saved_min_msgs TEXT := current_setting('client_min_messages');
BEGIN
    -- Hide the "relation already exists, skipping" notice, then put back the
    -- caller's own setting rather than forcing NOTICE for the rest of the
    -- transaction.
    PERFORM set_config('client_min_messages', 'error', true);
    CREATE TEMP TABLE IF NOT EXISTS lag (
        gen              INT,
        application_name TEXT,
        client_addr      INET,
        flush_location   TEXT,
        lmd              TIMESTAMPTZ  -- when this replica was last seen live
    );
    PERFORM set_config('client_min_messages', saved_min_msgs, true);

    water_mark := high_water_mark;  -- the first check uses the high-water mark
    LOOP
        -- Build a new generation of rows from the live replicas (ord 1) and
        -- the replicas remembered from earlier checks (ord 2). For each
        -- (application_name, client_addr), a live row wins over a remembered
        -- one. Among equals, the smallest flush position (most behind) is kept.
        WITH prev_gen AS (
            SELECT max(gen) AS gen FROM lag
        ),
        candidates AS (
            SELECT 1 AS ord, application_name, client_addr, flush_location, clock_timestamp() AS lmd
              FROM pg_stat_replication
            UNION ALL
            SELECT 2 AS ord, application_name, client_addr, flush_location, lmd
              FROM lag
        )
        INSERT INTO lag (gen, application_name, client_addr, flush_location, lmd)
        SELECT coalesce(prev_gen.gen + 1, 1),
               best.application_name,
               best.client_addr,
               best.flush_location,
               best.lmd
          FROM (SELECT DISTINCT ON (application_name, client_addr)
                       application_name, client_addr, flush_location, lmd
                  FROM candidates
                 ORDER BY application_name,
                          client_addr,
                          ord ASC,
                          pg_xlog_location_diff(flush_location, '0/0') ASC) AS best
         CROSS JOIN prev_gen;

        -- Keep only the generation just written ...
        DELETE FROM lag WHERE gen < (SELECT max(gen) FROM lag);
        -- ... and forget replicas that have not been seen for five minutes.
        DELETE FROM lag WHERE lmd < clock_timestamp() - '5min'::INTERVAL;

        SELECT INTO res
               coalesce(max(pg_xlog_location_diff(pg_current_xlog_location(), flush_location)), 0) AS lag,
               clock_timestamp() - started_at AS tm
          FROM lag;
        EXIT WHEN res.lag <= water_mark;
        IF res.tm > tmout THEN
            RAISE EXCEPTION USING
                MESSAGE='Timeout while waiting for streaming lag to drop below ' || low_water_mark,
                ERRCODE='TF001';
        END IF;
        water_mark := low_water_mark;  -- later checks use the low-water mark
        PERFORM pg_sleep(1);
    END LOOP;
    RETURN res.lag;
END;
$def$ LANGUAGE plpgsql VOLATILE SECURITY INVOKER;
BEGIN;
-- wait_for_streaming_lag (dblink variant): block until streaming-replication
-- lag is acceptable.
--
-- Intended to be called between operations that may make replicas fall behind.
-- The first check compares the lag against high_water_mark; if the lag is at or
-- below it, the function returns at once. Otherwise it re-checks once per
-- second until the lag is at or below low_water_mark.
--
-- Parameters:
--   low_water_mark  - lag in WAL bytes to wait for once waiting has started
--   high_water_mark - lag in WAL bytes tolerated without waiting at all
--   tmout           - maximum time to wait, measured from the start of THIS call
-- Returns: the largest replica lag (WAL bytes) seen on the last check; 0 if no
--          replica is known.
-- Raises:  SQLSTATE TF001 if tmout elapses before the lag drops far enough.
--
-- Replicas are remembered in the session-local temp table "lag" for up to five
-- minutes after they were last seen. A replica that disconnects therefore still
-- counts with its last known flush position.
-- Requires the dblink extension.
CREATE OR REPLACE FUNCTION wait_for_streaming_lag(low_water_mark BIGINT DEFAULT 1000000, high_water_mark BIGINT DEFAULT 20000000, tmout INTERVAL DEFAULT '4h')
RETURNS BIGINT
AS $def$
DECLARE
    res            RECORD;
    water_mark     BIGINT;
    -- The timeout is measured from here, not from now() (which is the start
    -- of the caller's transaction and may be long past).
    started_at     TIMESTAMPTZ := clock_timestamp();
    saved_min_msgs TEXT := current_setting('client_min_messages');
BEGIN
    -- pg_stat_replication is read through a loopback dblink connection.
    -- At least in 9.3, reading it directly inside a transaction shows replicas
    -- dropping out but does not show them reconnecting.
    -- The named connection is opened only if it does not exist yet; it is
    -- never closed here, so later calls in the same session reuse it.
    -- The database name is quoted for libpq: backslashes and single quotes
    -- are backslash-escaped inside a single-quoted value.
    PERFORM dblink_connect('wait_for_streaming_lag',
                           format('dbname=''%s'' application_name=wait_for_streaming_lag',
                                  replace(replace(current_database(), E'\\', E'\\\\'), '''', E'\\''')))
      WHERE NOT EXISTS (SELECT 1
                          FROM unnest(dblink_get_connections()) AS c(conn_name)
                         WHERE conn_name = 'wait_for_streaming_lag');

    -- Hide the "relation already exists, skipping" notice, then put back the
    -- caller's own setting rather than forcing NOTICE for the rest of the
    -- transaction.
    PERFORM set_config('client_min_messages', 'error', true);
    CREATE TEMP TABLE IF NOT EXISTS lag (
        gen              INT,
        application_name TEXT,
        client_addr      INET,
        flush_location   TEXT,
        lmd              TIMESTAMPTZ  -- when this replica was last seen live
    );
    PERFORM set_config('client_min_messages', saved_min_msgs, true);

    water_mark := high_water_mark;  -- the first check uses the high-water mark
    LOOP
        -- Build a new generation of rows from the live replicas (ord 1, read
        -- via dblink) and the replicas remembered from earlier checks (ord 2).
        -- For each (application_name, client_addr), a live row wins over a
        -- remembered one. Among equals, the smallest flush position (most
        -- behind) is kept.
        WITH prev_gen AS (
            SELECT max(gen) AS gen FROM lag
        ),
        candidates AS (
            SELECT 1 AS ord, repl.application_name, repl.client_addr, repl.flush_location, clock_timestamp() AS lmd
              FROM dblink('wait_for_streaming_lag', $$
                       SELECT application_name, client_addr, flush_location
                       FROM pg_stat_replication
                   $$) AS repl(application_name TEXT, client_addr INET, flush_location TEXT)
            UNION ALL
            SELECT 2 AS ord, application_name, client_addr, flush_location, lmd
              FROM lag
        )
        INSERT INTO lag (gen, application_name, client_addr, flush_location, lmd)
        SELECT coalesce(prev_gen.gen + 1, 1),
               best.application_name,
               best.client_addr,
               best.flush_location,
               best.lmd
          FROM (SELECT DISTINCT ON (application_name, client_addr)
                       application_name, client_addr, flush_location, lmd
                  FROM candidates
                 ORDER BY application_name,
                          client_addr,
                          ord ASC,
                          pg_xlog_location_diff(flush_location, '0/0') ASC) AS best
         CROSS JOIN prev_gen;

        -- Keep only the generation just written ...
        DELETE FROM lag WHERE gen < (SELECT max(gen) FROM lag);
        -- ... and forget replicas that have not been seen for five minutes.
        DELETE FROM lag WHERE lmd < clock_timestamp() - '5min'::INTERVAL;

        SELECT INTO res
               coalesce(max(pg_xlog_location_diff(pg_current_xlog_location(), flush_location)), 0) AS lag,
               clock_timestamp() - started_at AS tm
          FROM lag;
        EXIT WHEN res.lag <= water_mark;
        IF res.tm > tmout THEN
            RAISE EXCEPTION USING
                MESSAGE='Timeout while waiting for streaming lag to drop below ' || low_water_mark,
                ERRCODE='TF001';
        END IF;
        water_mark := low_water_mark;  -- later checks use the low-water mark
        PERFORM pg_sleep(1);
    END LOOP;
    RETURN res.lag;
END;
$def$ LANGUAGE plpgsql VOLATILE SECURITY INVOKER;
COMMIT;
--
Sent via pgsql-general mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-general