From b0b95e00b37b8ec7bb49e069861a5f7aaf21b2c6 Mon Sep 17 00:00:00 2001
From: Asim R P <apraveen@pivotal.io>
Date: Fri, 6 Sep 2019 18:31:56 +0530
Subject: [PATCH v2 1/6] Framework to inject faults from SQL tests

This patch introduces ability to define fault points in backend code
and a SQL interface to inject the faults.  When a backend process
reaches a fault point during execution, the action specified through
the SQL interface is taken.

The framework should help automate complex scenarios using SQL based
tests, including crash recovery, specific interleaving of concurrent
transactions, etc. that are otherwise cumbersome to implement.

The fault injector code is enabled by a preprocessor flag at configure
time: CPPFLAGS=-DFAULT_INJECTOR

Reviewed by: Adam Berlin and Taylor Vesely
---
 contrib/faultinjector/.gitignore                   |   2 +
 contrib/faultinjector/Makefile                     |  21 +
 contrib/faultinjector/README                       | 143 ++++
 .../faultinjector/expected/faultinjector_test.out  |  98 +++
 contrib/faultinjector/faultinjector--1.0.sql       | 111 +++
 contrib/faultinjector/faultinjector.c              | 118 +++
 contrib/faultinjector/faultinjector.control        |   5 +
 contrib/faultinjector/sql/faultinjector_test.sql   |  40 +
 src/backend/access/transam/xlog.c                  |   6 +
 src/backend/postmaster/postmaster.c                |  11 +
 src/backend/storage/ipc/ipci.c                     |   8 +-
 src/backend/tcop/postgres.c                        |  86 ++
 src/backend/utils/init/postinit.c                  |   7 +-
 src/backend/utils/misc/Makefile                    |   2 +-
 src/backend/utils/misc/faultinjector.c             | 903 +++++++++++++++++++++
 src/include/utils/faultinjector.h                  |  96 +++
 src/include/utils/faultinjector_lists.h            |  71 ++
 src/interfaces/libpq/fe-connect.c                  |   6 +
 src/interfaces/libpq/fe-protocol3.c                |   4 +
 src/interfaces/libpq/libpq-int.h                   |   3 +
 20 files changed, 1738 insertions(+), 3 deletions(-)
 create mode 100644 contrib/faultinjector/.gitignore
 create mode 100644 contrib/faultinjector/Makefile
 create mode 100644 contrib/faultinjector/README
 create mode 100644 contrib/faultinjector/expected/faultinjector_test.out
 create mode 100644 contrib/faultinjector/faultinjector--1.0.sql
 create mode 100644 contrib/faultinjector/faultinjector.c
 create mode 100644 contrib/faultinjector/faultinjector.control
 create mode 100644 contrib/faultinjector/sql/faultinjector_test.sql
 create mode 100644 src/backend/utils/misc/faultinjector.c
 create mode 100644 src/include/utils/faultinjector.h
 create mode 100644 src/include/utils/faultinjector_lists.h

