diff --git a/configure b/configure
index 83abe872aa..e7fffcea86 100755
--- a/configure
+++ b/configure
@@ -11037,6 +11037,84 @@ _ACEOF
 
 fi
 
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for main in -lrt" >&5
+$as_echo_n "checking for main in -lrt... " >&6; }
+if ${ac_cv_lib_rt_main+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lrt  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+
+int
+main ()
+{
+return main ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_rt_main=yes
+else
+  ac_cv_lib_rt_main=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_rt_main" >&5
+$as_echo "$ac_cv_lib_rt_main" >&6; }
+if test "x$ac_cv_lib_rt_main" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBRT 1
+_ACEOF
+
+  LIBS="-lrt $LIBS"
+
+fi
+
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for main in -laio" >&5
+$as_echo_n "checking for main in -laio... " >&6; }
+if ${ac_cv_lib_aio_main+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-laio  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+
+int
+main ()
+{
+return main ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_aio_main=yes
+else
+  ac_cv_lib_aio_main=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_aio_main" >&5
+$as_echo "$ac_cv_lib_aio_main" >&6; }
+if test "x$ac_cv_lib_aio_main" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBAIO 1
+_ACEOF
+
+  LIBS="-laio $LIBS"
+
+fi
+
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing setproctitle" >&5
 $as_echo_n "checking for library containing setproctitle... " >&6; }
 if ${ac_cv_search_setproctitle+:} false; then :
diff --git a/configure.in b/configure.in
index ecdf172396..1d2f9b9d4c 100644
--- a/configure.in
+++ b/configure.in
@@ -1139,6 +1139,8 @@ AC_SUBST(PTHREAD_LIBS)
 ##
 
 AC_CHECK_LIB(m, main)
+AC_CHECK_LIB(rt, main)
+AC_CHECK_LIB(aio, main)
 AC_SEARCH_LIBS(setproctitle, util)
 AC_SEARCH_LIBS(dlopen, dl)
 AC_SEARCH_LIBS(socket, [socket ws2_32])
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f9980cf80c..94a3258438 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -473,6 +473,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 							   bool *foundPtr);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
 static void AtProcExit_Buffers(int code, Datum arg);
+static int PerformAIORequest(int start_buffer_id, int num_buffers);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
 static int	buffertag_comparator(const void *p1, const void *p2);
@@ -705,6 +706,135 @@ ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
 							 mode, strategy, &hit);
 }
 
