On Sat, Apr 2, 2016 at 10:20 AM, Thomas Munro
<thomas.mu...@enterprisedb.com> wrote:
> On Thu, Mar 31, 2016 at 5:11 PM, Thomas Munro
> <thomas.mu...@enterprisedb.com> wrote:
>> On Thu, Mar 31, 2016 at 3:55 AM, Masahiko Sawada <sawada.m...@gmail.com> 
>> wrote:
>>> On Wed, Mar 30, 2016 at 11:43 PM, Masahiko Sawada <sawada.m...@gmail.com> 
>>> wrote:
>>>> On Tue, Mar 29, 2016 at 5:36 PM, Kyotaro HORIGUCHI
>>>> <horiguchi.kyot...@lab.ntt.co.jp> wrote:
>>>>> I personally don't think it needs such a survival measure. It is
>>>>> a very small syntax and the parser reads very short text. If the
>>>>> parser fails in such a mode, something more serious must have
>>>>> occurred.
>>>>>
>>>>> At Tue, 29 Mar 2016 16:51:02 +0900, Fujii Masao <masao.fu...@gmail.com> 
>>>>> wrote in 
>>>>> <cahgqgwfth8pnyhalbx0nf8o4qmwctdzeocwrqeu7howgdjg...@mail.gmail.com>
>>>>>> On Tue, Mar 29, 2016 at 4:23 PM, Kyotaro HORIGUCHI
>>>>>> <horiguchi.kyot...@lab.ntt.co.jp> wrote:
>>>>>> > Hello,
>>>>>> >
>>>>>> > At Mon, 28 Mar 2016 18:38:22 +0900, Masahiko Sawada 
>>>>>> > <sawada.m...@gmail.com> wrote in 
>>>>>> > <cad21aoajmdv1eukmfeyav24arx4pzujghyby4zxzkpkicuv...@mail.gmail.com>
>>>>>> > sawada.mshk> On Mon, Mar 28, 2016 at 5:50 PM, Kyotaro HORIGUCHI
>>>>>> >> <horiguchi.kyot...@lab.ntt.co.jp> wrote:
>>>>>> > As mentioned in my comment, SQL parser converts yy_fatal_error
>>>>>> > into ereport(ERROR), which can be caught by the upper PG_TRY (by
>>>>>> > #define'ing fprintf). So it is doable if you mind exit().
>>>>>>
>>>>>> I'm afraid that your idea doesn't work in postmaster. Because
>>>>>> ereport(ERROR) is implicitly promoted to ereport(FATAL) in postmaster.
>>>>>> IOW, when an internal flex fatal error occurs, postmaster just exits
>>>>>> instead of jumping out of parser.
>>>>>
>>>>> The ERROR could be LOG or DEBUG2 instead, if we think the parser
>>>>> fatal errors are recoverable. guc-file.l is doing so.
>>>>>
>>>>>> ISTM that, when an internal flex fatal error occurs, it's
>>>>>> better to elog(FATAL) and terminate the problematic
>>>>>> process. This might lead to the server crash (e.g., if
>>>>>> postmaster emits a FATAL error, it and its all child processes
>>>>>> will exit soon). But probably we can live with this because the
>>>>>> fatal error basically rarely happens.
>>>>>
>>>>> I agree to this
>>>>>
>>>>>> OTOH, if we make the process keep running even after it gets an internal
>>>>>> fatal error (like Sawada's patch or your idea do), this might cause a more
>>>>>> serious problem. Please imagine the case where one walsender gets the
>>>>>> fatal error (e.g., because of OOM), abandons the new setting value of
>>>>>> synchronous_standby_names, and keeps running with the previous setting
>>>>>> value. OTOH, the other walsender processes successfully parse the setting
>>>>>> and keep running with the new setting. In this case, the walsenders end up
>>>>>> working from inconsistent settings. This will completely mess up
>>>>>> synchronous replication.
>>>>>
>>>>> On the other hand, guc-file.l seems ignoring parser errors under
>>>>> normal operation, even though it may cause similar inconsistency,
>>>>> if any..
>>>>>
>>>>> | LOG:  received SIGHUP, reloading configuration files
>>>>> | LOG:  input in flex scanner failed at file "/home/horiguti/data/data_work/postgresql.conf" line 1
>>>>> | LOG:  configuration file "/home/horiguti/data/data_work/postgresql.conf" contains errors; no changes were applied
>>>>>
>>>>>> Therefore, I think that it's better to make the problematic process exit
>>>>>> with FATAL error rather than ignore the error and keep it running.
>>>>>
>>>>> +1. Restarting walsender would be far less harmful than keeping
>>>>> it running in doubtful state.
>>>>>
>>>>> Should I wait for the next version or have a look at the latest?
>>>>>
>>>>
>>>> Attached latest patch incorporate some review comments so far, and is
>>>> rebased against current HEAD.
>>>>
>>>
>>> Sorry I attached wrong patch.
>>> Attached patch is correct patch.

Thanks for updating the patch!

I applied the following changes to the patch.
Attached is the revised version of the patch.

- Changed syncrep_flex_fatal() so that it just calls ereport(FATAL), based on
  the recent discussion with Horiguchi-san.
- Improved the documentation.
- Fixed some bugs.
- Removed the changes for recovery testing framework. I'd like to commit
   those changes later separately from the main patch of multiple sync rep.

Barring any objections, I'll commit this patch.

>> One thing I noticed is that there are LOG messages telling me when a
>> standby becomes a synchronous standby, but nothing to tell me if a
>> standby stops being a synchronous standby (ie because a higher priority
>> one has taken its place in the quorum).  Would that be interesting?

+1

>> Also, I spotted some tiny mistakes:
>>
>> +  <indexterm zone="high-availability">
>> +   <primary>Dedicated language for multiple synchornous replication</primary>
>> +  </indexterm>
>>
>> s/synchornous/synchronous/

Confirmed that there is no typo "synchornous" in the latest patch.

>> + /*
>> + * If we are managing the sync standby, though we weren't
>> + * prior to this, then announce we are now the sync standby.
>> + */
>>
>> s/ the / a / (two occurrences)

Fixed.

>> + ereport(LOG,
>> + (errmsg("standby \"%s\" is now the synchronous standby with priority %u",
>> + application_name, MyWalSnd->sync_standby_priority)));
>>
>> s/ the / a /

I have no objection to this change itself. But this message has been in
use since 9.5 or earlier, so if we apply this change, we would probably
need to back-patch it.

>>
>>      offered by a transaction commit. This level of protection is referred
>> -    to as 2-safe replication in computer science theory.
>> +    to as 2-safe replication in computer science theory, and group-1-safe
>> +    (group-safe and 1-safe) when <varname>synchronous_commit</> is set to
>> +    more than <literal>remote_write</>.
>>
>> Why "more than"?  I think those two words should be changed to "at
>> least", or removed.

Removed.

>> +   <para>
>> +    This syntax allows us to define a synchronous group that will wait for at
>> +    least N standbys of them, and a comma-separated list of group members that are surrounded by
>> +    parantheses.  The special value <literal>*</> for server name matches any standby.
>> +    By surrounding list of group members using parantheses, synchronous standbys are chosen from
>> +    that group using priority method.
>> +   </para>
>>
>> s/parantheses/parentheses/ (two occurrences)

Confirmed that this typo doesn't exist in the latest patch.

>>
>> +  <sect2 id="dedicated-language-for-multi-sync-replication-priority">
>> +   <title>Prioirty Method</title>
>>
>> s/Prioirty Method/Priority Method/

Confirmed that this typo doesn't exist in the latest patch.

> A couple more comments:
>
>   /*
> - * If we aren't managing the highest priority standby then just leave.
> + * If the number of sync standbys is less than requested or we aren't
> + * managing the sync standby then just leave.
>   */
> - if (syncWalSnd != MyWalSnd)
> + if (!got_oldest || !am_sync)
>
> s/ the sync / a sync /

Fixed.

> + /*
> + * Consider all pending standbys as sync if the number of them plus
> + * already-found sync ones is lower than the configuration requests.
> + */
> + if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
> + return list_concat(result, pending);
>
> The cells from 'pending' will be attached to 'result', and 'result'
> will be freed by the caller.  But won't the List header object from
> 'pending' be leaked?

Yes if 'result' is not NIL. I added pfree(pending) for that case.
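
For reference, here is a minimal standalone sketch of that leak and the
fix. It does not use the real List API from pg_list.h: ToyList,
toy_concat() and toy_concat_and_release() are hypothetical stand-ins that
only mimic the header-plus-cells layout, and free() stands in for pfree().
The toy_live_headers counter exists only to make the leak observable.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins for a list header and its cells. */
typedef struct ToyCell
{
    int         value;
    struct ToyCell *next;
} ToyCell;

typedef struct ToyList
{
    int         length;
    ToyCell    *head;
    ToyCell    *tail;
} ToyList;

static int  toy_live_headers = 0;   /* headers allocated and not yet freed */

static void *
toy_alloc(size_t size)
{
    void       *p = malloc(size);

    if (p == NULL)
        abort();
    return p;
}

/* Append a value, allocating the header on first use (NULL plays NIL). */
static ToyList *
toy_lappend_int(ToyList *list, int value)
{
    ToyCell    *cell = toy_alloc(sizeof(ToyCell));

    cell->value = value;
    cell->next = NULL;
    if (list == NULL)
    {
        list = toy_alloc(sizeof(ToyList));
        list->length = 0;
        list->head = list->tail = NULL;
        toy_live_headers++;
    }
    if (list->tail != NULL)
        list->tail->next = cell;
    else
        list->head = cell;
    list->tail = cell;
    list->length++;
    return list;
}

/* Link b's cells onto a. When both are non-NULL, b's header is orphaned. */
static ToyList *
toy_concat(ToyList *a, ToyList *b)
{
    if (a == NULL)
        return b;
    if (b == NULL)
        return a;
    assert(a->tail != NULL);
    a->tail->next = b->head;
    a->tail = b->tail;
    a->length += b->length;
    return a;
}

/* The fix: release pending's header only when it was not reused as result. */
static ToyList *
toy_concat_and_release(ToyList *result, ToyList *pending)
{
    ToyList    *merged = toy_concat(result, pending);

    if (result != NULL && pending != NULL)
    {
        free(pending);
        toy_live_headers--;
    }
    return merged;
}

/* Free all cells and the header, as the caller does with 'result'. */
static void
toy_list_free(ToyList *list)
{
    ToyCell    *cell;
    ToyCell    *next;

    if (list == NULL)
        return;
    for (cell = list->head; cell != NULL; cell = next)
    {
        next = cell->next;
        free(cell);
    }
    free(list);
    toy_live_headers--;
}
```

When 'result' is NIL the concatenation simply returns 'pending', so its
header must not be freed there; that is why the extra free is conditional.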

> + result = lappend_int(result, i);
> + if (list_length(result) == SyncRepConfig->num_sync)
> + {
> + list_free(pending);
> + return result; /* Exit if got enough sync standbys */
> + }
>
> If we didn't take the early return in the list-not-long-enough case
> mentioned above, we should *always* exit via this return statement,
> right?  Since we know that the pending list had enough elements to
> reach num_sync.  I think that is worth a comment, and also a "not
> reached" comment at the bottom of the function, if it is true.

Good catch! I added the comments. Also added Assert(false) at
the bottom of the function.
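
To illustrate the control flow being discussed, here is a simplified
standalone model. It is not the code in the patch: choose_sync_standbys()
and its array-based interface are made up for illustration, and plain
assert() stands in for Assert(). Only the shape follows the discussion:
return early when there are no more candidates than requested, otherwise
always leave from inside the loop.

```c
#include <assert.h>

/*
 * Toy model: prio[i] is the priority of standby i (1 = highest priority,
 * 0 = not a sync candidate). The indexes of the chosen standbys are
 * written to out[], which must have room for n entries. Returns the
 * number of standbys chosen. Requires num_sync >= 1.
 */
static int
choose_sync_standbys(const int *prio, int n, int num_sync, int *out)
{
    int         ncand = 0;
    int         max_prio = 0;
    int         nout = 0;
    int         i;
    int         p;

    assert(num_sync >= 1);

    for (i = 0; i < n; i++)
    {
        if (prio[i] > 0)
        {
            ncand++;
            if (prio[i] > max_prio)
                max_prio = prio[i];
        }
    }

    /* No more candidates than requested: all of them are sync. */
    if (ncand <= num_sync)
    {
        for (i = 0; i < n; i++)
        {
            if (prio[i] > 0)
                out[nout++] = i;
        }
        return nout;
    }

    /*
     * From here on ncand > num_sync, so walking the priorities in
     * ascending order must reach num_sync entries and return below.
     */
    for (p = 1; p <= max_prio; p++)
    {
        for (i = 0; i < n; i++)
        {
            if (prio[i] != p)
                continue;
            out[nout++] = i;
            if (nout == num_sync)
                return nout;    /* exit once we have enough sync standbys */
        }
    }

    /* not reached */
    assert(0);
    return nout;
}
```

With priorities {2, 0, 1, 3} and num_sync = 2, this picks index 2 first
(priority 1) and then index 0 (priority 2).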

> As a future improvement, I wonder if we could avoid recomputing the
> current set of sync standbys in every walsender every time we call
> SyncRepReleaseWaiters, perhaps by maintaining that set incrementally
> in shmem when walsender states change etc.

+1
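
For context, what each walsender recomputes on every call is roughly the
following, as far as I can describe it in a standalone form. This is only a
sketch under assumptions: ToyStandby, ToyLsn and
toy_oldest_sync_positions() are hypothetical names, the is_sync flag is
taken as given, and the real code works on shared-memory walsender entries
while holding SyncRepLock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t ToyLsn;        /* stand-in for a WAL position */

typedef struct ToyStandby
{
    bool        is_sync;        /* currently considered synchronous? */
    ToyLsn      write;
    ToyLsn      flush;
    ToyLsn      apply;
} ToyStandby;

/*
 * Compute the oldest write, flush and apply positions among the sync
 * standbys. Returns false if fewer than num_sync sync standbys exist, in
 * which case the output positions must not be used. Requires num_sync >= 1.
 */
static bool
toy_oldest_sync_positions(const ToyStandby *standbys, int n, int num_sync,
                          ToyLsn *write, ToyLsn *flush, ToyLsn *apply)
{
    int         nsync = 0;
    int         i;

    assert(num_sync >= 1);
    *write = *flush = *apply = 0;

    for (i = 0; i < n; i++)
    {
        const ToyStandby *s = &standbys[i];

        if (!s->is_sync)
            continue;

        /* Waiters can be released only up to the slowest sync standby. */
        if (nsync == 0 || s->write < *write)
            *write = s->write;
        if (nsync == 0 || s->flush < *flush)
            *flush = s->flush;
        if (nsync == 0 || s->apply < *apply)
            *apply = s->apply;
        nsync++;
    }

    return nsync >= num_sync;
}
```

Maintaining the set incrementally would mean updating these minimums only
when a walsender's state or reported positions change, instead of scanning
all entries each time.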

> I don't have any other comments, other than to say: thank you to all
> the people who have contributed to this feature so far and I really
> really hope it goes into 9.6!

+1000

Regards,

-- 
Fujii Masao
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2906,2939 **** include_dir 'conf.d'
        </term>
        <listitem>
         <para>
!         Specifies a comma-separated list of standby names that can support
          <firstterm>synchronous replication</>, as described in
          <xref linkend="synchronous-replication">.
!         At any one time there will be at most one active synchronous standby;
          transactions waiting for commit will be allowed to proceed after
!         this standby server confirms receipt of their data.
!         The synchronous standby will be the first standby named in this list
          that is both currently connected and streaming data in real-time
          (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
!         synchronous standbys.
!         If the current synchronous standby disconnects for whatever reason,
          it will be replaced immediately with the next-highest-priority standby.
          Specifying more than one standby name can allow very high availability.
         </para>
         <para>
          The name of a standby server for this purpose is the
          <varname>application_name</> setting of the standby, as set in the
          <varname>primary_conninfo</> of the standby's WAL receiver.  There is
          no mechanism to enforce uniqueness. In case of duplicates one of the
!         matching standbys will be chosen to be the synchronous standby, though
          exactly which one is indeterminate.
          The special entry <literal>*</> matches any
          <varname>application_name</>, including the default application name
          of <literal>walreceiver</>.
         </para>
         <para>
          If no synchronous standby names are specified here, then synchronous
          replication is not enabled and transaction commits will not wait for
--- 2906,2974 ----
        </term>
        <listitem>
         <para>
!         Specifies a list of standby names that can support
          <firstterm>synchronous replication</>, as described in
          <xref linkend="synchronous-replication">.
!         There will be one or more active synchronous standbys;
          transactions waiting for commit will be allowed to proceed after
!         these standby servers confirm receipt of their data.
!         The synchronous standbys will be the standbys whose names
!         appear early in this list, and
          that is both currently connected and streaming data in real-time
          (as shown by a state of <literal>streaming</literal> in the
          <link linkend="monitoring-stats-views-table">
          <literal>pg_stat_replication</></link> view).
          Other standby servers appearing later in this list represent potential
!         synchronous standbys. If any of the current synchronous
!         standbys disconnects for whatever reason,
          it will be replaced immediately with the next-highest-priority standby.
          Specifying more than one standby name can allow very high availability.
         </para>
         <para>
+         This parameter specifies a list of standby servers by using
+         either of the following syntaxes:
+ <synopsis>
+ <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">standby_name</replaceable> [, ...] )
+ <replaceable class="parameter">standby_name</replaceable> [, ...]
+ </synopsis>
+         where <replaceable class="parameter">num_sync</replaceable> is
+         the number of synchronous standbys that transactions need to
+         wait for replies from,
+         and <replaceable class="parameter">standby_name</replaceable>
+         is the name of a standby server. For example, a setting of
+         <literal>'3 (s1, s2, s3, s4)'</> makes transaction commits wait
+         until their WAL records are received by three higher priority standbys
+         chosen from standby servers <literal>s1</>, <literal>s2</>,
+         <literal>s3</> and <literal>s4</>.
+         </para>
+         <para>
+         The second syntax was used before <productname>PostgreSQL</>
+         version 9.6 and is still supported. It's the same as the first syntax
+         with <replaceable class="parameter">num_sync</replaceable>=1.
+         For example, both settings of <literal>'1 (s1, s2)'</> and
+         <literal>'s1, s2'</> have the same meaning; either <literal>s1</>
+         or <literal>s2</> is chosen as a synchronous standby.
+        </para>
+        <para>
          The name of a standby server for this purpose is the
          <varname>application_name</> setting of the standby, as set in the
          <varname>primary_conninfo</> of the standby's WAL receiver.  There is
          no mechanism to enforce uniqueness. In case of duplicates one of the
!         matching standbys will be considered as higher priority, though
          exactly which one is indeterminate.
          The special entry <literal>*</> matches any
          <varname>application_name</>, including the default application name
          of <literal>walreceiver</>.
         </para>
+        <note>
+         <para>
+          The <replaceable class="parameter">standby_name</replaceable>
+          must be enclosed in double quotes if a comma (<literal>,</>),
+          a double quote (<literal>"</>), <!-- " font-lock sanity -->
+          a left parenthesis (<literal>(</>), a right parenthesis (<literal>)</>)
+          or a space is used in the name of a standby server.
+         </para>
+        </note>
         <para>
          If no synchronous standby names are specified here, then synchronous
          replication is not enabled and transaction commits will not wait for
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 1027,1036 **** primary_slot_name = 'node_a_slot'
  
     <para>
      Synchronous replication offers the ability to confirm that all changes
!     made by a transaction have been transferred to one synchronous standby
!     server. This extends the standard level of durability
      offered by a transaction commit. This level of protection is referred
!     to as 2-safe replication in computer science theory.
     </para>
  
     <para>
--- 1027,1038 ----
  
     <para>
      Synchronous replication offers the ability to confirm that all changes
!     made by a transaction have been transferred to one or more synchronous
!     standby servers. This extends that standard level of durability
      offered by a transaction commit. This level of protection is referred
!     to as 2-safe replication in computer science theory, and group-1-safe
!     (group-safe and 1-safe) when <varname>synchronous_commit</> is set to
!     <literal>remote_write</>.
     </para>
  
     <para>
***************
*** 1084,1091 **** primary_slot_name = 'node_a_slot'
      In the case that <varname>synchronous_commit</> is set to
      <literal>remote_apply</>, the standby sends reply messages when the commit
      record is replayed, making the transaction visible.
!     If the standby is the first matching standby, as specified in
!     <varname>synchronous_standby_names</> on the primary, the reply
      messages from that standby will be used to wake users waiting for
      confirmation that the commit record has been received. These parameters
      allow the administrator to specify which standby servers should be
--- 1086,1093 ----
      In the case that <varname>synchronous_commit</> is set to
      <literal>remote_apply</>, the standby sends reply messages when the commit
      record is replayed, making the transaction visible.
!     If the standby is chosen as the synchronous standby, from a priority
!     list of <varname>synchronous_standby_names</> on the primary, the reply
      messages from that standby will be used to wake users waiting for
      confirmation that the commit record has been received. These parameters
      allow the administrator to specify which standby servers should be
***************
*** 1126,1131 **** primary_slot_name = 'node_a_slot'
--- 1128,1167 ----
  
     </sect3>
  
+    <sect3 id="synchronous-replication-multiple-standbys">
+     <title>Multiple Synchronous Standbys</title>
+ 
+    <para>
+     Synchronous replication supports one or more synchronous standby servers;
+     transactions will wait until all the standby servers which are considered
+     as synchronous confirm receipt of their data. The number of synchronous
+     standbys that transactions must wait for replies from is specified in
+     <varname>synchronous_standby_names</>. This parameter also specifies
+     a list of standby names, which determines the priority of each standby
+     for being chosen as a synchronous standby. The standbys whose names
+     appear early in this list are given higher priority and will be considered
+     as synchronous. Other standby servers appearing later in this list
+     represent potential synchronous standbys. If any of the current
+     synchronous standbys disconnects for whatever reason, it will be replaced
+     immediately with the next-highest-priority standby.
+    </para>
+    <para>
+     An example of <varname>synchronous_standby_names</> for multiple
+     synchronous standbys is:
+ <programlisting>
+ synchronous_standby_names = '2 (s1, s2, s3)'
+ </programlisting>
+     In this example, if four standby servers <literal>s1</>, <literal>s2</>,
+     <literal>s3</> and <literal>s4</> are running, the two standbys
+     <literal>s1</> and <literal>s2</> will be chosen as synchronous standbys
+     because their names appear early in the list of standby names.
+     <literal>s3</> is a potential synchronous standby and will take over
+     the role of synchronous standby when either of <literal>s1</> or
+     <literal>s2</> fails. <literal>s4</> is an asynchronous standby since
+     its name is not in the list.
+    </para>
+    </sect3>
+ 
     <sect3 id="synchronous-replication-performance">
      <title>Planning for Performance</title>
  
***************
*** 1171,1189 **** primary_slot_name = 'node_a_slot'
      <title>Planning for High Availability</title>
  
     <para>
!     Commits made when <varname>synchronous_commit</> is set to <literal>on</>,
!     <literal>remote_apply</> or <literal>remote_write</> will wait until the
!     synchronous standby responds. The response may never occur if the last, or
!     only, standby should crash.
     </para>
  
     <para>
!     The best solution for avoiding data loss is to ensure you don't lose
!     your last remaining synchronous standby. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The first named standby will be used as the synchronous standby. Standbys
!     listed after this will take over the role of synchronous standby if the
!     first one should fail.
     </para>
  
     <para>
--- 1207,1227 ----
      <title>Planning for High Availability</title>
  
     <para>
!     <varname>synchronous_standby_names</> specifies the number of
!     synchronous standbys that transaction commits made when
!     <varname>synchronous_commit</> is set to <literal>on</>,
!     <literal>remote_apply</> or <literal>remote_write</> will wait for
!     responses from. Such transaction commits may never be completed
!     if any one of the synchronous standbys should crash.
     </para>
  
     <para>
!     The best solution for high availability is to ensure you keep as many
!     synchronous standbys as requested. This can be achieved by naming multiple
      potential synchronous standbys using <varname>synchronous_standby_names</>.
!     The standbys whose names appear early in the list will be used as
!     synchronous standbys. Standbys listed after these will take over
!     the role of synchronous standby if one of the current ones should fail.
     </para>
  
     <para>
***************
*** 1208,1220 **** primary_slot_name = 'node_a_slot'
      they show as committed on the primary. The guarantee we offer is that
      the application will not receive explicit acknowledgement of the
      successful commit of a transaction until the WAL data is known to be
!     safely received by the standby.
     </para>
  
     <para>
!     If you really do lose your last standby server then you should disable
!     <varname>synchronous_standby_names</> and reload the configuration file
!     on the primary server.
     </para>
  
     <para>
--- 1246,1260 ----
      they show as committed on the primary. The guarantee we offer is that
      the application will not receive explicit acknowledgement of the
      successful commit of a transaction until the WAL data is known to be
!     safely received by all the synchronous standbys.
     </para>
  
     <para>
!     If you really cannot keep as many synchronous standbys as requested
!     then you should decrease the number of synchronous standbys that
!     transaction commits must wait for responses from
!     in <varname>synchronous_standby_names</> (or disable it) and
!     reload the configuration file on the primary server.
     </para>
  
     <para>
*** a/src/backend/Makefile
--- b/src/backend/Makefile
***************
*** 203,209 **** distprep:
  	$(MAKE) -C parser	gram.c gram.h scan.c
  	$(MAKE) -C bootstrap	bootparse.c bootscanner.c
  	$(MAKE) -C catalog	schemapg.h postgres.bki postgres.description postgres.shdescription
! 	$(MAKE) -C replication	repl_gram.c repl_scanner.c
  	$(MAKE) -C storage/lmgr	lwlocknames.h
  	$(MAKE) -C utils	fmgrtab.c fmgroids.h errcodes.h
  	$(MAKE) -C utils/misc	guc-file.c
--- 203,209 ----
  	$(MAKE) -C parser	gram.c gram.h scan.c
  	$(MAKE) -C bootstrap	bootparse.c bootscanner.c
  	$(MAKE) -C catalog	schemapg.h postgres.bki postgres.description postgres.shdescription
! 	$(MAKE) -C replication	repl_gram.c repl_scanner.c syncrep_gram.c syncrep_scanner.c
  	$(MAKE) -C storage/lmgr	lwlocknames.h
  	$(MAKE) -C utils	fmgrtab.c fmgroids.h errcodes.h
  	$(MAKE) -C utils/misc	guc-file.c
***************
*** 320,325 **** maintainer-clean: distclean
--- 320,327 ----
  	      catalog/postgres.shdescription \
  	      replication/repl_gram.c \
  	      replication/repl_scanner.c \
+ 	      replication/syncrep_gram.c \
+ 	      replication/syncrep_scanner.c \
  	      storage/lmgr/lwlocknames.c \
  	      storage/lmgr/lwlocknames.h \
  	      utils/fmgroids.h \
*** a/src/backend/replication/.gitignore
--- b/src/backend/replication/.gitignore
***************
*** 1,2 ****
--- 1,4 ----
  /repl_gram.c
  /repl_scanner.c
+ /syncrep_gram.c
+ /syncrep_scanner.c
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 15,21 **** include $(top_builddir)/src/Makefile.global
  override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
  
  OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! 	repl_gram.o slot.o slotfuncs.o syncrep.o
  
  SUBDIRS = logical
  
--- 15,21 ----
  override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
  
  OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! 	repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o
  
  SUBDIRS = logical
  
***************
*** 24,28 **** include $(top_srcdir)/src/backend/common.mk
  # repl_scanner is compiled as part of repl_gram
  repl_gram.o: repl_scanner.c
  
! # repl_gram.c and repl_scanner.c are in the distribution tarball, so
! # they are not cleaned here.
--- 24,33 ----
  # repl_scanner is compiled as part of repl_gram
  repl_gram.o: repl_scanner.c
  
! # syncrep_scanner is compiled as part of syncrep_gram
! syncrep_gram.o: syncrep_scanner.c
! syncrep_scanner.c: FLEXFLAGS = -CF -p
! syncrep_scanner.c: FLEX_NO_BACKUP=yes
! 
! # repl_gram.c, repl_scanner.c, syncrep_gram.c and syncrep_scanner.c
! # are in the distribution tarball, so they are not cleaned here.
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 29,39 ****
   * single ordered queue of waiting backends, so that we can avoid
   * searching the through all waiters each time we receive a reply.
   *
!  * In 9.1 we support only a single synchronous standby, chosen from a
!  * priority list of synchronous_standby_names. Before it can become the
!  * synchronous standby it must have caught up with the primary; that may
!  * take some time. Once caught up, the current highest priority standby
!  * will release waiters from the queue.
   *
   * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
   *
--- 29,40 ----
   * single ordered queue of waiting backends, so that we can avoid
   * searching the through all waiters each time we receive a reply.
   *
!  * In 9.6 we support multiple synchronous standbys, chosen from a
!  * priority list of synchronous_standby_names. Before they can become the
!  * synchronous standbys they must have caught up with the primary; that may
!  * take some time. Once caught up, the current higher priority standbys
!  * which are considered as synchronous at that moment will release
!  * waiters from the queue.
   *
   * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
   *
***************
*** 65,76 **** char	   *SyncRepStandbyNames;
--- 66,82 ----
  
  static bool announce_next_takeover = true;
  
+ SyncRepConfigData *SyncRepConfig;
  static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;
  
  static void SyncRepQueueInsert(int mode);
  static void SyncRepCancelWait(void);
  static int	SyncRepWakeQueue(bool all, int mode);
  
+ static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+ 									   XLogRecPtr *flushPtr,
+ 									   XLogRecPtr *applyPtr,
+ 									   bool *am_sync);
  static int	SyncRepGetStandbyPriority(void);
  
  #ifdef USE_ASSERT_CHECKING
***************
*** 343,348 **** SyncRepInitConfig(void)
--- 349,359 ----
  {
  	int			priority;
  
+ 	/* Update the config data of synchronous replication */
+ 	SyncRepFreeConfig(SyncRepConfig);
+ 	SyncRepConfig = NULL;
+ 	SyncRepUpdateConfig();
+ 
  	/*
  	 * Determine if we are a potential sync standby and remember the result
  	 * for handling replies from standby.
***************
*** 360,421 **** SyncRepInitConfig(void)
  }
  
  /*
-  * Find the WAL sender servicing the synchronous standby with the lowest
-  * priority value, or NULL if no synchronous standby is connected. If there
-  * are multiple standbys with the same lowest priority value, the first one
-  * found is selected. The caller must hold SyncRepLock.
-  */
- WalSnd *
- SyncRepGetSynchronousStandby(void)
- {
- 	WalSnd	   *result = NULL;
- 	int			result_priority = 0;
- 	int			i;
- 
- 	for (i = 0; i < max_wal_senders; i++)
- 	{
- 		/* Use volatile pointer to prevent code rearrangement */
- 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- 		int			this_priority;
- 
- 		/* Must be active */
- 		if (walsnd->pid == 0)
- 			continue;
- 
- 		/* Must be streaming */
- 		if (walsnd->state != WALSNDSTATE_STREAMING)
- 			continue;
- 
- 		/* Must be synchronous */
- 		this_priority = walsnd->sync_standby_priority;
- 		if (this_priority == 0)
- 			continue;
- 
- 		/* Must have a lower priority value than any previous ones */
- 		if (result != NULL && result_priority <= this_priority)
- 			continue;
- 
- 		/* Must have a valid flush position */
- 		if (XLogRecPtrIsInvalid(walsnd->flush))
- 			continue;
- 
- 		result = (WalSnd *) walsnd;
- 		result_priority = this_priority;
- 
- 		/*
- 		 * If priority is equal to 1, there cannot be any other WAL senders
- 		 * with a lower priority, so we're done.
- 		 */
- 		if (this_priority == 1)
- 			return result;
- 	}
- 
- 	return result;
- }
- 
- /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and
   * perhaps also which information we store as well.
--- 371,378 ----
  }
  
  /*
   * Update the LSNs on each queue based upon our latest state. This
!  * implements a simple policy of first-valid-sync-standby-releases-waiter.
   *
   * Other policies are possible, which would change what we do here and
   * perhaps also which information we store as well.
***************
*** 424,430 **** void
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	WalSnd	   *syncWalSnd;
  	int			numwrite = 0;
  	int			numflush = 0;
  	int			numapply = 0;
--- 381,391 ----
  SyncRepReleaseWaiters(void)
  {
  	volatile WalSndCtlData *walsndctl = WalSndCtl;
! 	XLogRecPtr	writePtr;
! 	XLogRecPtr	flushPtr;
! 	XLogRecPtr	applyPtr;
! 	bool		got_oldest;
! 	bool		am_sync;
  	int			numwrite = 0;
  	int			numflush = 0;
  	int			numapply = 0;
***************
*** 441,462 **** SyncRepReleaseWaiters(void)
  		return;
  
  	/*
! 	 * We're a potential sync standby. Release waiters if we are the highest
! 	 * priority standby.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
- 	syncWalSnd = SyncRepGetSynchronousStandby();
  
! 	/* We should have found ourselves at least */
! 	Assert(syncWalSnd != NULL);
  
  	/*
! 	 * If we aren't managing the highest priority standby then just leave.
  	 */
! 	if (syncWalSnd != MyWalSnd)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = true;
  		return;
  	}
  
--- 402,439 ----
  		return;
  
  	/*
! 	 * We're a potential sync standby. Release waiters if there are
! 	 * enough sync standbys and we are considered as sync.
  	 */
  	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
  
! 	/*
! 	 * Check whether we are a sync standby or not, and calculate
! 	 * the oldest positions among all sync standbys.
! 	 */
! 	got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
! 											&applyPtr, &am_sync);
  
  	/*
! 	 * If we are managing a sync standby, though we weren't
! 	 * prior to this, then announce we are now a sync standby.
  	 */
! 	if (announce_next_takeover && am_sync)
! 	{
! 		announce_next_takeover = false;
! 		ereport(LOG,
! 				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
! 						application_name, MyWalSnd->sync_standby_priority)));
! 	}
! 
! 	/*
! 	 * If the number of sync standbys is less than requested or we aren't
! 	 * managing a sync standby then just leave.
! 	 */
! 	if (!got_oldest || !am_sync)
  	{
  		LWLockRelease(SyncRepLock);
! 		announce_next_takeover = !am_sync;
  		return;
  	}
  
***************
*** 464,503 **** SyncRepReleaseWaiters(void)
  	 * Set the lsn first so that when we wake backends they will release up to
  	 * this location.
  	 */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
  		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
  	}
  
  	LWLockRelease(SyncRepLock);
  
! 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
! 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
! 		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
! 		 numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
  
  	/*
! 	 * If we are managing the highest priority standby, though we weren't
! 	 * prior to this, then announce we are now the sync standby.
  	 */
! 	if (announce_next_takeover)
  	{
! 		announce_next_takeover = false;
! 		ereport(LOG,
! 				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
! 						application_name, MyWalSnd->sync_standby_priority)));
  	}
  }
  
  /*
--- 441,685 ----
  	 * Set the lsn first so that when we wake backends they will release up to
  	 * this location.
  	 */
! 	if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
  		numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
  		numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
  	}
! 	if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
  	{
! 		walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
  		numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
  	}
  
  	LWLockRelease(SyncRepLock);
  
! 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
! 		 numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
! 		 numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
! 		 numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
! }
! 
! /*
!  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
!  *
!  * Return false if the number of sync standbys is less than
!  * synchronous_standby_names specifies. Otherwise return true and
!  * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
!  *
!  * On return, *am_sync is set to true if this walsender is connected to a
!  * sync standby. Otherwise it's set to false.
!  */
! static bool
! SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
! 						   XLogRecPtr *applyPtr, bool *am_sync)
! {
! 	List		*sync_standbys;
! 	ListCell	*cell;
! 
! 	*writePtr = InvalidXLogRecPtr;
! 	*flushPtr = InvalidXLogRecPtr;
! 	*applyPtr = InvalidXLogRecPtr;
! 	*am_sync = false;
! 
! 	/* Get standbys that are considered as synchronous at this moment */
! 	sync_standbys = SyncRepGetSyncStandbys();
! 
! 	/* Quick exit if there are not enough synchronous standbys */
! 	if (list_length(sync_standbys) < SyncRepConfig->num_sync)
! 	{
! 		*am_sync = list_member_int(sync_standbys, MyWalSnd->slotno);
! 		list_free(sync_standbys);
! 		return false;
! 	}
  
  	/*
! 	 * Scan through all sync standbys and calculate the oldest
! 	 * Write, Flush and Apply positions.
  	 */
! 	foreach (cell, sync_standbys)
  	{
! 		WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
! 		XLogRecPtr	write;
! 		XLogRecPtr	flush;
! 		XLogRecPtr	apply;
! 
! 		SpinLockAcquire(&walsnd->mutex);
! 		write = walsnd->write;
! 		flush = walsnd->flush;
! 		apply = walsnd->apply;
! 		SpinLockRelease(&walsnd->mutex);
! 
! 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
! 			*writePtr = write;
! 		if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
! 			*flushPtr = flush;
! 		if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
! 			*applyPtr = apply;
! 		if (walsnd == MyWalSnd)
! 			*am_sync = true;
  	}
+ 
+ 	list_free(sync_standbys);
+ 	return true;
+ }
+ 
+ /*
+  * Return the list of sync standbys, or NIL if no sync standby is connected.
+  *
+  * If there are multiple standbys with the same priority,
+  * the first one found is considered as higher priority.
+  * The caller must hold SyncRepLock.
+  */
+ List *
+ SyncRepGetSyncStandbys(void)
+ {
+ 	List	*result = NIL;
+ 	List	*pending = NIL;
+ 	int	lowest_priority;
+ 	int	next_highest_priority;
+ 	int	this_priority;
+ 	int	priority;
+ 	int	i;
+ 	WalSnd	*walsnd;
+ 
+ 	/* Quick exit if sync replication is not requested */
+ 	if (SyncRepConfig == NULL)
+ 		return NIL;
+ 
+ 	lowest_priority = list_length(SyncRepConfig->members);
+ 	next_highest_priority = lowest_priority + 1;
+ 
+ 	/*
+ 	 * Find the sync standbys which have the highest priority (i.e., 1).
+ 	 * Also store all the other potential sync standbys into the pending list,
+ 	 * in order to scan it later and find other sync standbys from it quickly.
+ 	 */
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		walsnd = &WalSndCtl->walsnds[i];
+ 
+ 		/* Must be active */
+ 		if (walsnd->pid == 0)
+ 			continue;
+ 
+ 		/* Must be streaming */
+ 		if (walsnd->state != WALSNDSTATE_STREAMING)
+ 			continue;
+ 
+ 		/* Must be synchronous */
+ 		this_priority = walsnd->sync_standby_priority;
+ 		if (this_priority == 0)
+ 			continue;
+ 
+ 		/* Must have a valid flush position */
+ 		if (XLogRecPtrIsInvalid(walsnd->flush))
+ 			continue;
+ 
+ 		/*
+ 		 * If the priority is equal to 1, consider this standby as sync
+ 		 * and append it to the result. Otherwise append this standby
+ 		 * to the pending list to check if it's actually sync or not later.
+ 		 */
+ 		if (this_priority == 1)
+ 		{
+ 			result = lappend_int(result, i);
+ 			if (list_length(result) == SyncRepConfig->num_sync)
+ 			{
+ 				list_free(pending);
+ 				return result;		/* Exit if got enough sync standbys */
+ 			}
+ 		}
+ 		else
+ 		{
+ 			pending = lappend_int(pending, i);
+ 
+ 			/*
+ 			 * Track the highest priority among the standbys in the pending
+ 			 * list, in order to use it as the starting priority for later scan
+ 			 * of the list. This lets us find the sync standbys in the
+ 			 * pending list quickly later, because we can skip unnecessary
+ 			 * scans for the unused priorities.
+ 			 */
+ 			if (this_priority < next_highest_priority)
+ 				next_highest_priority = this_priority;
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Consider all pending standbys as sync if the number of them plus
+ 	 * already-found sync ones is not more than the configuration requests.
+ 	 */
+ 	if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
+ 	{
+ 		bool		needfree = (result != NIL && pending != NIL);
+ 		result = list_concat(result, pending);
+ 		if (needfree)
+ 			pfree(pending);
+ 		return result;
+ 	}
+ 
+ 	/*
+ 	 * Find the sync standbys from the pending list.
+ 	 */
+ 	priority = next_highest_priority;
+ 	while (priority <= lowest_priority)
+ 	{
+ 		ListCell	*cell;
+ 		ListCell	*prev = NULL;
+ 		ListCell	*next;
+ 
+ 		next_highest_priority = lowest_priority + 1;
+ 
+ 		for (cell = list_head(pending); cell != NULL; cell = next)
+ 		{
+ 			i = lfirst_int(cell);
+ 			walsnd = &WalSndCtl->walsnds[i];
+ 
+ 			next = lnext(cell);
+ 
+ 			this_priority = walsnd->sync_standby_priority;
+ 			if (this_priority == priority)
+ 			{
+ 				result = lappend_int(result, i);
+ 
+ 				/*
+ 				 * We should always exit here after the scan of pending list
+ 				 * starts because we know that the list has enough elements
+ 				 * to reach SyncRepConfig->num_sync.
+ 				 */
+ 				if (list_length(result) == SyncRepConfig->num_sync)
+ 				{
+ 					list_free(pending);
+ 					return result;		/* Exit if got enough sync standbys */
+ 				}
+ 
+ 				/*
+ 				 * Remove the entry for this sync standby from the list
+ 				 * to prevent us from looking at the same entry again.
+ 				 */
+ 				pending = list_delete_cell(pending, cell, prev);
+ 
+ 				continue;
+ 			}
+ 
+ 			if (this_priority < next_highest_priority)
+ 				next_highest_priority = this_priority;
+ 
+ 			prev = cell;
+ 		}
+ 
+ 		priority = next_highest_priority;
+ 	}
+ 
+ 	/* never reached, but keep compiler quiet */
+ 	Assert(false);
+ 	return result;
  }
  
  /*
***************
*** 511,518 **** SyncRepReleaseWaiters(void)
  static int
  SyncRepGetStandbyPriority(void)
  {
! 	char	   *rawstring;
! 	List	   *elemlist;
  	ListCell   *l;
  	int			priority = 0;
  	bool		found = false;
--- 693,699 ----
  static int
  SyncRepGetStandbyPriority(void)
  {
! 	List	   *members;
  	ListCell   *l;
  	int			priority = 0;
  	bool		found = false;
***************
*** 524,543 **** SyncRepGetStandbyPriority(void)
  	if (am_cascading_walsender)
  		return 0;
  
! 	/* Need a modifiable copy of string */
! 	rawstring = pstrdup(SyncRepStandbyNames);
! 
! 	/* Parse string into list of identifiers */
! 	if (!SplitIdentifierString(rawstring, ',', &elemlist))
! 	{
! 		/* syntax error in list */
! 		pfree(rawstring);
! 		list_free(elemlist);
! 		/* GUC machinery will have already complained - no need to do again */
  		return 0;
- 	}
  
! 	foreach(l, elemlist)
  	{
  		char	   *standby_name = (char *) lfirst(l);
  
--- 705,715 ----
  	if (am_cascading_walsender)
  		return 0;
  
! 	if (!SyncStandbysDefined())
  		return 0;
  
! 	members = SyncRepConfig->members;
! 	foreach(l, members)
  	{
  		char	   *standby_name = (char *) lfirst(l);
  
***************
*** 551,559 **** SyncRepGetStandbyPriority(void)
  		}
  	}
  
- 	pfree(rawstring);
- 	list_free(elemlist);
- 
  	return (found ? priority : 0);
  }
  
--- 723,728 ----
***************
*** 661,666 **** SyncRepUpdateSyncStandbysDefined(void)
--- 830,874 ----
  	}
  }
  
+ /*
+  * Parse synchronous_standby_names and update the config data
+  * of synchronous standbys.
+  */
+ void
+ SyncRepUpdateConfig(void)
+ {
+ 	int	parse_rc;
+ 
+ 	if (!SyncStandbysDefined())
+ 		return;
+ 
+ 	/*
+ 	 * check_synchronous_standby_names() verifies the setting value of
+ 	 * synchronous_standby_names before this function is called. So
+ 	 * syncrep_yyparse() must not cause an error here.
+ 	 */
+ 	syncrep_scanner_init(SyncRepStandbyNames);
+ 	parse_rc = syncrep_yyparse();
+ 	Assert(parse_rc == 0);
+ 	syncrep_scanner_finish();
+ 
+ 	SyncRepConfig = syncrep_parse_result;
+ 	syncrep_parse_result = NULL;
+ }
+ 
+ /*
+  * Free a previously-allocated config data of synchronous replication.
+  */
+ void
+ SyncRepFreeConfig(SyncRepConfigData *config)
+ {
+ 	if (!config)
+ 		return;
+ 
+ 	list_free_deep(config->members);
+ 	pfree(config);
+ }
+ 
  #ifdef USE_ASSERT_CHECKING
  static bool
  SyncRepQueueIsOrderedByLSN(int mode)
***************
*** 705,736 **** SyncRepQueueIsOrderedByLSN(int mode)
  bool
  check_synchronous_standby_names(char **newval, void **extra, GucSource source)
  {
! 	char	   *rawstring;
! 	List	   *elemlist;
! 
! 	/* Need a modifiable copy of string */
! 	rawstring = pstrdup(*newval);
  
! 	/* Parse string into list of identifiers */
! 	if (!SplitIdentifierString(rawstring, ',', &elemlist))
  	{
! 		/* syntax error in list */
! 		GUC_check_errdetail("List syntax is invalid.");
! 		pfree(rawstring);
! 		list_free(elemlist);
! 		return false;
! 	}
  
! 	/*
! 	 * Any additional validation of standby names should go here.
! 	 *
! 	 * Don't attempt to set WALSender priority because this is executed by
! 	 * postmaster at startup, not WALSender, so the application_name is not
! 	 * yet correctly set.
! 	 */
  
! 	pfree(rawstring);
! 	list_free(elemlist);
  
  	return true;
  }
--- 913,941 ----
  bool
  check_synchronous_standby_names(char **newval, void **extra, GucSource source)
  {
! 	int	parse_rc;
  
! 	if (*newval != NULL && (*newval)[0] != '\0')
  	{
! 		syncrep_scanner_init(*newval);
! 		parse_rc = syncrep_yyparse();
! 		syncrep_scanner_finish();
  
! 		if (parse_rc != 0)
! 		{
! 			GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
! 			GUC_check_errdetail("synchronous_standby_names parser returned %d",
! 								parse_rc);
! 			return false;
! 		}
  
! 		/*
! 		 * syncrep_yyparse sets the global syncrep_parse_result as a side
! 		 * effect. This function only needs to validate the value, so free
! 		 * the parse result once parsing is done.
! 		 */
! 		SyncRepFreeConfig(syncrep_parse_result);
! 	}
  
  	return true;
  }
*** /dev/null
--- b/src/backend/replication/syncrep_gram.y
***************
*** 0 ****
--- 1,86 ----
+ %{
+ /*-------------------------------------------------------------------------
+  *
+  * syncrep_gram.y				- Parser for synchronous_standby_names
+  *
+  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  src/backend/replication/syncrep_gram.y
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "postgres.h"
+ 
+ #include "replication/syncrep.h"
+ #include "utils/formatting.h"
+ 
+ /* Result of the parsing is returned here */
+ SyncRepConfigData	*syncrep_parse_result;
+ 
+ static SyncRepConfigData *create_syncrep_config(char *num_sync, List *members);
+ 
+ /*
+  * Bison doesn't allocate anything that needs to live across parser calls,
+  * so we can easily have it use palloc instead of malloc.  This prevents
+  * memory leaks if we error out during parsing.  Note this only works with
+  * bison >= 2.0.  However, in bison 1.875 the default is to use alloca()
+  * if possible, so there's not really much problem anyhow, at least if
+  * you're building with gcc.
+  */
+ #define YYMALLOC palloc
+ #define YYFREE   pfree
+ 
+ %}
+ 
+ %expect 0
+ %name-prefix="syncrep_yy"
+ 
+ %union
+ {
+ 	char	   *str;
+ 	List	   *list;
+ 	SyncRepConfigData  *config;
+ }
+ 
+ %token <str> NAME NUM
+ 
+ %type <config> result standby_config
+ %type <list> standby_list
+ %type <str> standby_name
+ 
+ %start result
+ 
+ %%
+ result:
+ 		standby_config				{ syncrep_parse_result = $1; }
+ ;
+ standby_config:
+ 		standby_list				{ $$ = create_syncrep_config("1", $1); }
+ 		| NUM '(' standby_list ')'		{ $$ = create_syncrep_config($1, $3); }
+ ;
+ standby_list:
+ 		standby_name				{ $$ = list_make1($1);}
+ 		| standby_list ',' standby_name		{ $$ = lappend($1, $3);}
+ ;
+ standby_name:
+ 		NAME					{ $$ = $1; }
+ 		| NUM					{ $$ = $1; }
+ ;
+ %%
+ 
+ static SyncRepConfigData *
+ create_syncrep_config(char *num_sync, List *members)
+ {
+ 	SyncRepConfigData *config =
+ 		(SyncRepConfigData *) palloc(sizeof(SyncRepConfigData));
+ 
+ 	config->num_sync = atoi(num_sync);
+ 	config->members = members;
+ 	return config;
+ }
+ 
+ #include "syncrep_scanner.c"
*** /dev/null
--- b/src/backend/replication/syncrep_scanner.l
***************
*** 0 ****
--- 1,144 ----
+ %{
+ /*-------------------------------------------------------------------------
+  *
+  * syncrep_scanner.l
+  *	  a lexical scanner for synchronous_standby_names
+  *
+  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  src/backend/replication/syncrep_scanner.l
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "miscadmin.h"
+ #include "lib/stringinfo.h"
+ 
+ /*
+   * flex emits a yy_fatal_error() function that it calls in response to
+   * critical errors like malloc failure, file I/O errors, and detection of
+   * internal inconsistency.  That function prints a message and calls exit().
+   * Mutate it to instead call ereport(FATAL), which terminates this process.
+   *
+   * The process that causes this fatal error should be terminated.
+   * Otherwise it has to abandon the new setting value of
+   * synchronous_standby_names and keep running with the previous one
+   * while the other processes switch to the new one.
+   * This inconsistency of the setting that each process is based on
+   * can cause a serious problem. Though it's basically not a good idea to
+   * use FATAL here because it can take down the postmaster,
+   * we should do that in order to avoid such an inconsistency.
+   */
+ #undef fprintf
+ #define fprintf(file, fmt, msg) syncrep_flex_fatal(fmt, msg)
+ 
+ static void
+ syncrep_flex_fatal(const char *fmt, const char *msg)
+ {
+ 	ereport(FATAL, (errmsg_internal("%s", msg)));
+ }
+ 
+ /* Handles to the buffer that the lexer uses internally */
+ static YY_BUFFER_STATE scanbufhandle;
+ 
+ static StringInfoData xdbuf;
+ 
+ %}
+ 
+ %option 8bit
+ %option never-interactive
+ %option nounput
+ %option noinput
+ %option noyywrap
+ %option warn
+ %option prefix="syncrep_yy"
+ 
+ /*
+  * <xd> delimited identifiers (double-quoted identifiers)
+  */
+ %x xd
+ 
+ space		[ \t\n\r\f\v]
+ 
+ undquoted_start	[^ ,\(\)\"]
+ undquoted_cont		[^ ,\(\)]
+ undquoted_name    {undquoted_start}{undquoted_cont}*
+ dquoted_name		[^\"]+
+ 
+ /* Double-quoted string */
+ dquote		\"
+ xdstart		{dquote}
+ xddouble		{dquote}{dquote}
+ xdstop		{dquote}
+ xdinside		{dquoted_name}
+ 
+ %%
+ {space}+		{ /* ignore */ }
+ {xdstart}	{
+ 				initStringInfo(&xdbuf);
+ 				BEGIN(xd);
+ 		}
+ <xd>{xddouble} {
+ 				appendStringInfoChar(&xdbuf, '\"');
+ 		}
+ <xd>{xdinside} {
+ 				appendStringInfoString(&xdbuf, yytext);
+ 		}
+ <xd>{xdstop} {
+ 				yylval.str = pstrdup(xdbuf.data);
+ 				pfree(xdbuf.data);
+ 				BEGIN(INITIAL);
+ 				return NAME;
+ 		}
+ ","			{ return ','; }
+ "("			{ return '('; }
+ ")"			{ return ')'; }
+ [1-9][0-9]*	{
+ 				yylval.str = pstrdup(yytext);
+ 				return NUM;
+ 		}
+ {undquoted_name} {
+ 				yylval.str = pstrdup(yytext);
+ 				return NAME;
+ 		}
+ %%
+ 
+ void
+ yyerror(const char *message)
+ {
+ 	ereport(IsUnderPostmaster ? DEBUG2 : LOG,
+ 			(errcode(ERRCODE_SYNTAX_ERROR),
+ 			 errmsg("%s at or near \"%s\"", message, yytext)));
+ }
+ 
+ void
+ syncrep_scanner_init(const char *str)
+ {
+ 	Size		slen = strlen(str);
+ 	char	   *scanbuf;
+ 
+ 	/*
+ 	 * Might be left over after ereport()
+ 	 */
+ 	if (YY_CURRENT_BUFFER)
+ 		yy_delete_buffer(YY_CURRENT_BUFFER);
+ 
+ 	/*
+ 	 * Make a scan buffer with special termination needed by flex.
+ 	 */
+ 	scanbuf = (char *) palloc(slen + 2);
+ 	memcpy(scanbuf, str, slen);
+ 	scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
+ 	scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
+ }
+ 
+ void
+ syncrep_scanner_finish(void)
+ {
+ 	yy_delete_buffer(scanbufhandle);
+ 	scanbufhandle = NULL;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 2666,2671 **** WalSndShmemInit(void)
--- 2666,2672 ----
  		{
  			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
  
+ 			walsnd->slotno = i;
  			SpinLockInit(&walsnd->mutex);
  		}
  	}
***************
*** 2751,2757 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	Tuplestorestate *tupstore;
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
! 	WalSnd	   *sync_standby;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
--- 2752,2758 ----
  	Tuplestorestate *tupstore;
  	MemoryContext per_query_ctx;
  	MemoryContext oldcontext;
! 	List	   *sync_standbys;
  	int			i;
  
  	/* check to see if caller supports us returning a tuplestore */
***************
*** 2780,2791 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Get the currently active synchronous standby.
  	 */
  	LWLockAcquire(SyncRepLock, LW_SHARED);
! 	sync_standby = SyncRepGetSynchronousStandby();
  	LWLockRelease(SyncRepLock);
  
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		WalSnd *walsnd = &WalSndCtl->walsnds[i];
--- 2781,2802 ----
  	MemoryContextSwitchTo(oldcontext);
  
  	/*
! 	 * Allocate and update the config data of synchronous replication,
! 	 * and then get the currently active synchronous standbys.
  	 */
+ 	SyncRepUpdateConfig();
  	LWLockAcquire(SyncRepLock, LW_SHARED);
! 	sync_standbys = SyncRepGetSyncStandbys();
  	LWLockRelease(SyncRepLock);
  
+ 	/*
+ 	 * Free the previously-allocated config data because a backend
+ 	 * no longer needs it. The next call of this function needs to
+ 	 * allocate and update the config data afresh because the setting
+ 	 * of sync replication might have changed between the calls.
+ 	 */
+ 	SyncRepFreeConfig(SyncRepConfig);
+ 
  	for (i = 0; i < max_wal_senders; i++)
  	{
  		WalSnd *walsnd = &WalSndCtl->walsnds[i];
***************
*** 2856,2862 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  			 */
  			if (priority == 0)
  				values[7] = CStringGetTextDatum("async");
! 			else if (walsnd == sync_standby)
  				values[7] = CStringGetTextDatum("sync");
  			else
  				values[7] = CStringGetTextDatum("potential");
--- 2867,2873 ----
  			 */
  			if (priority == 0)
  				values[7] = CStringGetTextDatum("async");
! 			else if (list_member_int(sync_standbys, i))
  				values[7] = CStringGetTextDatum("sync");
  			else
  				values[7] = CStringGetTextDatum("potential");
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 240,246 ****
  # These settings are ignored on a standby server.
  
  #synchronous_standby_names = ''	# standby servers that provide sync rep
! 				# comma-separated list of application_name
  				# from standby(s); '*' = all
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
  
--- 240,246 ----
  # These settings are ignored on a standby server.
  
  #synchronous_standby_names = ''	# standby servers that provide sync rep
! 				# number of sync standbys and comma-separated list of application_name
  				# from standby(s); '*' = all
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
  
*** a/src/include/replication/syncrep.h
--- b/src/include/replication/syncrep.h
***************
*** 32,37 ****
--- 32,49 ----
  #define SYNC_REP_WAITING			1
  #define SYNC_REP_WAIT_COMPLETE		2
  
+ /*
+  * Struct for the configuration of synchronous replication.
+  */
+ typedef struct SyncRepConfigData
+ {
+ 	int	num_sync;	/* number of sync standbys that we need to wait for */
+ 	List	*members;	/* list of names of potential sync standbys */
+ } SyncRepConfigData;
+ 
+ extern SyncRepConfigData *syncrep_parse_result;
+ extern SyncRepConfigData *SyncRepConfig;
+ 
  /* user-settable parameters for synchronous replication */
  extern char *SyncRepStandbyNames;
  
***************
*** 45,58 **** extern void SyncRepCleanupAtProcExit(void);
  extern void SyncRepInitConfig(void);
  extern void SyncRepReleaseWaiters(void);
  
  /* called by checkpointer */
  extern void SyncRepUpdateSyncStandbysDefined(void);
  
- /* forward declaration to avoid pulling in walsender_private.h */
- struct WalSnd;
- extern struct WalSnd *SyncRepGetSynchronousStandby(void);
- 
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);
  
  #endif   /* _SYNCREP_H */
--- 57,81 ----
  extern void SyncRepInitConfig(void);
  extern void SyncRepReleaseWaiters(void);
  
+ /* called by wal sender and user backend */
+ extern List *SyncRepGetSyncStandbys(void);
+ extern void SyncRepUpdateConfig(void);
+ extern void SyncRepFreeConfig(SyncRepConfigData *config);
+ 
  /* called by checkpointer */
  extern void SyncRepUpdateSyncStandbysDefined(void);
  
  extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
  extern void assign_synchronous_commit(int newval, void *extra);
  
+ /*
+  * Internal functions for parsing synchronous_standby_names grammar,
+  * in syncrep_gram.y and syncrep_scanner.l
+  */
+ extern int  syncrep_yyparse(void);
+ extern int  syncrep_yylex(void);
+ extern void syncrep_yyerror(const char *str);
+ extern void syncrep_scanner_init(const char *query_string);
+ extern void syncrep_scanner_finish(void);
+ 
  #endif   /* _SYNCREP_H */
*** a/src/include/replication/walsender_private.h
--- b/src/include/replication/walsender_private.h
***************
*** 32,37 **** typedef enum WalSndState
--- 32,38 ----
   */
  typedef struct WalSnd
  {
+ 	int		slotno;			/* index of this slot in WalSnd array */
  	pid_t		pid;			/* this walsender's process id, or 0 */
  	WalSndState state;			/* this walsender's state */
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
*** a/src/tools/msvc/Mkvcbuild.pm
--- b/src/tools/msvc/Mkvcbuild.pm
***************
*** 156,162 **** sub mkvcbuild
  		'bootparse.y');
  	$postgres->AddFiles('src/backend/utils/misc', 'guc-file.l');
  	$postgres->AddFiles('src/backend/replication', 'repl_scanner.l',
! 		'repl_gram.y');
  	$postgres->AddDefine('BUILDING_DLL');
  	$postgres->AddLibrary('secur32.lib');
  	$postgres->AddLibrary('ws2_32.lib');
--- 156,162 ----
  		'bootparse.y');
  	$postgres->AddFiles('src/backend/utils/misc', 'guc-file.l');
  	$postgres->AddFiles('src/backend/replication', 'repl_scanner.l',
! 		'repl_gram.y', 'syncrep_scanner.l', 'syncrep_gram.y');
  	$postgres->AddDefine('BUILDING_DLL');
  	$postgres->AddLibrary('secur32.lib');
  	$postgres->AddLibrary('ws2_32.lib');
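
For anyone following the selection logic in SyncRepGetSyncStandbys() and
SyncRepGetOldestSyncRecPtr() above, here is a minimal standalone model of
it. This is only a sketch and not code from the patch: ModelStandby,
model_get_sync_standbys() and model_oldest_flush() are hypothetical names,
the model rescans the array once per priority level instead of keeping a
pending list, and it tracks only the flush position. It is meant to give
the same ordering as the patch (lower priority number first, ties broken
by slot index).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a walsender slot; not the real WalSnd struct. */
typedef struct ModelStandby
{
	int			priority;	/* 0 = async, 1 = highest sync priority */
	uint64_t	flush;		/* flush position; 0 models InvalidXLogRecPtr */
} ModelStandby;

/*
 * Choose up to num_sync standbys, lowest priority number first, breaking
 * ties by slot index.  Slot indexes are stored in chosen[], which must have
 * room for num_sync entries.  Returns the number of standbys chosen.
 */
static int
model_get_sync_standbys(const ModelStandby *s, int n, int num_sync,
						int *chosen)
{
	int			nchosen = 0;
	int			last_priority = 0;	/* priorities <= this are consumed */
	int			i;

	assert(num_sync >= 1);

	while (nchosen < num_sync)
	{
		int			best = 0;	/* smallest unconsumed priority, 0 = none */

		/* Find the next priority level that has an eligible standby. */
		for (i = 0; i < n; i++)
		{
			if (s[i].flush == 0 || s[i].priority <= last_priority)
				continue;
			if (best == 0 || s[i].priority < best)
				best = s[i].priority;
		}
		if (best == 0)
			break;				/* no more candidates */

		/* Take every standby at that level, in slot order. */
		for (i = 0; i < n && nchosen < num_sync; i++)
		{
			if (s[i].flush != 0 && s[i].priority == best)
				chosen[nchosen++] = i;
		}
		last_priority = best;
	}
	return nchosen;
}

/*
 * Compute the oldest flush position among the chosen standbys.  Returns
 * false, leaving *oldest untouched, if fewer than num_sync were chosen.
 */
static bool
model_oldest_flush(const ModelStandby *s, const int *chosen, int nchosen,
				   int num_sync, uint64_t *oldest)
{
	uint64_t	result = 0;
	int			i;

	if (nchosen < num_sync)
		return false;

	for (i = 0; i < nchosen; i++)
	{
		uint64_t	f = s[chosen[i]].flush;

		if (result == 0 || f < result)
			result = f;
	}
	*oldest = result;
	return true;
}
```

With slots having priorities {2, 0, 1, 2, 3} and only the last one lacking
a valid flush position, a setting like '2 (...)' would pick slot 2 and then
slot 0 in this model, and waiters would be released only up to the smaller
of those two flush positions.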