diff --git a/contrib/faultinjector/.gitignore b/contrib/faultinjector/.gitignore
new file mode 100644
index 0000000000..19b6c5ba42
--- /dev/null
+++ b/contrib/faultinjector/.gitignore
@@ -0,0 +1,2 @@
+# Generated subdirectories
+/results/
diff --git a/contrib/faultinjector/Makefile b/contrib/faultinjector/Makefile
new file mode 100644
index 0000000000..1c3cad68ed
--- /dev/null
+++ b/contrib/faultinjector/Makefile
@@ -0,0 +1,21 @@
+MODULES = faultinjector
+
+EXTENSION = faultinjector
+DATA = faultinjector--1.0.sql
+
+REGRESS = faultinjector_test
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LDFLAGS = $(libpq)
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+SHLIB_PREREQS = submake-libpq
+subdir = contrib/faultinjector
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/faultinjector/README b/contrib/faultinjector/README
new file mode 100644
index 0000000000..5c99f41d8e
--- /dev/null
+++ b/contrib/faultinjector/README
@@ -0,0 +1,143 @@
+Fault Injection Framework
+=========================
+
+Fault is defined as a point of interest in the source code with an
+associated action to be taken when that point is hit during execution.
+Fault points are defined using SIMPLE_FAULT_INJECTOR() macro or by
+directly invoking the FaultInjector_TriggerFaultIfSet() function.  A
+fault point is identified by a name.  This module provides an interface
+to inject a pre-defined fault point into a running PostgreSQL database
+by associating an action with the fault point.  Action can be error,
+panic, sleep, skip, infinite_loop, etc.
+
+SQL based tests can make use of the "inject_fault()" interface to
+simulate complex scenarios that are otherwise cumbersome to automate.
+
+For example,
+
+   select inject_fault('checkpoint', 'error');
+
+The above command causes the next checkpoint to fail with elog(ERROR).
+The 'checkpoint' fault is defined in CreateCheckPoint() function in
+xlog.c.  Note that the fault is set to trigger only once by default.
+Subsequent checkpoints will not be affected by the above fault.
+
+   select inject_fault('checkpoint', 'status');
+
+The above command checks the status of the fault.  It reports the
+number of times the fault has been triggered during execution and
+whether it has completed.  Faults that are completed will no longer
+trigger.
+
+   select wait_until_triggered_fault('checkpoint', 1);
+
+The above command blocks until the checkpoint fault is triggered once.
+
+   select inject_fault('checkpoint', 'reset');
+
+The above command removes the fault, such that no action will be taken
+when the fault point is reached during execution.  A fault can be set
+to trigger more than once.  For example:
+
+   select inject_fault_infinite('checkpoint', 'error');
+
+This command causes checkpoints to fail until the fault is removed.
+
+More detailed interface
+-----------------------
+
+A more detailed version of the fault injector interface accepts
+several more parameters.  Let us assume that a fault named
+"heap_insert" has been defined in function heap_insert() in backend
+code, like so:
+
+--- a/src/backend/access/heap/heapam.c
++++ b/src/backend/access/heap/heapam.c
+@@ -1875,6 +1875,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
+	Buffer		vmbuffer = InvalidBuffer;
+	bool		all_visible_cleared = false;
+
++#ifdef FAULT_INJECTOR
++	FaultInjector_TriggerFaultIfSet(
++		"heap_insert",
++		"" /* database name */,
++		RelationGetRelationName(relation));
++#endif
++
+
+A SQL test may want to inject "heap_insert" fault such that inserts
+into a table named "my_table" fail for first 10 tuples.
+
+   select inject_fault(
+      'heap_insert',
+	  'error',
+	  '' /* database name */,
+	  'my_table' /* table name */,
+	  1 /* start occurrence */,
+	  10 /* end occurrence */,
+	  0 /* extra arg */);
+
+The above command injects heap_insert fault such that the inserting
+transaction will abort with elog(ERROR) when the code reaches the
+fault point, only if the relation being inserted to has the name
+'my_table'.  Moreover, the fault will stop triggering after it has
+been hit 10 times by inserts into my_table.  The 11th attempt to insert
+into my_table will continue the insert as usual.
+
+Fault actions
+-------------
+
+Fault action is specified as the type parameter in inject_fault()
+interface.  The following types are supported.
+
+error
+   elog(ERROR)
+
+fatal
+   elog(FATAL)
+
+panic
+   elog(PANIC)
+
+sleep
+   sleep for specified amount of time
+
+infinite_loop
+   block until the query is canceled or terminated
+
+suspend
+   block until the fault is removed
+
+resume
+   resume backend processes that are blocked due to a suspend fault
+
+skip
+   do nothing (used to implement custom logic that is not supported by
+   predefined actions)
+
+reset
+   remove a previously injected fault
+
+segv
+   crash the backend process due to SIGSEGV
+
+interrupt
+   simulate cancel interrupt arrival, such that the next
+   interrupt processing cycle will cancel the query
+
+finish_pending
+   similar to interrupt, sets the QueryFinishPending global flag
+
+status
+   return a text datum with details of how many times a fault has been
+   triggered, the state it is currently in.  Fault states are as follows:
+
+      "set" injected but the fault point has not been reached during
+      execution yet.
+
+      "triggered" the fault point has been reached at least once during
+      execution.
+
+      "completed" the action associated with the fault point will no
+      longer be taken because the fault point has been reached maximum
+      number of times during execution.
\ No newline at end of file
diff --git a/contrib/faultinjector/expected/faultinjector_test.out b/contrib/faultinjector/expected/faultinjector_test.out
new file mode 100644
index 0000000000..3ee92d9cc4
--- /dev/null
+++ b/contrib/faultinjector/expected/faultinjector_test.out
@@ -0,0 +1,98 @@
+CREATE EXTENSION faultinjector;
+-- start with a clean slate
+select inject_fault('all', 'reset');
+ inject_fault 
+--------------
+ Success:
+(1 row)
+
+-- inject fault of type skip
+select inject_fault('checkpoint', 'skip', '', '', 1, 2, 0);
+ inject_fault 
+--------------
+ Success:
+(1 row)
+
+-- wait for fault triggered 0 times, should not block
+select wait_until_triggered_fault('checkpoint', 0);
+ wait_until_triggered_fault 
+----------------------------
+ Success:
+(1 row)
+
+-- trigger a checkpoint which will trigger the fault
+checkpoint;
+select wait_until_triggered_fault('checkpoint', 1);
+ wait_until_triggered_fault 
+----------------------------
+ Success:
+(1 row)
+
+-- check status
+select inject_fault('checkpoint', 'status');
+                                                                                         inject_fault                                                                                         
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Success: fault name:'checkpoint' fault type:'skip' database name:'' table name:'' start occurrence:'1' end occurrence:'2' extra arg:'0' fault injection state:'triggered' num times hit:'1' +
+ 
+(1 row)
+
+select inject_fault('checkpoint', 'reset');
+ inject_fault 
+--------------
+ Success:
+(1 row)
+
+-- inject fault of type error, set it to trigger two times
+select inject_fault('checkpoint', 'error', '', '', 1, 2, 0);
+ inject_fault 
+--------------
+ Success:
+(1 row)
+
+-- trigger once
+checkpoint;
+ERROR:  checkpoint request failed
+HINT:  Consult recent messages in the server log for details.
+select inject_fault('checkpoint', 'status');
+                                                                                         inject_fault                                                                                          
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Success: fault name:'checkpoint' fault type:'error' database name:'' table name:'' start occurrence:'1' end occurrence:'2' extra arg:'0' fault injection state:'triggered' num times hit:'1' +
+ 
+(1 row)
+
+-- trigger twice
+checkpoint;
+ERROR:  checkpoint request failed
+HINT:  Consult recent messages in the server log for details.
+select inject_fault('checkpoint', 'status');
+                                                                                         inject_fault                                                                                          
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Success: fault name:'checkpoint' fault type:'error' database name:'' table name:'' start occurrence:'1' end occurrence:'2' extra arg:'0' fault injection state:'completed' num times hit:'2' +
+ 
+(1 row)
+
+-- no error the third time onwards
+checkpoint;
+select inject_fault('checkpoint', 'status');
+                                                                                         inject_fault                                                                                          
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Success: fault name:'checkpoint' fault type:'error' database name:'' table name:'' start occurrence:'1' end occurrence:'2' extra arg:'0' fault injection state:'completed' num times hit:'2' +
+ 
+(1 row)
+
+-- remote fault injector API
+select current_setting('port') as port \gset
+select inject_fault('checkpoint', 'status', 'localhost', :port);
+                                                                                         inject_fault                                                                                          
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Success: fault name:'checkpoint' fault type:'error' database name:'' table name:'' start occurrence:'1' end occurrence:'2' extra arg:'0' fault injection state:'completed' num times hit:'2' +
+ 
+(1 row)
+
+-- reset the fault
+select inject_fault('checkpoint', 'reset');
+ inject_fault 
+--------------
+ Success:
+(1 row)
+
diff --git a/contrib/faultinjector/faultinjector--1.0.sql b/contrib/faultinjector/faultinjector--1.0.sql
new file mode 100644
index 0000000000..d1068fdb74
--- /dev/null
+++ b/contrib/faultinjector/faultinjector--1.0.sql
@@ -0,0 +1,111 @@
+/* contrib/faultinjector/faultinjector--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION faultinjector" to load this file. \quit
+
+--
+-- Inject a fault that is predefined in backend code.  The fault is
+-- associated with an action.  The specified action will be taken by a
+-- backend process when the fault point is reached during execution.
+--
+--    faultname: name of the fault, this should match the definition
+--    in backend code.
+--
+--    type: action to be taken when the fault is reached during
+--    execution.  E.g. "error", "panic".  See the README for an
+--    explanation of each fault type.
+--
+--    database (optional): the fault will be triggered only if the name
+--    of the backend process's current database matches this one.
+--
+--    tablename (optional): the fault will be triggered only if
+--    current table name matches this one.
+--
+--    start_occurrence (optional): the fault will start triggering
+--    once the fault point has been reached this many times during
+--    execution.
+--
+--    end_occurrence (optional): the fault will stop triggering after
+--    it has been triggered this many times.
+--
+--    extra_arg (optional): used to specify the number of seconds to
+--    sleep when injecting a "sleep" type of fault.
+--
+CREATE FUNCTION inject_fault(
+  faultname text,
+  type text,
+  database text,
+  tablename text,
+  start_occurrence int4,
+  end_occurrence int4,
+  extra_arg int4)
+RETURNS text
+AS 'MODULE_PATHNAME'
+LANGUAGE C VOLATILE STRICT;
+
+CREATE FUNCTION inject_fault_remote(
+  faultname text,
+  type text,
+  database text,
+  tablename text,
+  start_occurrence int4,
+  end_occurrence int4,
+  extra_arg int4,
+  hostname text,
+  port int4)
+RETURNS text
+AS 'MODULE_PATHNAME'
+LANGUAGE C VOLATILE STRICT;
+
+-- Simpler version to inject fault such that it is triggered only one
+-- time, starting at the first occurrence.  Not tied to any database /
+-- table.
+CREATE FUNCTION inject_fault(
+  faultname text,
+  type text)
+RETURNS text
+AS $$ select inject_fault($1, $2, '', '', 1, 1, 0) $$
+LANGUAGE SQL;
+
+CREATE FUNCTION inject_fault(
+  faultname text,
+  type text,
+  hostname text,
+  port int4)
+RETURNS text
+AS $$ select inject_fault_remote($1, $2, '', '', 1, 1, 0, $3, $4) $$
+LANGUAGE SQL;
+
+-- Simpler version, always trigger until the fault is reset.
+CREATE FUNCTION inject_fault_infinite(
+  faultname text,
+  type text)
+RETURNS text
+AS $$ select inject_fault($1, $2, '', '', 1, -1, 0) $$
+LANGUAGE SQL;
+
+CREATE FUNCTION inject_fault_infinite(
+  faultname text,
+  type text,
+  hostname text,
+  port int4)
+RETURNS text
+AS $$ select inject_fault_remote($1, $2, '', '', 1, -1, 0, $3, $4) $$
+LANGUAGE SQL;
+
+-- Wait until a fault is triggered desired number of times.
+CREATE FUNCTION wait_until_triggered_fault(
+  faultname text,
+  numtimestriggered int4)
+RETURNS text
+AS $$ select inject_fault($1, 'wait_until_triggered', '', '', 1, 1, $2) $$
+LANGUAGE SQL;
+
+CREATE FUNCTION wait_until_triggered_fault(
+  faultname text,
+  numtimestriggered int4,
+  hostname text,
+  port int4)
+RETURNS text
+AS $$ select inject_fault_remote($1, 'wait_until_triggered', '', '', 1, 1, $2, $3, $4) $$
+LANGUAGE SQL;
diff --git a/contrib/faultinjector/faultinjector.c b/contrib/faultinjector/faultinjector.c
new file mode 100644
index 0000000000..7be6754945
--- /dev/null
+++ b/contrib/faultinjector/faultinjector.c
@@ -0,0 +1,118 @@
+/*
+ * faultinjector.c
+ *
+ * SQL interface to inject a pre-defined fault in backend code.
+ */
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "libpq-fe.h"
+#include "utils/builtins.h"
+#include "utils/faultinjector.h"
+
+PG_MODULE_MAGIC;
+
+extern Datum inject_fault(PG_FUNCTION_ARGS);
+extern Datum inject_fault_remote(PG_FUNCTION_ARGS);
+
+PG_FUNCTION_INFO_V1(inject_fault);
+PG_FUNCTION_INFO_V1(inject_fault_remote);
+
+/*
+ * SQL-callable wrapper around InjectFault().  Returns the reply as text; a
+ * missing reply, or one that does not begin with "Success:", is raised as an
+ * ERROR.  See the accompanying README for the meaning of each argument.
+ */
+Datum
+inject_fault(PG_FUNCTION_ARGS)
+{
+	static const char ok_prefix[] = "Success:";
+	char	   *fault = TextDatumGetCString(PG_GETARG_DATUM(0));
+	char	   *action = TextDatumGetCString(PG_GETARG_DATUM(1));
+	char	   *dbname = TextDatumGetCString(PG_GETARG_DATUM(2));
+	char	   *relname = TextDatumGetCString(PG_GETARG_DATUM(3));
+	int			first = PG_GETARG_INT32(4);
+	int			last = PG_GETARG_INT32(5);
+	int			extra = PG_GETARG_INT32(6);
+	char	   *reply;
+
+	reply = InjectFault(fault, action, dbname, relname, first, last, extra);
+
+	if (reply == NULL)
+		elog(ERROR, "failed to inject fault");
+	if (strncmp(reply, ok_prefix, strlen(ok_prefix)) != 0)
+		elog(ERROR, "%s", reply);
+	PG_RETURN_TEXT_P(cstring_to_text(reply));
+}
+
+/*
+ * SQL UDF to inject a fault into a remote server.  Opens a libpq connection
+ * to hostname:port with the special option "fault=true", sends the request as
+ * a "<key>=<value> ..." message and returns the response text.  Raises ERROR
+ * if the connection fails or the response does not start with "Success:".
+ *
+ * libpq objects are malloc'ed and would leak across elog(ERROR), so they are
+ * released first and anything still needed is copied into palloc'ed memory.
+ */
+Datum
+inject_fault_remote(PG_FUNCTION_ARGS)
+{
+	char	   *faultName = TextDatumGetCString(PG_GETARG_DATUM(0));
+	char	   *type = TextDatumGetCString(PG_GETARG_DATUM(1));
+	char	   *databaseName = TextDatumGetCString(PG_GETARG_DATUM(2));
+	char	   *tableName = TextDatumGetCString(PG_GETARG_DATUM(3));
+	int			startOccurrence = PG_GETARG_INT32(4);
+	int			endOccurrence = PG_GETARG_INT32(5);
+	int			extraArg = PG_GETARG_INT32(6);
+	char	   *hostname = TextDatumGetCString(PG_GETARG_DATUM(7));
+	int			port = PG_GETARG_INT32(8);
+	char	   *response;
+	char	   *errmsg;
+	char		conninfo[1024];
+	char		msg[1024];
+	PGconn	   *conn;
+	PGresult   *res;
+
+	/* Set special connection option "fault=true" */
+	snprintf(conninfo, sizeof(conninfo), "host=%s port=%d fault=true",
+			 hostname, port);
+	conn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		errmsg = pstrdup(PQerrorMessage(conn));
+		PQfinish(conn);
+		elog(ERROR, "connection to %s:%d failed: %s", hostname, port, errmsg);
+	}
+
+	/*
+	 * If dbname or tablename is not specified, send '#' instead.  This allows
+	 * sscanf to be used on the receiving end to parse the message.
+	 */
+	if (!databaseName || databaseName[0] == '\0')
+		databaseName = "#";
+	if (!tableName || tableName[0] == '\0')
+		tableName = "#";
+	snprintf(msg, sizeof(msg), "faultname=%s type=%s db=%s table=%s "
+			 "start=%d end=%d extra=%d",
+			 faultName, type, databaseName, tableName,
+			 startOccurrence, endOccurrence, extraArg);
+
+	res = PQexec(conn, msg);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		errmsg = psprintf("failed to inject fault: %s", PQerrorMessage(conn));
+	else if (PQntuples(res) != 1)
+		errmsg = psprintf("invalid response from %s:%d", hostname, port);
+	else
+		errmsg = NULL;
+
+	/* Copy what we need out of the result before PQclear() frees it. */
+	response = errmsg ? errmsg :
+		pstrdup(PQgetvalue(res, 0, Anum_fault_message_response_status));
+	PQclear(res);
+	PQfinish(conn);
+
+	if (errmsg || strncmp(response, "Success:", strlen("Success:")) != 0)
+		elog(ERROR, "%s", response);
+	PG_RETURN_TEXT_P(cstring_to_text(response));
+}
diff --git a/contrib/faultinjector/faultinjector.control b/contrib/faultinjector/faultinjector.control
new file mode 100644
index 0000000000..6968ddd7d7
--- /dev/null
+++ b/contrib/faultinjector/faultinjector.control
@@ -0,0 +1,5 @@
+# faultinjector extension
+comment = 'simulate various faults for testing purposes'
+default_version = '1.0'
+module_pathname = '$libdir/faultinjector'
+relocatable = true
diff --git a/contrib/faultinjector/sql/faultinjector_test.sql b/contrib/faultinjector/sql/faultinjector_test.sql
new file mode 100644
index 0000000000..cda142c718
--- /dev/null
+++ b/contrib/faultinjector/sql/faultinjector_test.sql
@@ -0,0 +1,40 @@
+CREATE EXTENSION faultinjector;
+
+-- start with a clean slate
+select inject_fault('all', 'reset');
+
+-- inject fault of type skip
+select inject_fault('checkpoint', 'skip', '', '', 1, 2, 0);
+
+-- wait for fault triggered 0 times, should not block
+select wait_until_triggered_fault('checkpoint', 0);
+
+-- trigger a checkpoint which will trigger the fault
+checkpoint;
+select wait_until_triggered_fault('checkpoint', 1);
+
+-- check status
+select inject_fault('checkpoint', 'status');
+select inject_fault('checkpoint', 'reset');
+
+-- inject fault of type error, set it to trigger two times
+select inject_fault('checkpoint', 'error', '', '', 1, 2, 0);
+
+-- trigger once
+checkpoint;
+select inject_fault('checkpoint', 'status');
+
+-- trigger twice
+checkpoint;
+select inject_fault('checkpoint', 'status');
+
+-- no error the third time onwards
+checkpoint;
+select inject_fault('checkpoint', 'status');
+
+-- remote fault injector API
+select current_setting('port') as port \gset
+select inject_fault('checkpoint', 'status', 'localhost', :port);
+
+-- reset the fault
+select inject_fault('checkpoint', 'reset');
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6876537b62..fd3dbfec56 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -68,6 +68,7 @@
 #include "storage/spin.h"
 #include "storage/sync.h"
 #include "utils/builtins.h"