+/*
+ * PerformAIORequest -- write a run of dirty shared buffers in one AIO batch.
+ *
+ * Scans num_buffers buffers starting at start_buffer_id.  A buffer that is
+ * valid, dirty, not already under I/O, and whose content and I/O locks can
+ * be taken without waiting is pinned, flagged BM_IO_IN_PROGRESS and queued;
+ * any other buffer is skipped.  The queue goes to smgraio() in one call,
+ * after which each queued buffer is marked clean unless it was re-dirtied.
+ * Returns the number of buffers queued and written.
+ */
+static int
+PerformAIORequest(int start_buffer_id, int num_buffers)
+{
+  aio_req_t req[MAX_AIO_BATCH_SIZE];
+  BufferDesc *buf;
+  uint32 buf_state;
+  int num_requests = 0;
+  int ii;
+
+  Assert(num_buffers <= MAX_AIO_BATCH_SIZE);
+  Assert((start_buffer_id + num_buffers) <= NBuffers);
+
+  /* Slots left zeroed have isValid == false and are skipped later on. */
+  memset(req, 0, sizeof(req));
+
+  for (ii = 0; ii < num_buffers; ++ii) {
+    XLogRecPtr recptr;
+
+    buf = GetBufferDescriptor(start_buffer_id + ii);
+
+    /* Cheap unlocked peek; the flags are rechecked under the header lock. */
+    buf_state = pg_atomic_read_u32(&buf->state);
+    if (!(buf_state & BM_VALID)
+      || !(buf_state & BM_DIRTY)
+      || (buf_state & BM_IO_IN_PROGRESS)) {
+      continue;
+    }
+
+    /* Reserve pin bookkeeping up front, outside any spinlock section. */
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+    ReservePrivateRefCountEntry();
+
+    buf_state = LockBufHdr(buf);
+    if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) {
+      UnlockBufHdr(buf, buf_state);
+      continue;
+    }
+    PinBuffer_Locked(buf);  /* also drops the header lock */
+
+    /* Never wait for a lock here: a busy buffer is simply skipped. */
+    if (!LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
+      LW_SHARED)) {
+      UnpinBuffer(buf, true);
+      continue;
+    }
+    if (!LWLockConditionalAcquire(BufferDescriptorGetIOLock(buf),
+      LW_EXCLUSIVE)) {
+      LWLockRelease(BufferDescriptorGetContentLock(buf));
+      UnpinBuffer(buf, true);
+      continue;
+    }
+
+    /* Header spinlock is taken last so no LWLock call runs while it is held. */
+    buf_state = LockBufHdr(buf);
+    if ((buf_state & BM_IO_IN_PROGRESS) || !(buf_state & BM_DIRTY)) {
+      UnlockBufHdr(buf, buf_state);
+      LWLockRelease(BufferDescriptorGetIOLock(buf));
+      LWLockRelease(BufferDescriptorGetContentLock(buf));
+      UnpinBuffer(buf, true);
+      continue;
+    }
+
+    /* Read the LSN and claim the I/O while the header is still locked. */
+    recptr = BufferGetLSN(buf);
+    buf_state &= ~BM_JUST_DIRTIED;
+    buf_state |= BM_IO_IN_PROGRESS;
+    UnlockBufHdr(buf, buf_state);
+
+    /* For permanent buffers, flush WAL up to the page's LSN first. */
+    if (buf_state & BM_PERMANENT) {
+      XLogFlush(recptr);
+    }
+
+    /*
+     * NOTE(review): confirm PageSetChecksumCopy() hands back a distinct
+     * page image per call; if it reuses one scratch buffer, every request
+     * in the batch would alias the same memory.
+     */
+    req[ii].reln = smgropen(buf->tag.rnode, InvalidBackendId);
+    req[ii].bnum = buf->tag.blockNum;
+    req[ii].fnum = buf->tag.forkNum;
+    req[ii].buf = PageSetChecksumCopy((Page) BufHdrGetBlock(buf),
+      buf->tag.blockNum);
+    req[ii].isValid = true;
+
+    ++num_requests;
+  }
+
+  if (num_requests > 0) {
+    /* Tell the storage manager to do its job */
+    if (!smgraio(req)) {
+      elog(PANIC, "SMGRAIO error!\n");
+    }
+
+    pgBufferUsage.shared_blks_written += num_requests;
+
+    for (ii = 0; ii < num_buffers; ++ii) {
+      if (!req[ii].isValid) {
+        continue;
+      }
+      buf = GetBufferDescriptor(start_buffer_id + ii);
+
+      /* Clear the I/O flags; stay dirty if re-dirtied during the write. */
+      buf_state = LockBufHdr(buf);
+      buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
+      if (!(buf_state & BM_JUST_DIRTIED)) {
+        buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
+      }
+      UnlockBufHdr(buf, buf_state);
+
+      /* Drop the LWLocks only after the spinlock has been released. */
+      LWLockRelease(BufferDescriptorGetIOLock(buf));
+      LWLockRelease(BufferDescriptorGetContentLock(buf));
+      UnpinBuffer(buf, true);
+    }
+  }
+
+  return (num_requests);
+} /* PerformAIORequest() */
 
 /*
  * ReadBuffer_common -- common logic for all ReadBuffer variants
@@ -2293,29 +2423,53 @@ BgBufferSync(WritebackContext *wb_context)
 	reusable_buffers = reusable_buffers_est;
 
 	/* Execute the LRU scan */