+#include "utils/faultinjector.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -8530,6 +8531,11 @@ CreateCheckPoint(int flags)
 	else
 		shutdown = false;
 
+#ifdef FAULT_INJECTOR
+	if (SIMPLE_FAULT_INJECTOR("checkpoint") == FaultInjectorTypeSkip)
+		return;
+#endif
+
 	/* sanity check */
 	if (RecoveryInProgress() && (flags & CHECKPOINT_END_OF_RECOVERY) == 0)
 		elog(ERROR, "can't create a checkpoint during recovery");
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a5446d54bb..76dedb3872 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -125,6 +125,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/datetime.h"
+#include "utils/faultinjector.h"
 #include "utils/memutils.h"
 #include "utils/pidfile.h"
 #include "utils/ps_status.h"
@@ -2122,6 +2123,10 @@ retry1:
 									valptr),
 							 errhint("Valid values are: \"false\", 0, \"true\", 1, \"database\".")));
 			}
+#ifdef FAULT_INJECTOR
+			else if (strcmp(nameptr, "fault") == 0)
+				am_faultinjector = true;
+#endif
 			else if (strncmp(nameptr, "_pq_.", 5) == 0)
 			{
 				/*
@@ -2247,6 +2252,12 @@ retry1:
 	if (am_walsender && !am_db_walsender)
 		port->database_name[0] = '\0';
 
+#ifdef FAULT_INJECTOR
+	/* Fault handler process need not connect to a particular database. */
+	if (am_faultinjector)
+		port->database_name[0] = '\0';
+#endif
+
 	/*
 	 * Done putting stuff in TopMemoryContext.
 	 */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 885370698f..a2d9bcd07e 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -44,6 +44,7 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
+#include "utils/faultinjector.h"
 #include "utils/snapmgr.h"
 
 /* GUCs */
@@ -147,6 +148,9 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+#ifdef FAULT_INJECTOR
+		size = add_size(size, FaultInjector_ShmemSize());
+#endif
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -263,7 +267,9 @@ CreateSharedMemoryAndSemaphores(void)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
-
+#ifdef FAULT_INJECTOR
+	FaultInjector_ShmemInit();
+#endif
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index e8d8e6f828..9ed7bf9c4e 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -71,6 +71,7 @@
 #include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
 #include "tcop/utility.h"
+#include "utils/faultinjector.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -974,6 +975,87 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)
 	return stmt_list;
 }
 