-	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
-	{
-		int			sync_state = SyncOneBuffer(next_to_clean, true,
-											   wb_context);
-
-		if (++next_to_clean >= NBuffers)
-		{
-			next_to_clean = 0;
-			next_passes++;
+	if (EnableAsyncIO) {
+		/* No strategy other than to keep it as clean as possible. */
+		saved_info_valid = false;
+		while (num_to_scan > 0) {
+			int request_size = Min(num_to_scan, max_asyncio_events);
+			int batch_written;
+
+			/* Clamp the batch so it stops at the end of the buffer pool. */
+			if ((next_to_clean + request_size) >= NBuffers)
+				request_size = (NBuffers - next_to_clean);
+
+			/* Credit only this batch: num_written is a running total. */
+			batch_written = PerformAIORequest(next_to_clean, request_size);
+			num_written += batch_written;
+			reusable_buffers += batch_written;
+			next_to_clean += request_size;
+			if (next_to_clean >= NBuffers) {
+				next_to_clean = 0;
+				++next_passes;
+			}
+			num_to_scan -= request_size;
 		}
-		num_to_scan--;
-
-		if (sync_state & BUF_WRITTEN)
+	} else {
+		while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 		{
-			reusable_buffers++;
-			if (++num_written >= bgwriter_lru_maxpages)
+			int			sync_state = SyncOneBuffer(next_to_clean, true,
+													 wb_context);
+
+			if (++next_to_clean >= NBuffers)
 			{
-				BgWriterStats.m_maxwritten_clean++;
-				break;
+				next_to_clean = 0;
+				next_passes++;
+			}
+			num_to_scan--;
+
+			if (sync_state & BUF_WRITTEN)
+			{
+				reusable_buffers++;
+				if (++num_written >= bgwriter_lru_maxpages)
+				{
+					BgWriterStats.m_maxwritten_clean++;
+					break;
+				}
 			}
+			else if (sync_state & BUF_REUSABLE)
+				reusable_buffers++;
 		}
-		else if (sync_state & BUF_REUSABLE)
-			reusable_buffers++;
 	}
 
 	BgWriterStats.m_buf_written_clean += num_written;
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 6ffd7b3306..21c4e6f355 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -434,7 +434,12 @@ InitLocalBuffers(void)
 
 	/* Allocate and zero buffer headers and auxiliary arrays */
 	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
-	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
+
+	/* Ensure alignment of local buffers; zero the array only if malloc worked */
+	LocalBufferBlockPointers =
+		(Block *) DIOBUFFERALIGN(malloc((sizeof(Block) * nbufs) + ALIGNOF_DIRECTIO));
+	if (LocalBufferBlockPointers != NULL)
+		MemSet((void *) LocalBufferBlockPointers, 0, (sizeof(Block) * nbufs));
 	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
 	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
 		ereport(FATAL,
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 7dc6dd2f15..032057bf6b 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -271,6 +271,11 @@ static Oid *tempTableSpaces = NULL;
 static int	numTempTableSpaces = -1;
 static int	nextTempTableSpace = 0;
 
+#ifdef USE_POSIX_AIO
+# include <aio.h>
+#else
+# include <libaio.h>
+#endif /* USE_POSIX_AIO */
 
 /*--------------------
  *
@@ -336,6 +341,11 @@ static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 static int	fsync_parent_path(const char *fname, int elevel);
 
 
+/* Asynchronous and Direct I/O GUCs */
+bool EnableDirectIO = false;
+bool EnableAsyncIO = false;
+int max_asyncio_events = MAX_AIO_BATCH_SIZE;
+
 /*
  * pg_fsync --- do fsync with or without writethrough
  */
@@ -1693,6 +1703,177 @@ OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError)
 	return file;
 }
 
+#ifdef USE_POSIX_AIO
+/*
+ * FileAIO -- write every valid req[] entry with POSIX lio_listio() and wait
+ * for the batch to finish.  The first max_asyncio_events entries are
+ * scanned; each valid one writes BLCKSZ bytes from req[i].buf at
+ * req[i].offset of req[i].file.  Raises ERROR when a write comes back
+ * short or failed; otherwise returns true.
+ */
+bool
+FileAIO(aio_req_t *req)
+{
+  struct aiocb aio_array[MAX_AIO_BATCH_SIZE];
+  struct aiocb *aio_list[MAX_AIO_BATCH_SIZE];
+  int rc, ii;
+  int aio_nent = 0;
+
+  memset(aio_array, 0, sizeof(aio_array));
+
+  for (ii = 0; ii < max_asyncio_events; ++ii) {
+    if (!req[ii].isValid) {
+      continue;
+    }
+    Assert(FileIsValid(req[ii].file));
+    rc = FileAccess(req[ii].file);
+    if (rc < 0) {
+      elog(ERROR, "FileAccess returned %d at line %d in %s",
+        rc, __LINE__, __func__);
+    }
+
+    aio_array[aio_nent].aio_lio_opcode = LIO_WRITE;
+    aio_array[aio_nent].aio_fildes = VfdCache[req[ii].file].fd;
+    aio_array[aio_nent].aio_buf = req[ii].buf;
+    aio_array[aio_nent].aio_nbytes = BLCKSZ;
+    aio_array[aio_nent].aio_offset = req[ii].offset;
+    aio_list[aio_nent] = &aio_array[aio_nent];
+    ++aio_nent;
+  }
+
+  rc = lio_listio(LIO_WAIT, aio_list, aio_nent, NULL);
+  if (rc != 0 && errno == EINTR) {
+    /*
+     * Interrupted wait.  Poll with aio_error() only: aio_return() may be
+     * called once per request, and that happens in the loop below.
+     */
+    for (ii = 0; ii < aio_nent; ++ii) {
+      const struct aiocb *pending = aio_list[ii];
+      int aio_errno;
+      while ((aio_errno = aio_error(pending)) == EINPROGRESS) {
+        aio_suspend(&pending, 1, NULL);
+      }
+      if (aio_errno != 0) {
+        errno = aio_errno;
+        ereport(PANIC,
+          (errcode_for_file_access(),
+          errmsg("unhandled POSIX AIO condition: %m")));
+      }
+    }
+    elog(LOG, "completed EINTR handling");
+  } else if (rc != 0) {
+    ereport(WARNING,
+      (errcode_for_file_access(),
+      errmsg("lio_listio returned: %m")));
+  }
+
+  /* Collect each request's result exactly once. */
+  for (ii = 0; ii < aio_nent; ++ii) {
+    int aio_errno = aio_error(aio_list[ii]);
+    ssize_t aio_ret = aio_return(aio_list[ii]);
+
+    if (aio_ret != BLCKSZ) {
+      errno = aio_errno;
+      ereport(ERROR,
+        (errcode_for_file_access(),
+        errmsg("lio_listio returned: %m")));
+    }
+  }
+
+  return (true);
+} /* FileAIO() */
+#else /* use libaio */
+/*
+ * FileAIO -- write every valid req[] entry through Linux libaio and wait
+ * for the batch to finish.  The first max_asyncio_events entries are
+ * scanned; each valid one writes BLCKSZ bytes from req[i].buf at
+ * req[i].offset of req[i].file.  Every completion event is checked and a
+ * failed or short write raises ERROR; otherwise returns true.
+ */
+bool
+FileAIO(aio_req_t *req)
+{
+  static io_context_t ctx;
+  static bool initialized = false;
+  struct iocb array[MAX_AIO_BATCH_SIZE];
+  struct iocb *list[MAX_AIO_BATCH_SIZE];
+  struct io_event reap[MAX_AIO_BATCH_SIZE];
+  int rc, i, num_iocbs;
+  int num_done = 0, num_failed = 0;
+  long bad_res = 0;
+
+  /* We only need to do io_setup once */
+  if (!initialized) {
+    memset(&ctx, 0x0, sizeof(ctx));
+    rc = io_setup(MAX_AIO_BATCH_SIZE, &ctx);
+    if (rc) {
+      elog(PANIC, "io_setup returned %d", rc);
+    }
+    initialized = true;
+  }
+
+  memset(array, 0x0, sizeof(array));
+  memset(reap, 0x0, sizeof(reap));
+
+  /* Queue one iocb per valid request; list[] is packed, array[] is not. */
+  for (i = 0, num_iocbs = 0; i < max_asyncio_events; ++i) {
+    if (!req[i].isValid) {
+      continue;
+    }
+    Assert(FileIsValid(req[i].file));
+    rc = FileAccess(req[i].file);
+    if (rc < 0) {
+      elog(ERROR, "FileAccess returned %d at line %d in %s",
+        rc, __LINE__, __func__);
+    }
+
+    array[i].aio_lio_opcode = IO_CMD_PWRITE;
+    array[i].aio_fildes = VfdCache[req[i].file].fd;
+    array[i].u.c.buf = req[i].buf;
+    array[i].u.c.nbytes = BLCKSZ;
+    array[i].u.c.offset = req[i].offset;
+    list[num_iocbs++] = &array[i];
+  }
+
+  /* Submit the request for processing */
+  errno = 0;
+  elog(DEBUG1, "submitting libaio request containing %d iocbs", num_iocbs);
+  rc = io_submit(ctx, num_iocbs, list);
+  if (rc != num_iocbs) {
+    elog(PANIC, "io_submit returned %d nr_submitted: %d  errno: %d %m", rc,
+      num_iocbs, errno);
+  }
+
+  /*
+   * Reap every completion, resuming (not resubmitting) after an interrupt;
+   * failures are reported only once the context has been drained.
+   */
+  while (num_done < num_iocbs) {
+    rc = io_getevents(ctx, 1, num_iocbs - num_done, reap, NULL);
+    if (rc == -EINTR) {
+      continue;
+    }
+    if (rc < 0) {
+      elog(PANIC, "io_getevents failed (nr=%d rc=%d)", num_iocbs, rc);
+    }
+    for (i = 0; i < rc; ++i) {
+      if ((reap[i].res2 != 0) || (reap[i].res != BLCKSZ)) {
+        bad_res = (long) reap[i].res;
+        ++num_failed;
+      }
+    }
+    num_done += rc;
+  }
+  if (num_failed > 0) {
+    elog(ERROR, "libaio: %d of %d writes failed (last result %ld)",
+      num_failed, num_iocbs, bad_res);
+  }
+
+  return (true);
+} /* FileAIO() */
+#endif /* USE_POSIX_AIO */
+
+
 
 /*
  * Create a new file.  The directory containing it must already exist.  Files
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 4c14e51c67..c6c2fcff49 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -130,7 +130,7 @@ LWLockPadded *MainLWLockArray = NULL;
  * occasionally the number can be much higher; for example, the pg_buffercache
  * extension locks all buffer partitions simultaneously.
  */
-#define MAX_SIMUL_LWLOCKS	200
+#define MAX_SIMUL_LWLOCKS	1024
 
 /* struct representing the LWLocks we're holding */
 typedef struct LWLockHandle
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index e0b020da11..2c219706dd 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -87,6 +87,20 @@ typedef struct _MdfdVec
 
 static MemoryContext MdCxt;		/* context for all MdfdVec objects */
 
+/*
+ * Direct I/O support: bounce buffer for callers whose buffers are not
+ * aligned.  localDirectIOBuffer is the raw allocation and
+ * alignedDirectIOBuffer is the ALIGNOF_DIRECTIO-aligned address inside it.
+ */
+static char *localDirectIOBuffer = NULL;
+static char *alignedDirectIOBuffer = NULL;
+
+/* Extra open() flags: PG_O_DIRECT | O_SYNC when Direct I/O is on, else 0 */
+#define DIRECT_IO_FLAGS		((EnableDirectIO) ? (PG_O_DIRECT | O_SYNC) : (0))
+
+/* True iff Direct I/O is enabled and ptr is not ALIGNOF_DIRECTIO-aligned */
+#define DIRECTIO_BUFFER_REQUIRED(ptr) \
+	(EnableDirectIO && (((uintptr_t) (ptr) & (ALIGNOF_DIRECTIO - 1)) != 0))
 
 /* Populate a file tag describing an md.c segment file. */
 #define INIT_MD_FILETAG(a,xx_rnode,xx_forknum,xx_segno) \
@@ -150,6 +164,15 @@ mdinit(void)
 	MdCxt = AllocSetContextCreate(TopMemoryContext,
 								  "MdSmgr",
 								  ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * Initialize the buffer used for handling unaligned reads/writes when
+	 * Direct I/O is enabled.
+	 */
+	if (EnableDirectIO) {
+		localDirectIOBuffer = (char *) palloc(BLCKSZ + ALIGNOF_DIRECTIO);
+		alignedDirectIOBuffer = (char *) DIOBUFFERALIGN(localDirectIOBuffer);
+	}
 }
 
 /*
@@ -201,14 +224,14 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
 
 	path = relpath(reln->smgr_rnode, forkNum);
 
-	fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
+	fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY | DIRECT_IO_FLAGS);
 
 	if (fd < 0)
 	{
 		int			save_errno = errno;
 
 		if (isRedo)
-			fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+			fd = PathNameOpenFile(path, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS);
 		if (fd < 0)
 		{
 			/* be sure to report the error reported by create, not open */
@@ -390,6 +413,7 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	off_t		seekpos;
 	int			nbytes;
 	MdfdVec    *v;
+	char		*ptr;
 
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
@@ -414,7 +438,15 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
 
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
+	/* Use the Direct I/O buffer iff it's required */
+	if (DIRECTIO_BUFFER_REQUIRED(buffer)) {
+		memcpy(alignedDirectIOBuffer, buffer, BLCKSZ);
+		ptr = alignedDirectIOBuffer;
+	} else {
+		ptr = buffer;
+	}
+
+	if ((nbytes = FileWrite(v->mdfd_vfd, ptr, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
 	{
 		if (nbytes < 0)
 			ereport(ERROR,
@@ -460,7 +492,7 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior)
 
 	path = relpath(reln->smgr_rnode, forknum);
 
-	fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+	fd = PathNameOpenFile(path, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS);
 
 	if (fd < 0)
 	{
@@ -556,6 +588,10 @@ void
 mdwriteback(SMgrRelation reln, ForkNumber forknum,
 			BlockNumber blocknum, BlockNumber nblocks)
 {
+	if (EnableDirectIO) {
+		return;
+	}
+
 	/*
 	 * Issue flush requests in as few requests as possible; have to split at
 	 * segment boundaries though, since those are actually separate files.
@@ -608,6 +644,7 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	off_t		seekpos;
 	int			nbytes;
 	MdfdVec    *v;
+	char		*ptr;
 
 	TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum,
 										reln->smgr_rnode.node.spcNode,
@@ -622,7 +659,14 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
 
-	nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
+	/* Use the Direct I/O buffer iff it's required */
+	ptr = DIRECTIO_BUFFER_REQUIRED(buffer) ? alignedDirectIOBuffer : buffer;
+
+	nbytes = FileRead(v->mdfd_vfd, ptr, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
+
+	/* A bounce-buffer read must be copied back into the caller's buffer */
+	if (ptr != buffer && nbytes > 0)
+		memcpy(buffer, ptr, nbytes);
 
 	TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
 									   reln->smgr_rnode.node.spcNode,
@@ -673,6 +717,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	off_t		seekpos;
 	int			nbytes;
 	MdfdVec    *v;
+	char	   *ptr;
 
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
@@ -692,7 +737,15 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
 
-	nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
+	/* Use the Direct I/O buffer iff it's required */
+	if (DIRECTIO_BUFFER_REQUIRED(buffer)) {
+		memcpy(alignedDirectIOBuffer, buffer, BLCKSZ);
+		ptr = alignedDirectIOBuffer;
+	} else {
+		ptr = buffer;
+	}
+
+	nbytes = FileWrite(v->mdfd_vfd, ptr, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
 
 	TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum,
 										reln->smgr_rnode.node.spcNode,
@@ -894,6 +947,10 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 	int			segno;
 	int			min_inactive_seg;
 
+	if (EnableDirectIO) {
+		return;
+	}
+
 	/*
 	 * NOTE: mdnblocks makes sure we have opened all active segments, so that
 	 * fsync loop will get them all!
@@ -930,6 +987,31 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 
 		segno--;
 	}
+
+}
+
+/*
+ *	mdaio() -- Map each valid request to its segment file and offset, then submit.
+ */
+bool
+mdaio(aio_req_t *req)
+{
+	MdfdVec    *v;
+	int			i;
+
+	Assert(req != NULL);
+
+	for (i = 0; i < max_asyncio_events; ++i) {
+		if (!req[i].isValid)
+			continue;
+
+		v = _mdfd_getseg(req[i].reln, req[i].fnum, req[i].bnum, false, EXTENSION_FAIL);
+		req[i].file = v->mdfd_vfd;
+		req[i].offset = (off_t) BLCKSZ * (req[i].bnum % ((BlockNumber) RELSEG_SIZE));
+		Assert(req[i].offset < (off_t) BLCKSZ * RELSEG_SIZE);
+	}
+
+	return FileAIO(req);
 }
 
 /*
@@ -1120,7 +1202,7 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno,
 	fullpath = _mdfd_segpath(reln, forknum, segno);
 
 	/* open the file */
-	fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags);
+	fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS | oflags);
 
 	pfree(fullpath);
 
@@ -1324,7 +1406,7 @@ mdsyncfiletag(const FileTag *ftag, char *path)
 		strlcpy(path, p, MAXPGPATH);
 		pfree(p);
 
-		file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
+		file = PathNameOpenFile(path, O_RDWR | PG_BINARY | DIRECT_IO_FLAGS);
 		if (file < 0)
 			return -1;
 		need_to_close = true;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index b053a4dc76..d42b730335 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -61,6 +61,7 @@ typedef struct f_smgr
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+	bool		(*smgr_aio) (aio_req_t *);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -81,6 +82,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
+		.smgr_aio = mdaio,
 	}
 };
 
@@ -709,3 +711,13 @@ AtEOXact_SMgr(void)
 		smgrclose(rel);
 	}
 }
+
+/*
+ *	smgraio() -- Hand a batch of AIO requests to the first smgrsw[] entry.
+ */
+bool
+smgraio(aio_req_t *req)
+{
+	return smgrsw[0].smgr_aio(req);
+}
+
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 5bdc02fce2..36f672e312 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2060,6 +2060,24 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"enable_directio", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+			gettext_noop("Enables Direct I/O."),
+			NULL
+		},
+		&EnableDirectIO,
+		false, NULL, NULL
+	},
+
+	{
+		{"enable_asyncio", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+			gettext_noop("Enables Asynchronous I/O."),
+			NULL
+		},
+		&EnableAsyncIO,
+		false, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3400,6 +3418,15 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, assign_tcp_user_timeout, show_tcp_user_timeout
 	},
 
+	{
+		{"max_asyncio_events", PGC_POSTMASTER, RESOURCES_KERNEL,
+			gettext_noop("Sets the maximum number of asynchronous I/O events per request."),
+			NULL
+		},
+		&max_asyncio_events,
+		16, 1, MAX_AIO_BATCH_SIZE, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 995b6ca155..bf94d422e6 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -770,6 +770,18 @@
 #include_if_exists = '...'		# include file only if it exists
 #include = '...'			# include file
 
+#------------------------------------------------------------------------------
+# AIO/DIO Options
+#------------------------------------------------------------------------------
+
+# Enables usage of direct I/O (O_DIRECT)
+#enable_directio = off
+
+# Enables usage of asynchronous I/O (libaio or POSIX)
+#enable_asyncio = off
+
+# Maximum number of asynchronous I/O events per call (1 .. platform AIO batch limit)
+#max_asyncio_events = 16
 
 #------------------------------------------------------------------------------
 # CUSTOMIZED OPTIONS
diff --git a/src/include/c.h b/src/include/c.h
index d72b23afe4..c7e8a63fb4 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -691,6 +691,7 @@ typedef NameData *Name;
 #define MAXALIGN(LEN)			TYPEALIGN(MAXIMUM_ALIGNOF, (LEN))
 /* MAXALIGN covers only built-in types, not buffers */
 #define BUFFERALIGN(LEN)		TYPEALIGN(ALIGNOF_BUFFER, (LEN))
+#define DIOBUFFERALIGN(LEN)		TYPEALIGN(ALIGNOF_DIRECTIO, (LEN))
 #define CACHELINEALIGN(LEN)		TYPEALIGN(PG_CACHE_LINE_SIZE, (LEN))
 
 #define TYPEALIGN_DOWN(ALIGNVAL,LEN)  \
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index c199cd46d2..a31c8fcfb5 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -310,6 +310,9 @@
 /* Define to 1 if you have the `ldap_initialize' function. */
 #undef HAVE_LDAP_INITIALIZE
 
+/* Define to 1 if you have the `aio' library (-laio). */
+#undef HAVE_LIBAIO
+
 /* Define to 1 if you have the `crypto' library (-lcrypto). */
 #undef HAVE_LIBCRYPTO
 
@@ -328,6 +331,9 @@
 /* Define if you have a function readline library */
 #undef HAVE_LIBREADLINE
 
+/* Define to 1 if you have the `rt' library (-lrt). */
+#undef HAVE_LIBRT
+
 /* Define to 1 if you have the `selinux' library (-lselinux). */
 #undef HAVE_LIBSELINUX
 
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 8f3ec6bde1..3ccf2fb812 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -122,6 +122,12 @@
  */
 #define ALIGNOF_BUFFER	32
 
+/*
+ * Preferred alignment for Direct I/O buffers. This should generally be done
+ * on a 512-byte boundary.
+ */
+#define ALIGNOF_DIRECTIO	512
+
 /*
  * If EXEC_BACKEND is defined, the postmaster uses an alternative method for
  * starting subprocesses: Instead of simply using fork(), as is standard on
@@ -145,6 +151,15 @@
 #define USE_POSIX_FADVISE
 #endif
 
+/*
+ * USE_POSIX_AIO controls whether Postgres will use POSIX's version of
+ * asynchronous I/O, or whether we'll use libaio.  By default, we'd
+ * prefer to use libaio where available (i.e. on Linux.)
+ */
+#ifndef HAVE_LIBAIO
+# define USE_POSIX_AIO
+#endif
+
 /*
  * USE_PREFETCH code should be compiled only if we have a way to implement
  * prefetching.  (This is decoupled from USE_POSIX_FADVISE because there
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 8cd125d7df..54125d0fd0 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -44,20 +44,37 @@
 #define FD_H
 
 #include <dirent.h>
+#include "storage/smgr.h"
 
+#ifdef USE_POSIX_AIO
+# include <aio.h>
+#else
+# include <libaio.h>
+#endif /* USE_POSIX_AIO */
 
 typedef int File;
 
-
 /* GUC parameter */
 extern PGDLLIMPORT int max_files_per_process;
 extern PGDLLIMPORT bool data_sync_retry;
+extern PGDLLIMPORT bool EnableDirectIO;
+extern PGDLLIMPORT bool EnableAsyncIO;
+extern PGDLLIMPORT int max_asyncio_events;
 
 /*
  * This is private to fd.c, but exported for save/restore_backend_variables()
  */
 extern int	max_safe_fds;
 
+/*
+ * The platform-dependent maximum number of AIO requests per batch.
+ */
+#ifdef AIO_LISTIO_MAX
+#define MAX_AIO_BATCH_SIZE    AIO_LISTIO_MAX
+#else
+#define MAX_AIO_BATCH_SIZE    16
+#endif /* AIO_LISTIO_MAX */
+
 /*
  * On Windows, we have to interpret EACCES as possibly meaning the same as
  * ENOENT, because if a file is unlinked-but-not-yet-gone on that platform,
@@ -91,6 +108,7 @@ extern char *FilePathName(File file);
 extern int	FileGetRawDesc(File file);
 extern int	FileGetRawFlags(File file);
 extern mode_t FileGetRawMode(File file);
+extern bool FileAIO(aio_req_t *req);
 
 /* Operations used for sharing named temporary files */
 extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 07fd1bb7d0..2e167beb90 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -40,6 +40,7 @@ extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
 extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
+extern bool mdaio(aio_req_t *);
 
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index bb8428f27f..816c16b38f 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -76,6 +76,17 @@ typedef struct SMgrRelationData
 
 typedef SMgrRelationData *SMgrRelation;
 
+/* Asynchronous I/O request structure: one slot of a batched block write */
+typedef struct {
+  bool            isValid;   /* slot holds a request; false slots are skipped */
+  SMgrRelation    reln;      /* relation the block belongs to */
+  BlockNumber     bnum;      /* block number of the page */
+  ForkNumber      fnum;      /* fork the block belongs to */
+  off_t           offset;    /* byte offset within the file; set by mdaio() */
+  int             file;      /* virtual file descriptor; set by mdaio() */
+  char           *buf;       /* page image to write (BLCKSZ bytes) */
+} aio_req_t;
+
 #define SmgrIsTemp(smgr) \
 	RelFileNodeBackendIsTemp((smgr)->smgr_rnode)
 
@@ -105,6 +116,7 @@ extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
 extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
 						 int nforks, BlockNumber *nblocks);
 extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
+extern bool smgraio(aio_req_t *);
 extern void AtEOXact_SMgr(void);
 
 #endif							/* SMGR_H */