+#ifdef FAULT_INJECTOR
+/*
+ * Fault injector commands are messages of the form:
+ *
+ *     "<key>=<value> <key>=<value> ..."
+ *
+ * They are used when injecting a fault into a remote server over libpq.  The
+ * keys map to arguments of InjectFault function (see faultinjector contrib
+ * module).  All keys must be present, in the order listed:
+ *
+ *    faultname: name of the fault, this should match the definition in
+ *    backend code.
+ *
+ *    type: action to be taken when the fault is reached, e.g. "error".
+ *
+ *    db: the fault will be triggered only if the name of the backend's
+ *    current database matches this one; '#' means not specified.
+ *
+ *    table: the fault will be triggered only if the current table name
+ *    matches this one; '#' means not specified.
+ *
+ *    start: occurrence at which the fault starts triggering.
+ *
+ *    end: occurrence after which the fault stops triggering.
+ *
+ *    extra: number of seconds to sleep for a "sleep" type of fault, or the
+ *    number of triggers to wait for with "wait_until_triggered".
+ */
+static void
+exec_fault_injector_command(const char *query_string)
+{
+	char		name[NAMEDATALEN];
+	char		type[NAMEDATALEN];
+	char		db[NAMEDATALEN];
+	char		table[NAMEDATALEN];
+	int			start;
+	int			end;
+	int			extra;
+	char	   *result;
+	int			len;
+	StringInfoData buf;
+
+	/* Field widths below keep remote input from overrunning the buffers. */
+	StaticAssertStmt(NAMEDATALEN >= 64, "fault message buffers too small");
+	if (sscanf(query_string, "faultname=%63s type=%63s db=%63s table=%63s "
+			   "start=%d end=%d extra=%d",
+			   name, type, db, table, &start, &end, &extra) != 7)
+		elog(ERROR, "invalid fault message: %s", query_string);
+	/* The value '#' means not specified. */
+	if (db[0] == '#')
+		db[0] = '\0';
+	if (table[0] == '#')
+		table[0] = '\0';
+
+	result = InjectFault(name, type, db, table, start, end, extra);
+	if (result == NULL)
+		elog(ERROR, "failed to inject fault");
+	len = strlen(result);
+
+	/* Send a RowDescription message for the single "status" column */
+	pq_beginmessage(&buf, 'T');
+	pq_sendint(&buf, Natts_fault_message_response, 2);
+	pq_sendstring(&buf, "status");
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, Anum_fault_message_response_status, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, -1, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
+	pq_endmessage(&buf);
+
+	/* Send a DataRow message */
+	pq_beginmessage(&buf, 'D');
+	pq_sendint(&buf, Natts_fault_message_response, 2);		/* # of columns */
+	pq_sendint(&buf, len, 4);
+	pq_sendbytes(&buf, result, len);
+	pq_endmessage(&buf);
+	EndCommand("fault", DestRemote);
+	pq_flush();
+}
+#endif
 
 /*
  * exec_simple_query
@@ -4252,6 +4334,10 @@ PostgresMain(int argc, char *argv[],
 						if (!exec_replication_command(query_string))
 							exec_simple_query(query_string);
 					}
+#ifdef FAULT_INJECTOR
+					else if (am_faultinjector)
+						exec_fault_injector_command(query_string);
+#endif
 					else
 						exec_simple_query(query_string);
 
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 29c5ec7b58..8e040ae812 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -54,6 +54,7 @@
 #include "storage/sync.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
+#include "utils/faultinjector.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -842,7 +843,11 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 	 * backend startup by processing any options from the startup packet, and
 	 * we're done.
 	 */
-	if (am_walsender && !am_db_walsender)
+	if ((am_walsender && !am_db_walsender)
+#ifdef FAULT_INJECTOR
+		|| am_faultinjector
+#endif
+		)
 	{
 		/* process any options passed in the startup packet */
 		if (MyProcPort != NULL)
diff --git a/src/backend/utils/misc/Makefile b/src/backend/utils/misc/Makefile
index ec7ec131e5..6eced97e3b 100644
--- a/src/backend/utils/misc/Makefile
+++ b/src/backend/utils/misc/Makefile
@@ -16,7 +16,7 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
 
 OBJS = guc.o help_config.o pg_config.o pg_controldata.o pg_rusage.o \
        ps_status.o queryenvironment.o rls.o sampling.o superuser.o \
-       timeout.o tzparser.o
+       timeout.o tzparser.o faultinjector.o
 
 # This location might depend on the installation directories. Therefore
 # we can't substitute it into pg_config.h.
diff --git a/src/backend/utils/misc/faultinjector.c b/src/backend/utils/misc/faultinjector.c
new file mode 100644
index 0000000000..2d215f4d92
--- /dev/null
+++ b/src/backend/utils/misc/faultinjector.c
@@ -0,0 +1,903 @@
+/*-------------------------------------------------------------------------
+ *
+ * faultinjector.c
+ *
+ * Fault injectors are used for fine control during testing. They allow a
+ * developer to create deterministic tests for scenarios that are hard to
+ * reproduce. This is done by programming actions at certain key areas to
+ * suspend, skip, or even panic the process. Fault injectors are set in shared
+ * memory so they are accessible to all server processes.
+ *
+ * IDENTIFICATION
+ *		src/backend/utils/misc/faultinjector.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <signal.h>
+#ifdef HAVE_SYS_RESOURCE_H
+#include <sys/resource.h>
+#endif
+#include "access/xact.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "postmaster/bgwriter.h"
+#include "storage/spin.h"
+#include "storage/shmem.h"
+#include "utils/faultinjector.h"
+#include "utils/hsearch.h"
+#include "miscadmin.h"
+
+#ifdef FAULT_INJECTOR
+
+/*
+ * gettext() can't be used in a static initializer... This breaks nls builds.
+ * So, to work around this issue, I've made _() be a no-op.
+ */
+#undef _
+#define _(x) x
+
+typedef struct FaultInjectorShmem_s
+{
+	slock_t		lock;
+	int			faultInjectorSlots;
+	HTAB	   *hash;
+}			FaultInjectorShmem_s;
+
+bool		am_faultinjector = false;
+
+static FaultInjectorShmem_s * faultInjectorShmem = NULL;
+
+static void FiLockAcquire(void);
+static void FiLockRelease(void);
+
+static FaultInjectorEntry_s * FaultInjector_LookupHashEntry(const char *faultName);
+
+static FaultInjectorEntry_s * FaultInjector_InsertHashEntry(const char *faultName,
+															bool *exists);
+
+static int	FaultInjector_NewHashEntry(FaultInjectorEntry_s * entry);
+
+static int	FaultInjector_MarkEntryAsResume(FaultInjectorEntry_s * entry);
+
+static bool FaultInjector_RemoveHashEntry(const char *faultName);
+
+static int	FaultInjector_SetFaultInjection(FaultInjectorEntry_s * entry);
+
+static FaultInjectorType_e FaultInjectorTypeStringToEnum(const char *faultType);
+
+/* Arrays to map between enum values and strings */
+const char *FaultInjectorTypeEnumToString[] = {
+#define FI_TYPE(id, str) str,
+#include "utils/faultinjector_lists.h"
+#undef FI_TYPE
+};
+
+const char *FaultInjectorStateEnumToString[] = {
+#define FI_STATE(id, str) str,
+#include "utils/faultinjector_lists.h"
+#undef FI_STATE
+};
+
+/*
+ * Translate a fault type name into its enum value.  The empty name of
+ * FaultInjectorTypeNotSpecified is never matched; an unrecognized name
+ * yields FaultInjectorTypeMax.
+ */
+static FaultInjectorType_e
+FaultInjectorTypeStringToEnum(const char *faultTypeString)
+{
+	int			typeIdx = FaultInjectorTypeNotSpecified + 1;
+
+	while (typeIdx < FaultInjectorTypeMax &&
+		   strcmp(FaultInjectorTypeEnumToString[typeIdx], faultTypeString) != 0)
+		typeIdx++;
+
+	return (FaultInjectorType_e) typeIdx;
+}
+
+static void
+FiLockAcquire(void)
+{
+	SpinLockAcquire(&faultInjectorShmem->lock);
+}
+
+static void
+FiLockRelease(void)
+{
+	SpinLockRelease(&faultInjectorShmem->lock);
+}
+
+/****************************************************************
+ * FAULT INJECTOR routines
+ ****************************************************************/
+/*
+ * Report how much shared memory the fault injector needs: an estimate for
+ * a hash table of FAULTINJECTOR_MAX_SLOTS entries, plus the control
+ * structure itself.
+ */
+Size
+FaultInjector_ShmemSize(void)
+{
+	Size		hashBytes = hash_estimate_size((Size) FAULTINJECTOR_MAX_SLOTS,
+											   sizeof(FaultInjectorEntry_s));
+
+	return add_size(hashBytes, sizeof(FaultInjectorShmem_s));
+}
+
+/*
+ * Create, or attach to, the fault injector state in shared memory: a control
+ * structure plus a hash table of the faults currently set, keyed by fault
+ * name.  The lock and slot count are initialized only when the structure is
+ * first created, so a process that merely attaches cannot reset them.
+ */
+void
+FaultInjector_ShmemInit(void)
+{
+	HASHCTL		hash_ctl;
+	bool		foundPtr;
+
+	faultInjectorShmem = (FaultInjectorShmem_s *) ShmemInitStruct("fault injector",
+																  sizeof(FaultInjectorShmem_s),
+																  &foundPtr);
+
+	if (faultInjectorShmem == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 (errmsg("not enough shared memory for fault injector"))));
+
+	if (!foundPtr)
+	{
+		MemSet(faultInjectorShmem, 0, sizeof(FaultInjectorShmem_s));
+		SpinLockInit(&faultInjectorShmem->lock);
+		faultInjectorShmem->faultInjectorSlots = 0;
+	}
+
+	MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = FAULT_NAME_MAX_LENGTH;
+	hash_ctl.entrysize = sizeof(FaultInjectorEntry_s);
+	hash_ctl.hash = string_hash;
+
+	faultInjectorShmem->hash = ShmemInitHash("fault injector hash",
+											 FAULTINJECTOR_MAX_SLOTS,
+											 FAULTINJECTOR_MAX_SLOTS,
+											 &hash_ctl,
+											 HASH_ELEM | HASH_FUNCTION);
+
+	if (faultInjectorShmem->hash == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 (errmsg("not enough shared memory for fault injector"))));
+
+	elog(LOG, "initialized faultinjector shmem");
+	return;
+}
+
+FaultInjectorType_e
+FaultInjector_TriggerFaultIfSet(const char *faultName,
+								const char *databaseName,
+								const char *tableName)
+{
+
+	FaultInjectorEntry_s *entryShared,
+				localEntry,
+			   *entryLocal = &localEntry;
+	char		databaseNameLocal[NAMEDATALEN];
+	char		tableNameLocal[NAMEDATALEN];
+	int			ii = 0;
+	int			cnt = 3600;
+
+	if (strlen(faultName) >= FAULT_NAME_MAX_LENGTH)
+		elog(ERROR, "fault name too long: '%s'", faultName);
+	if (strcmp(faultName, FaultInjectorNameAll) == 0)
+		elog(ERROR, "invalid fault name '%s'", faultName);
+	if (strlen(databaseName) >= NAMEDATALEN)
+		elog(ERROR, "database name too long:'%s'", databaseName);
+	if (strlen(tableName) >= NAMEDATALEN)
+		elog(ERROR, "table name too long: '%s'", tableName);
+
+	/*
+	 * Return immediately if no fault has been injected ever.  It is important
+	 * to not touch the spinlock, especially if this is the postmaster
+	 * process.  If one of the backend processes dies while holding the spin
+	 * lock, and postmaster comes here before resetting the shared memory, it
+	 * waits without holder process and eventually goes into PANIC.  Also this
+	 * saves a few cycles to acquire the spin lock and look into the shared
+	 * hash table.
+	 *
+	 * Although this is a race condition without lock, a false negative is ok
+	 * given this framework is purely for dev/testing.
+	 */
+	if (faultInjectorShmem->faultInjectorSlots == 0)
+		return FaultInjectorTypeNotSpecified;
+
+	snprintf(databaseNameLocal, sizeof(databaseNameLocal), "%s", databaseName);
+	snprintf(tableNameLocal, sizeof(tableNameLocal), "%s", tableName);
+
+	entryLocal->faultInjectorType = FaultInjectorTypeNotSpecified;
+
+	FiLockAcquire();
+
+	entryShared = FaultInjector_LookupHashEntry(faultName);
+
+	do
+	{
+		if (entryShared == NULL)
+			/* fault injection is not set */
+			break;
+
+		if (strcmp(entryShared->databaseName, databaseNameLocal) != 0)
+			/* fault injection is not set for the specified database name */
+			break;
+
+		if (strcmp(entryShared->tableName, tableNameLocal) != 0)
+			/* fault injection is not set for the specified table name */
+			break;
+
+		if (entryShared->faultInjectorState == FaultInjectorStateCompleted ||
+			entryShared->faultInjectorState == FaultInjectorStateFailed)
+		{
+			/* fault injection was already executed */
+			break;
+		}
+
+		entryShared->numTimesTriggered++;
+
+		if (entryShared->numTimesTriggered < entryShared->startOccurrence)
+		{
+			break;
+		}
+
+		/* Update the injection fault entry in hash table */
+		entryShared->faultInjectorState = FaultInjectorStateTriggered;
+
+		/* Mark fault injector to completed */
+		if (entryShared->endOccurrence != INFINITE_END_OCCURRENCE &&
+			entryShared->numTimesTriggered >= entryShared->endOccurrence)
+			entryShared->faultInjectorState = FaultInjectorStateCompleted;
+
+		memcpy(entryLocal, entryShared, sizeof(FaultInjectorEntry_s));
+	} while (0);
+
+	FiLockRelease();
+
+	/* Inject fault */
+	switch (entryLocal->faultInjectorType)
+	{
+		case FaultInjectorTypeNotSpecified:
+			break;
+
+		case FaultInjectorTypeSleep:
+			/* Sleep for the specified amount of time. */
+			ereport(LOG,
+					(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+							entryLocal->faultName,
+							FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+			pg_usleep(entryLocal->extraArg * 1000000L);
+			break;
+
+		case FaultInjectorTypeFatal:
+			ereport(FATAL,
+					(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+							entryLocal->faultName,
+							FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+			break;
+
+		case FaultInjectorTypePanic:
+
+			/*
+			 * Avoid core file generation for this PANIC. It helps to avoid
+			 * filling up disks during tests and also saves time.
+			 */
+#if defined(HAVE_GETRLIMIT) && defined(RLIMIT_CORE)
+			;
+			struct rlimit lim;
+
+			getrlimit(RLIMIT_CORE, &lim);
+			lim.rlim_cur = 0;
+			if (setrlimit(RLIMIT_CORE, &lim) != 0)
+				elog(NOTICE,
+					 "setrlimit failed for RLIMIT_CORE soft limit to zero (%m)");
+#endif
+			ereport(PANIC,
+					(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+							entryLocal->faultName,
+							FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+			break;
+
+		case FaultInjectorTypeError:
+			ereport(ERROR,
+					(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+							entryLocal->faultName,
+							FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+			break;
+
+		case FaultInjectorTypeInfiniteLoop:
+			/* Loop until the fault is reset or an interrupt occurs. */
+			ereport(LOG,
+					(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+							entryLocal->faultName,
+							FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+
+			for (ii = 0;
+				 ii < cnt && FaultInjector_LookupHashEntry(entryLocal->faultName);
+				 ii++)
+			{
+				pg_usleep(1000000L);
+				CHECK_FOR_INTERRUPTS();
+			}
+			break;
+
+		case FaultInjectorTypeSuspend:
+			{
+				/* Suspend until the fault is resumed or reset */
+				FaultInjectorEntry_s *entry;
+
+				ereport(LOG,
+						(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+								entryLocal->faultName,
+								FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+
+				while ((entry = FaultInjector_LookupHashEntry(entryLocal->faultName)) != NULL &&
+					   entry->faultInjectorType != FaultInjectorTypeResume)
+				{
+					pg_usleep(1000000L);
+					/* 1 sec */
+				}
+
+				if (entry != NULL)
+				{
+					ereport(LOG,
+							(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+									entryLocal->faultName,
+									FaultInjectorTypeEnumToString[entry->faultInjectorType])));
+				}
+				else
+				{
+					ereport(LOG,
+							(errmsg("fault name:'%s' removed", entryLocal->faultName)));
+
+					/*
+					 * The entry was removed (the fault was reset) while this
+					 * process was suspended, so return without touching it.
+					 * XXX: other fault types handled in this function may
+					 * need the same check; they have not been audited.
+					 */
+					return entryLocal->faultInjectorType;
+				}
+				break;
+			}
+
+		case FaultInjectorTypeSkip:
+			/* Do nothing.  The caller is expected to take some action. */
+			ereport(LOG,
+					(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+							entryLocal->faultName,
+							FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+			break;
+
+		case FaultInjectorTypeResume:
+
+			/*
+			 * This fault is resumed after suspension but has not been reset
+			 * yet.  Ignore.
+			 */
+			break;
+
+		case FaultInjectorTypeSegv:
+			{
+				/*
+				 * Avoid core file generation for this deliberate crash. It
+				 * helps to avoid filling up disks during tests and saves time.
+				 */
+#if defined(HAVE_GETRLIMIT) && defined(RLIMIT_CORE)
+				struct rlimit lim;
+
+				getrlimit(RLIMIT_CORE, &lim);
+				lim.rlim_cur = 0;
+				if (setrlimit(RLIMIT_CORE, &lim) != 0)
+					elog(NOTICE,
+						 "setrlimit failed for RLIMIT_CORE soft limit to zero (%m)");
+#endif
+
+				*(volatile int *) 0 = 1234;
+				break;
+			}
+
+		case FaultInjectorTypeInterrupt:
+
+			/*
+			 * XXX: check if the following comment is valid.
+			 *
+			 * The place where this type of fault is injected must have
+			 * HOLD_INTERRUPTS() .. RESUME_INTERRUPTS() around it, otherwise
+			 * the interrupt could be handled inside the fault injector itself.
+			 */
+			ereport(LOG,
+					(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+							entryLocal->faultName,
+							FaultInjectorTypeEnumToString[entryLocal->faultInjectorType])));
+			InterruptPending = true;
+			QueryCancelPending = true;
+			break;
+
+		default:
+			ereport(ERROR,
+					(errmsg("invalid fault type %d, fault name:'%s'",
+							entryLocal->faultInjectorType, entryLocal->faultName)));
+			break;
+	}
+	return (entryLocal->faultInjectorType);
+}
+
+/*
+ * Find the shared hash table entry for the named fault.
+ *
+ * Returns a pointer to the entry in the shared hash table, or NULL (after
+ * a DEBUG5 message) when no fault of that name is set.  This function does
+ * no locking of its own.
+ */
+static FaultInjectorEntry_s *
+FaultInjector_LookupHashEntry(const char *faultName)
+{
+	FaultInjectorEntry_s *found;
+
+	Assert(faultInjectorShmem->hash != NULL);
+	found = (FaultInjectorEntry_s *) hash_search(faultInjectorShmem->hash,
+												 (void *) faultName,
+												 HASH_FIND, NULL);
+	if (found != NULL)
+		return found;
+
+	ereport(DEBUG5,
+			(errmsg("FaultInjector_LookupHashEntry() could not find fault injection hash entry:'%s' ",
+					faultName)));
+	return NULL;
+}
+
+/*
+ * Find or create the shared hash table entry for the named fault.
+ *
+ * Returns the entry, with *exists reporting whether it was already present
+ * before this call.  Returns NULL, with *exists set to false, when
+ * hash_search() could not supply an entry (HASH_ENTER_NULL is used).
+ *
+ * This function does no locking of its own.
+ */
+static FaultInjectorEntry_s *
+FaultInjector_InsertHashEntry(const char *faultName,
+							  bool *exists)
+{
+	bool		alreadyPresent;
+	FaultInjectorEntry_s *slot;
+
+	Assert(faultInjectorShmem->hash != NULL);
+
+	slot = (FaultInjectorEntry_s *) hash_search(faultInjectorShmem->hash,
+												(void *) faultName,
+												HASH_ENTER_NULL,
+												&alreadyPresent);
+
+	/* no entry available: report "not present" and hand back NULL */
+	if (slot == NULL)
+	{
+		*exists = false;
+		return NULL;
+	}
+
+	elog(DEBUG1, "FaultInjector_InsertHashEntry() entry_key:%s",
+		 slot->faultName);
+
+	/* tell the caller whether the fault was set before this call */
+	*exists = alreadyPresent;
+
+	return slot;
+}
+
+/*
+ * Remove the named fault from the shared hash table, logging its name and
+ * type when it was present.  Returns true if an entry was removed, false if
+ * no fault of that name was set.  This function does no locking of its own.
+ */
+static bool
+FaultInjector_RemoveHashEntry(const char *faultName)
+{
+	FaultInjectorEntry_s *removed;
+
+	Assert(faultInjectorShmem->hash != NULL);
+
+	removed = (FaultInjectorEntry_s *) hash_search(faultInjectorShmem->hash,
+												   (void *) faultName,
+												   HASH_REMOVE, NULL);
+
+	if (removed == NULL)
+		return false;
+
+	ereport(LOG,
+			(errmsg("fault removed, fault name:'%s' fault type:'%s' ",
+					removed->faultName,
+					FaultInjectorTypeEnumToString[removed->faultInjectorType])));
+
+	return true;
+}
+
+/*
+ * Register a new fault in the shared hash table, copying its definition from
+ * *entry.  Returns STATUS_OK on success.  Returns STATUS_ERROR, after a
+ * WARNING and with an explanation in entry->bufOutput, when every slot is in
+ * use, no hash entry can be allocated, or a fault of that name is already set.
+ */
+static int
+FaultInjector_NewHashEntry(FaultInjectorEntry_s * entry)
+{
+	FaultInjectorEntry_s *entryLocal = NULL;
+	bool		exists;
+	int			status = STATUS_OK;
+
+	FiLockAcquire();
+
+	/* All FAULTINJECTOR_MAX_SLOTS slots are usable; refuse only when full. */
+	if (faultInjectorShmem->faultInjectorSlots >= FAULTINJECTOR_MAX_SLOTS)
+	{
+		FiLockRelease();
+		status = STATUS_ERROR;
+		ereport(WARNING,
+				(errmsg("cannot insert fault injection, no slots available"),
+				 errdetail("Fault name:'%s' fault type:'%s'",
+						   entry->faultName,
+						   FaultInjectorTypeEnumToString[entry->faultInjectorType])));
+		snprintf(entry->bufOutput, sizeof(entry->bufOutput),
+				 "could not insert fault injection, max slots:'%d' reached",
+				 FAULTINJECTOR_MAX_SLOTS);
+		goto exit;
+	}
+
+	entryLocal = FaultInjector_InsertHashEntry(entry->faultName, &exists);
+
+	if (entryLocal == NULL)
+	{
+		FiLockRelease();
+		status = STATUS_ERROR;
+		ereport(WARNING,
+				(errmsg("cannot insert fault injection entry into table, no memory"),
+				 errdetail("Fault name:'%s' fault type:'%s'",
+						   entry->faultName,
+						   FaultInjectorTypeEnumToString[entry->faultInjectorType])));
+		snprintf(entry->bufOutput, sizeof(entry->bufOutput),
+				 "could not insert fault injection, no memory");
+		goto exit;
+	}
+
+	if (exists)
+	{
+		FiLockRelease();
+		status = STATUS_ERROR;
+		ereport(WARNING,
+				(errmsg("cannot insert fault injection entry into table, entry already exists"),
+				 errdetail("Fault name:'%s' fault type:'%s' ",
+						   entry->faultName,
+						   FaultInjectorTypeEnumToString[entry->faultInjectorType])));
+		snprintf(entry->bufOutput, sizeof(entry->bufOutput),
+				 "could not insert fault injection, entry already exists");
+		goto exit;
+	}
+
+	entryLocal->faultInjectorType = entry->faultInjectorType;
+	strlcpy(entryLocal->faultName, entry->faultName, sizeof(entryLocal->faultName));
+	entryLocal->extraArg = entry->extraArg;
+	entryLocal->startOccurrence = entry->startOccurrence;
+	entryLocal->endOccurrence = entry->endOccurrence;
+	entryLocal->numTimesTriggered = 0;
+
+	/* copy the names with explicit bounds, as is done for faultName */
+	strlcpy(entryLocal->databaseName, entry->databaseName, sizeof(entryLocal->databaseName));
+	strlcpy(entryLocal->tableName, entry->tableName, sizeof(entryLocal->tableName));
+	entryLocal->faultInjectorState = FaultInjectorStateWaiting;
+
+	faultInjectorShmem->faultInjectorSlots++;
+
+	FiLockRelease();
+
+	elog(DEBUG1, "FaultInjector_NewHashEntry(): '%s'", entry->faultName);
+
+exit:
+	return status;
+}
+
+/*
+ * Mark a suspended fault as resumed so that the process waiting on it can
+ * continue.  Returns STATUS_ERROR (with a WARNING) if the fault is not set.
+ */
+static int
+FaultInjector_MarkEntryAsResume(FaultInjectorEntry_s * entry)
+{
+	FaultInjectorEntry_s *entryLocal;
+	int			status = STATUS_OK;
+
+	Assert(entry->faultInjectorType == FaultInjectorTypeResume);
+
+	FiLockAcquire();
+	entryLocal = FaultInjector_LookupHashEntry(entry->faultName);
+	if (entryLocal == NULL)
+	{
+		FiLockRelease();
+		status = STATUS_ERROR;
+		ereport(WARNING,
+				(errmsg("cannot update fault injection hash entry with fault injection status, no entry found"),
+				 errdetail("Fault name:'%s' fault type:'%s'",
+						   entry->faultName,
+						   FaultInjectorTypeEnumToString[entry->faultInjectorType])));
+		goto exit;
+	}
+
+	if (entryLocal->faultInjectorType != FaultInjectorTypeSuspend)
+	{
+		/* release first: an ERROR raised here would leave the lock held */
+		FiLockRelease();
+		ereport(ERROR, (errmsg("only suspend fault can be resumed")));
+	}
+
+	entryLocal->faultInjectorType = FaultInjectorTypeResume;
+	FiLockRelease();
+
+	ereport(DEBUG1,
+			(errmsg("LOG(fault injector): update fault injection hash entry identifier:'%s' state:'%s'",
+					entry->faultName,
+					FaultInjectorStateEnumToString[entryLocal->faultInjectorState])));
+
+exit:
+	return status;
+}
+
+/*
+ * Apply one fault request: reset, wait, status, resume, or set a new fault.
+ */
+static int
+FaultInjector_SetFaultInjection(FaultInjectorEntry_s * entry)
+{
+	int			status = STATUS_OK;
+	bool		isRemoved = false;
+
+	switch (entry->faultInjectorType)
+	{
+		case FaultInjectorTypeReset:
+			{
+				HASH_SEQ_STATUS hash_status;
+				FaultInjectorEntry_s *entryLocal;
+
+				if (strcmp(entry->faultName, FaultInjectorNameAll) == 0)
+				{
+					hash_seq_init(&hash_status, faultInjectorShmem->hash);
+
+					FiLockAcquire();
+
+					while ((entryLocal = (FaultInjectorEntry_s *) hash_seq_search(&hash_status)) != NULL)
+					{
+						isRemoved = FaultInjector_RemoveHashEntry(entryLocal->faultName);
+						if (isRemoved == true)
+						{
+							faultInjectorShmem->faultInjectorSlots--;
+						}
+					}
+					FiLockRelease();
+					Assert(faultInjectorShmem->faultInjectorSlots == 0);
+				}
+				else
+				{
+					FiLockAcquire();
+					isRemoved = FaultInjector_RemoveHashEntry(entry->faultName);
+					if (isRemoved == true)
+					{
+						faultInjectorShmem->faultInjectorSlots--;
+					}
+					FiLockRelease();
+				}
+
+				if (isRemoved == false)
+					ereport(DEBUG1,
+							(errmsg("LOG(fault injector): could not remove fault injection from hash identifier:'%s'",
+									entry->faultName)));
+
+				break;
+			}
+
+		case FaultInjectorTypeWaitUntilTriggered:
+			{
+				FaultInjectorEntry_s *entryLocal;
+				int			retry_count = 600;	/* 10 minutes */
+
+				while ((entryLocal = FaultInjector_LookupHashEntry(entry->faultName)) != NULL &&
+					   entryLocal->faultInjectorState != FaultInjectorStateCompleted &&
+					   entryLocal->numTimesTriggered - entryLocal->startOccurrence < entry->extraArg - 1)
+				{
+					pg_usleep(1000000L);
+					/* 1 sec */
+					retry_count--;
+					if (!retry_count)
+					{
+						ereport(ERROR,
+								(errmsg("fault not triggered, fault name:'%s' fault type:'%s' ",
+										entryLocal->faultName,
+										FaultInjectorTypeEnumToString[entry->faultInjectorType]),
+								 errdetail("Timed-out as 10 minutes max wait happens until triggered.")));
+					}
+				}
+
+				if (entryLocal != NULL)
+				{
+					ereport(LOG,
+							(errmsg("fault triggered %d times, fault name:'%s' fault type:'%s' ",
+									entryLocal->numTimesTriggered,
+									entryLocal->faultName,
+									FaultInjectorTypeEnumToString[entry->faultInjectorType])));
+					status = STATUS_OK;
+				}
+				else
+				{
+					/* entryLocal is NULL here, so report the requested name */
+					ereport(ERROR,
+							(errmsg("fault not set, fault name:'%s'  ",
+									entry->faultName)));
+				}
+				break;
+			}
+
+		case FaultInjectorTypeStatus:
+			{
+				FaultInjectorEntry_s *entryLocal;
+				int			length;
+
+				if (faultInjectorShmem->hash == NULL)
+				{
+					status = STATUS_ERROR;
+					break;
+				}
+				length = snprintf(entry->bufOutput, sizeof(entry->bufOutput), "Success: ");
+
+				entryLocal = FaultInjector_LookupHashEntry(entry->faultName);
+				if (entryLocal)
+				{
+					length = snprintf(
+									  (entry->bufOutput + length),
+									  sizeof(entry->bufOutput) - length,
+									  "fault name:'%s' "
+									  "fault type:'%s' "
+									  "database name:'%s' "
+									  "table name:'%s' "
+									  "start occurrence:'%d' "
+									  "end occurrence:'%d' "
+									  "extra arg:'%d' "
+									  "fault injection state:'%s' "
+									  "num times hit:'%d' \n",
+									  entryLocal->faultName,
+									  FaultInjectorTypeEnumToString[entryLocal->faultInjectorType],
+									  entryLocal->databaseName,
+									  entryLocal->tableName,
+									  entryLocal->startOccurrence,
+									  entryLocal->endOccurrence,
+									  entryLocal->extraArg,
+									  FaultInjectorStateEnumToString[entryLocal->faultInjectorState],
+									  entryLocal->numTimesTriggered);
+				}
+				else
+				{
+					length = snprintf(entry->bufOutput, sizeof(entry->bufOutput),
+									  "Failure: fault name:'%s' not set",
+									  entry->faultName);
+
+				}
+				elog(LOG, "%s", entry->bufOutput);
+				if (length > sizeof(entry->bufOutput))
+					elog(LOG, "fault status truncated from %d to %zu characters",
+						 length, sizeof(entry->bufOutput));
+				break;
+			}
+		case FaultInjectorTypeResume:
+			{
+				ereport(LOG,
+						(errmsg("fault triggered, fault name:'%s' fault type:'%s' ",
+								entry->faultName,
+								FaultInjectorTypeEnumToString[entry->faultInjectorType])));
+
+				FaultInjector_MarkEntryAsResume(entry);
+
+				break;
+			}
+		default:
+
+			status = FaultInjector_NewHashEntry(entry);
+			break;
+	}
+	return status;
+}
+
+/*
+ * Validate a fault request and apply it.  Returns a string beginning with
+ * "Success:" or "Failure:"; invalid arguments raise ERROR instead.
+ */
+char *
+InjectFault(char *faultName, char *type, char *databaseName, char *tableName,
+			int startOccurrence, int endOccurrence, int extraArg)
+{
+	StringInfo	buf = makeStringInfo();
+	FaultInjectorEntry_s faultEntry;
+
+	MemSet(&faultEntry, 0, sizeof(faultEntry));	/* bufOutput starts out empty */
+	elog(DEBUG1, "injecting fault: name %s, type %s, db %s, table %s, startOccurrence %d, endOccurrence %d, extraArg %d",
+		 faultName, type, databaseName, tableName, startOccurrence, endOccurrence, extraArg);
+
+	if (strlcpy(faultEntry.faultName, faultName, sizeof(faultEntry.faultName)) >=
+		sizeof(faultEntry.faultName))
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("fault name too long: '%s'", faultName),
+				 errdetail("Fault name should be no more than %d characters.",
+						   FAULT_NAME_MAX_LENGTH - 1)));
+
+	faultEntry.faultInjectorType = FaultInjectorTypeStringToEnum(type);
+	if (faultEntry.faultInjectorType == FaultInjectorTypeMax)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("could not recognize fault type '%s'", type)));
+
+	/* Special fault name "all" is only used to reset all faults */
+	if (faultEntry.faultInjectorType != FaultInjectorTypeReset &&
+		strcmp(faultEntry.faultName, FaultInjectorNameAll) == 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("invalid fault name '%s'", faultName)));
+
+	faultEntry.extraArg = extraArg;
+	if (faultEntry.faultInjectorType == FaultInjectorTypeSleep &&
+		(extraArg < 0 || extraArg > 7200))
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("invalid sleep time, allowed range [0, 7200 sec]")));
+
+	if (strlcpy(faultEntry.databaseName, databaseName,
+				sizeof(faultEntry.databaseName)) >= sizeof(faultEntry.databaseName))
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("database name too long: '%s'", databaseName),
+				 errdetail("Database name should be no more than %d characters.",
+						   NAMEDATALEN - 1)));
+
+	if (strlcpy(faultEntry.tableName, tableName, sizeof(faultEntry.tableName)) >=
+		sizeof(faultEntry.tableName))
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("table name too long: '%s'", tableName),
+				 errdetail("Table name should be no more than %d characters.",
+						   NAMEDATALEN - 1)));
+
+	if (startOccurrence < 1 || startOccurrence > 1000)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("invalid start occurrence number, allowed range [1, 1000]")));
+
+	if (endOccurrence != INFINITE_END_OCCURRENCE && endOccurrence < startOccurrence)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("invalid end occurrence number, allowed range [startOccurrence, ] or -1")));
+
+	faultEntry.startOccurrence = startOccurrence;
+	faultEntry.endOccurrence = endOccurrence;
+
+	if (FaultInjector_SetFaultInjection(&faultEntry) == STATUS_OK)
+	{
+		if (faultEntry.faultInjectorType == FaultInjectorTypeStatus)
+			appendStringInfo(buf, "%s", faultEntry.bufOutput);
+		else
+		{
+			appendStringInfo(buf, "Success:");
+			elog(LOG, "injected fault '%s' type '%s'", faultName, type);
+		}
+	}
+	else
+		appendStringInfo(buf, "Failure: %s", faultEntry.bufOutput);
+
+	return buf->data;
+}
+#endif
diff --git a/src/include/utils/faultinjector.h b/src/include/utils/faultinjector.h
new file mode 100644
index 0000000000..b69e65bf58
--- /dev/null
+++ b/src/include/utils/faultinjector.h
@@ -0,0 +1,96 @@
+/*-------------------------------------------------------------------------
+ *
+ * faultinjector.h
+ *	  Definitions for fault based testing framework.
+ *
+ * src/include/utils/faultinjector.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef FAULTINJECTOR_H
+#define FAULTINJECTOR_H
+
+#include "pg_config_manual.h"
+
+#define FAULTINJECTOR_MAX_SLOTS	16
+
+#define FAULT_NAME_MAX_LENGTH	256
+
+#define INFINITE_END_OCCURRENCE -1
+
+#define Natts_fault_message_response 1
+#define Anum_fault_message_response_status 0
+
+/* Fault name that matches all faults */
+#define FaultInjectorNameAll "all"
+
+typedef enum FaultInjectorType_e {
+#define FI_TYPE(id, str) id,
+#include "utils/faultinjector_lists.h"
+#undef FI_TYPE
+	FaultInjectorTypeMax
+} FaultInjectorType_e;
+
+/*
+ * States of a fault; the values are listed in faultinjector_lists.h.
+ */
+typedef enum FaultInjectorState_e {
+#define FI_STATE(id, str) id,
+#include "utils/faultinjector_lists.h"
+#undef FI_STATE
+	FaultInjectorStateMax
+} FaultInjectorState_e;
+
+
+/*
+ * Definition and run-time state of one fault.  Entries of the shared hash
+ * table have this layout, and InjectFault() fills one in to describe a request.
+ */
+typedef struct FaultInjectorEntry_s {
+	/* name of the fault; used as the hash table key */
+	char						faultName[FAULT_NAME_MAX_LENGTH];
+	/* action to take when the fault point is reached */
+	FaultInjectorType_e		faultInjectorType;
+	/* type-specific argument, e.g. the number of seconds for a sleep fault */
+	int						extraArg;
+	/* the fault acts only if both names match those given at the fault point */
+	char					databaseName[NAMEDATALEN];
+	char					tableName[NAMEDATALEN];
+
+	/* the fault acts from startOccurrence through endOccurrence (-1: no end) */
+	volatile	int			startOccurrence;
+	volatile	int			endOccurrence;
+	/* times the fault point was reached (names matching, fault not completed) */
+	volatile	 int	numTimesTriggered;
+	/* the state of the fault injection */
+	volatile	FaultInjectorState_e	faultInjectorState;
+	/* status or failure text handed back to the caller of InjectFault() */
+	char					bufOutput[2500];
+} FaultInjectorEntry_s;
+
+
+extern Size FaultInjector_ShmemSize(void);
+
+extern void FaultInjector_ShmemInit(void);
+
+extern FaultInjectorType_e FaultInjector_TriggerFaultIfSet(
+							   const char*				 faultName,
+							   const char*				 databaseName,
+							   const char*				 tableName);
+
+extern char *InjectFault(
+	char *faultName, char *type, char *databaseName, char *tableName,
+	int startOccurrence, int endOccurrence, int extraArg);
+
+#ifdef FAULT_INJECTOR
+extern bool am_faultinjector;
+#define IsFaultHandler am_faultinjector	/* the flag declared just above */
+#define SIMPLE_FAULT_INJECTOR(FaultName) \
+	FaultInjector_TriggerFaultIfSet(FaultName, "", "")
+#else							/* fault injection compiled out */
+#define IsFaultHandler false
+#define SIMPLE_FAULT_INJECTOR(FaultName)
+#endif							/* FAULT_INJECTOR */
+
+#endif	/* FAULTINJECTOR_H */
diff --git a/src/include/utils/faultinjector_lists.h b/src/include/utils/faultinjector_lists.h
new file mode 100644
index 0000000000..943c674a20
--- /dev/null
+++ b/src/include/utils/faultinjector_lists.h
@@ -0,0 +1,71 @@
+/*
+ * faultinjector_lists.h
+ *
+ * List of fault injector types, states and some other things. These are
+ * listed using C preprocessor macros. To use, you must define the appropriate
+ * FI_* macros before #including this file.
+ *
+ * For example, to get an array of all the type strings, do:
+ *
+ * const char *FaultInjectorTypeStrings[] = {
+ * #define FI_TYPE(id, str) str,
+ * #include "utils/faultinjector_lists.h"
+ * #undef FI_TYPE
+ * };
+ *
+ *
+ * To add a new entry, simply add a new FI_* line to the appropriate list
+ * below.
+ *
+ *
+ */
+
+/* there is deliberately not an #ifndef FAULTINJECTOR_LISTS_H here */
+
+
+/*
+ * Fault types. These indicate the action to do when the fault injection
+ * point is reached.
+ */
+#ifdef FI_TYPE
+FI_TYPE(FaultInjectorTypeNotSpecified = 0, "")
+FI_TYPE(FaultInjectorTypeSleep, "sleep")
+FI_TYPE(FaultInjectorTypeFatal, "fatal")
+FI_TYPE(FaultInjectorTypePanic, "panic")
+FI_TYPE(FaultInjectorTypeError, "error")
+FI_TYPE(FaultInjectorTypeInfiniteLoop, "infinite_loop")
+FI_TYPE(FaultInjectorTypeSuspend, "suspend")
+FI_TYPE(FaultInjectorTypeResume, "resume")
+FI_TYPE(FaultInjectorTypeSkip, "skip")
+FI_TYPE(FaultInjectorTypeReset, "reset")
+FI_TYPE(FaultInjectorTypeStatus, "status")
+FI_TYPE(FaultInjectorTypeSegv, "segv")
+FI_TYPE(FaultInjectorTypeInterrupt, "interrupt")
+FI_TYPE(FaultInjectorTypeWaitUntilTriggered, "wait_until_triggered")
+#endif
+
+/*
+ * States of a fault.
+ */
+#ifdef FI_STATE
+FI_STATE(FaultInjectorStateNotInitialized = 0, "not initialized")
+
+/* The fault has been injected (enabled using the SQL interface). */
+FI_STATE(FaultInjectorStateWaiting, "set")
+
+/*
+ * A backend process reached the fault point that was set and the
+ * corresponding action has been taken.
+ */
+FI_STATE(FaultInjectorStateTriggered, "triggered")
+
+/*
+ * The fault has been triggered as many times as was configured by the
+ * SQL interface.  The action associated with it will no longer be taken
+ * if it is reached during execution.
+ */
+FI_STATE(FaultInjectorStateCompleted, "completed")
+
+/* Fault was NOT injected */
+FI_STATE(FaultInjectorStateFailed, "failed")
+#endif
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 4fe871406c..7934e291a8 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -339,6 +339,12 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+#if defined(FAULT_INJECTOR)
+	{"fault", NULL, NULL, NULL,
+		"Faultinjector", "D", 5,
+	offsetof(struct pg_conn, fault)},
+#endif
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index b04f7ec123..72a92deb7a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2182,6 +2182,10 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("database", conn->dbName);
 	if (conn->replication && conn->replication[0])
 		ADD_STARTUP_OPTION("replication", conn->replication);
+#if defined(FAULT_INJECTOR)
+	if (conn->fault && conn->fault[0])
+		ADD_STARTUP_OPTION("fault", conn->fault);
+#endif
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
 	if (conn->send_appname)
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d37bb3ce40..9d7dbd13b3 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -344,6 +344,9 @@ struct pg_conn
 	char	   *fbappname;		/* fallback application name */
 	char	   *dbName;			/* database name */
 	char	   *replication;	/* connect as the replication standby? */
+#if defined(FAULT_INJECTOR)
+	char	   *fault;			/* connection to send a fault injector message */
+#endif
 	char	   *pguser;			/* Postgres username and password, if any */
 	char	   *pgpass;
 	char	   *pgpassfile;		/* path to a file containing password(s) */
-- 
2.14.3 (Apple Git-98)

