Hello, hackers

I have implemented a function caching query on memcached.
I am sending the patch file and it's so nice to give me some opinions.

Main program which I implemented is as below.
- Search for cache data on memcached by using md5 which converted SELECT
into.
- Set data on memcached. The key is md5 which converted  SELECT into.
The value is RowDescription, DataRow from Backend.

[Modify files]
- pool.h
- child.c
- pool_config.l
- pool_config.h
- pool_config.c
- pool_system.c
- pool_proto_modules.c
- pool_process_query.c

[New File]
- pool_memqcache.c


--
Regards
Masanori YAMAZAKI
Index: pool.h
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool.h,v
retrieving revision 1.90
diff -c -r1.90 pool.h
*** pool.h	2 May 2011 13:31:25 -0000	1.90
--- pool.h	14 Jun 2011 13:11:20 -0000
***************
*** 37,42 ****
--- 37,44 ----
  #include <sys/types.h>
  #include <limits.h>
  
+ #include <libmemcached/memcached.h>
+ 
  #ifdef USE_SSL
  #include <openssl/crypto.h>
  #include <openssl/ssl.h>
***************
*** 223,228 ****
--- 225,236 ----
  } POOL_CONNECTION_POOL;
  
  typedef struct {
+ 	memcached_server_st *servers;
+ 	memcached_st *memc;
+ 	memcached_return rc;
+ } POOL_MEMCACHED_CONNECTION;
+ 
+ typedef struct {
  	SystemDBInfo *info;
  	PGconn *pgconn;
  	/* persistent connection to the system DB */
Index: child.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/child.c,v
retrieving revision 1.72
diff -c -r1.72 child.c
*** child.c	9 May 2011 23:37:14 -0000	1.72
--- child.c	14 Jun 2011 13:11:21 -0000
***************
*** 149,154 ****
--- 149,157 ----
  	/* initialize system db connection */
  	init_system_db_connection();
  
+ 	/* initialize Memcached connection */
+ 	init_memcached_connection();
+ 
  	/* initialize connection pool */
  	if (pool_init_cp())
  	{
***************
*** 1957,1962 ****
--- 1960,1980 ----
  }
  
  /*
+  * Initialize Memcached connection
+  */
+ static void init_memcached_connection(void)
+ {
+ 	if (pool_config->memory_cache_enabled)
+ 	{
+ 		memcached_connect();
+ 		if (memcached_con->rc != MEMCACHED_SUCCESS)
+ 		{
+ 			pool_error("Could not connect the Memcached");
+ 		}
+ 	}
+ }
+ 
+ /*
   * Initialize my backend status.
   * We copy the backend status to private area so that
   * they are not changed while I am alive.
Index: pool_config.l
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_config.l,v
retrieving revision 1.56
diff -c -r1.56 pool_config.l
*** pool_config.l	6 May 2011 23:43:26 -0000	1.56
--- pool_config.l	14 Jun 2011 13:11:21 -0000
***************
*** 43,48 ****
--- 43,50 ----
  static char *default_reset_query_list[] = {"ABORT", "DISCARD ALL"};
  static char *default_black_function_list[] = {"nextval", "setval"};
  
+ POOL_MEMCACHED_CONNECTION *memcached_con;
+ 
  typedef enum {
    POOL_KEY = 1,
    POOL_INTEGER,
***************
*** 211,216 ****
--- 213,227 ----
  	pool_config->pattc = 0;
  	pool_config->current_pattern_size = 0;
  
+     pool_config->memory_cache_enabled = "off";
+     pool_config->memqcache_method = "shmem";
+     pool_config->memqcache_memcached_host = "";
+     pool_config->memqcache_memcached_port = "";
+     pool_config->memqcache_total_size = "";
+     pool_config->memqcache_expire = "60";
+     pool_config->memqcache_maxcache = "";
+     pool_config->memqcache_cache_block_size = "8192";
+ 
  	res = gethostname(localhostname,sizeof(localhostname));
  	if(res !=0 )
  	{
***************
*** 1716,1721 ****
--- 1727,1841 ----
  			}
  			pool_config->relcache_expire = v;
  		}
+         else if (!strcmp(key, "memory_cache_enabled") &&
+                  CHECK_CONTEXT(INIT_CONFIG|RELOAD_CONFIG, context))
+         {
+             int v = eval_logical(yytext);
+ 
+             if (v < 0)
+             {
+                 pool_error("pool_config: invalid value %s for %s", yytext, key);
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memory_cache_enabled = v;
+         }
+         else if (!strcmp(key, "memqcache_method") && CHECK_CONTEXT(INIT_CONFIG, context))
+         {
+             char *str;
+ 
+             if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+             {
+                 PARSE_ERROR();
+                 fclose(fd);
+                 return(-1);
+             }
+             str = extract_string(yytext, token);
+             if (str == NULL)
+             {
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memqcache_method = str;
+         }
+         else if (!strcmp(key, "memqcache_memcached_host") && CHECK_CONTEXT(INIT_CONFIG, context))
+         {
+             char *str;
+ 
+             if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+             {
+                 PARSE_ERROR();
+                 fclose(fd);
+                 return(-1);
+             }
+             str = extract_string(yytext, token);
+             if (str == NULL)
+             {
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memqcache_memcached_host = str;
+         }
+         else if (!strcmp(key, "memqcache_memcached_port") && CHECK_CONTEXT(INIT_CONFIG, context))
+         {
+             int v = atoi(yytext);
+ 
+             if (token != POOL_INTEGER || v < 0)
+             {
+                 pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memqcache_memcached_port = v;
+         }
+         else if (!strcmp(key, "memqcache_total_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+         {
+             int v = atoi(yytext);
+ 
+             if (token != POOL_INTEGER || v < 0)
+             {
+                 pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memqcache_total_size = v;
+         }
+         else if (!strcmp(key, "memqcache_expire") && CHECK_CONTEXT(INIT_CONFIG, context))
+         {
+             int v = atoi(yytext);
+ 
+             if (token != POOL_INTEGER || v < 0)
+             {
+                 pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memqcache_expire = v;
+         }
+         else if (!strcmp(key, "memqcache_maxcache") && CHECK_CONTEXT(INIT_CONFIG, context))
+         {
+             int v = atoi(yytext);
+ 
+             if (token != POOL_INTEGER || v < 0)
+             {
+                 pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memqcache_maxcache = v;
+         }
+         else if (!strcmp(key, "memqcache_cache_block_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+         {
+             int v = atoi(yytext);
+ 
+             if (token != POOL_INTEGER || v < 0)
+             {
+                 pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+                 fclose(fd);
+                 return(-1);
+             }
+             pool_config->memqcache_cache_block_size = v;
+         }
  
  	}
  
Index: pool_config.h
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_config.h,v
retrieving revision 1.13
diff -c -r1.13 pool_config.h
*** pool_config.h	30 Mar 2011 02:13:07 -0000	1.13
--- pool_config.h	14 Jun 2011 13:11:21 -0000
***************
*** 174,179 ****
--- 174,188 ----
  	RegPattern *lists_patterns; /* Precompiled regex patterns for black/white lists */
  	int pattc; /* number of regexp pattern */
  	int current_pattern_size; /* size of the regex pattern array */
+ 
+ 	int *memory_cache_enabled;   /* if true, use the memory cache functionality, false by default */
+ 	char *memqcache_method;   /* Cache store method. Either 'shmem'(shared memory) or 'memcached'. 'shmem' by default */
+ 	char *memqcache_memcached_host;   /* Memcached host name. Mandatory if memqcache_method=memcached. */
+ 	int memqcache_memcached_port;   /* Memcached port number. Mondatory if memqcache_method=memcached. */
+ 	int memqcache_total_size;   /* Total memory size in bytes for storing memory cache. Mandatory if memqcache_method=shmem. */
+ 	int memqcache_expire;   /* Memory cache entry life time specified in seconds. 60 by default. */
+ 	int memqcache_maxcache;   /* Maximum SELECT result size in bytes. */
+ 	int memqcache_cache_block_size;   /* Cache block size in bytes. 8192 by default */
  } POOL_CONFIG;
  
  typedef enum {
Index: pool_config.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_config.c,v
retrieving revision 1.60
diff -c -r1.60 pool_config.c
*** pool_config.c	6 May 2011 23:43:26 -0000	1.60
--- pool_config.c	14 Jun 2011 13:11:22 -0000
***************
*** 526,531 ****
--- 526,533 ----
  static char *default_reset_query_list[] = {"ABORT", "DISCARD ALL"};
  static char *default_black_function_list[] = {"nextval", "setval"};
  
+ POOL_MEMCACHED_CONNECTION *memcached_con;
+ 
  typedef enum {
    POOL_KEY = 1,
    POOL_INTEGER,
***************
*** 1936,1941 ****
--- 1938,1952 ----
  	pool_config->pattc = 0;
  	pool_config->current_pattern_size = 0;
  
+ 	pool_config->memory_cache_enabled = "off";
+ 	pool_config->memqcache_method = "shmem";
+ 	pool_config->memqcache_memcached_host = "";
+ 	pool_config->memqcache_memcached_port = "";
+ 	pool_config->memqcache_total_size = "";
+ 	pool_config->memqcache_expire = "60";
+ 	pool_config->memqcache_maxcache = "";
+ 	pool_config->memqcache_cache_block_size = "8192";
+ 
  	res = gethostname(localhostname,sizeof(localhostname));
  	if(res !=0 )
  	{
***************
*** 3441,3446 ****
--- 3452,3566 ----
  			}
  			pool_config->relcache_expire = v;
  		}
+ 		else if (!strcmp(key, "memory_cache_enabled") &&
+ 				 CHECK_CONTEXT(INIT_CONFIG|RELOAD_CONFIG, context))
+ 		{
+ 			int v = eval_logical(yytext);
+ 
+ 			if (v < 0)
+ 			{
+ 				pool_error("pool_config: invalid value %s for %s", yytext, key);
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memory_cache_enabled = v;
+ 		}
+ 		else if (!strcmp(key, "memqcache_method") && CHECK_CONTEXT(INIT_CONFIG, context))
+ 		{
+ 			char *str;
+ 
+ 			if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+ 			{
+ 				PARSE_ERROR();
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			str = extract_string(yytext, token);
+ 			if (str == NULL)
+ 			{
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memqcache_method = str;
+ 		}
+ 		else if (!strcmp(key, "memqcache_memcached_host") && CHECK_CONTEXT(INIT_CONFIG, context))
+ 		{
+ 			char *str;
+ 
+ 			if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+ 			{
+ 				PARSE_ERROR();
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			str = extract_string(yytext, token);
+ 			if (str == NULL)
+ 			{
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memqcache_memcached_host = str;
+ 		}
+ 		else if (!strcmp(key, "memqcache_memcached_port") && CHECK_CONTEXT(INIT_CONFIG, context))
+ 		{
+ 			int v = atoi(yytext);
+ 
+ 			if (token != POOL_INTEGER || v < 0)
+ 			{
+ 				pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memqcache_memcached_port = v;
+ 		}
+ 		else if (!strcmp(key, "memqcache_total_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+ 		{
+ 			int v = atoi(yytext);
+ 
+ 			if (token != POOL_INTEGER || v < 0)
+ 			{
+ 				pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memqcache_total_size = v;
+ 		}
+ 		else if (!strcmp(key, "memqcache_expire") && CHECK_CONTEXT(INIT_CONFIG, context))
+ 		{
+ 			int v = atoi(yytext);
+ 
+ 			if (token != POOL_INTEGER || v < 0)
+ 			{
+ 				pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memqcache_expire = v;
+ 		}
+ 		else if (!strcmp(key, "memqcache_maxcache") && CHECK_CONTEXT(INIT_CONFIG, context))
+ 		{
+ 			int v = atoi(yytext);
+ 
+ 			if (token != POOL_INTEGER || v < 0)
+ 			{
+ 				pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memqcache_maxcache = v;
+ 		}
+ 		else if (!strcmp(key, "memqcache_cache_block_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+ 		{
+ 			int v = atoi(yytext);
+ 
+ 			if (token != POOL_INTEGER || v < 0)
+ 			{
+ 				pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ 				fclose(fd);
+ 				return(-1);
+ 			}
+ 			pool_config->memqcache_cache_block_size = v;
+ 		}
  
  	}
  
Index: pool_system.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_system.c,v
retrieving revision 1.7
diff -c -r1.7 pool_system.c
*** pool_system.c	1 Jun 2010 09:03:00 -0000	1.7
--- pool_system.c	14 Jun 2011 13:11:22 -0000
***************
*** 179,184 ****
--- 179,208 ----
  }
  
  /*
+  * memcached_connect:
+  *     Connects memcached.
+  */
+ int memcached_connect (void)
+ {
+ 	memcached_con->memc = memcached_create(NULL);
+ 	memcached_con->servers = memcached_server_list_append(NULL,
+                                                           pool_config->memqcache_memcached_host,
+                                                           pool_config->memqcache_memcached_port,
+                                                           &memcached_con->rc);
+ 
+ 	memcached_con->rc = memcached_server_push(memcached_con->memc, memcached_con->servers);
+ 	if (memcached_con->rc != MEMCACHED_SUCCESS)
+ 	{
+ 		pool_error("memcached_connect: %s\n", memcached_strerror(memcached_con->memc, memcached_con->rc));
+ 		return 1;
+ 	}
+ 
+ 	memcached_server_list_free(memcached_con->servers);
+ 
+ 	return 0;
+ }
+ 
+ /*
   * pool_memset_system_db_info:
   *    Initializes distribution rules. Distribution rules are stored in
   *    System DB. So we have to execute query, and expand results on
Index: pool_proto_modules.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v
retrieving revision 1.98
diff -c -r1.98 pool_proto_modules.c
*** pool_proto_modules.c	5 Jun 2011 23:03:06 -0000	1.98
--- pool_proto_modules.c	14 Jun 2011 13:11:22 -0000
***************
*** 191,196 ****
--- 191,209 ----
  			}
  		}
  
+ 		if (pool_config->memory_cache_enabled
+ 				&& IsA(node, SelectStmt)
+ 				&& !(is_select_pgcatalog = IsSelectpgcatalog(node, backend)))
+ 		{
+ 			/* Look up cache data on memory */
+ 			if (execute_memqcache_lookup(frontend, backend, node) == POOL_CONTINUE)
+ 			{
+ 				pool_query_context_destroy(query_context);
+ 				pool_set_skip_reading_from_backends();
+ 				return POOL_CONTINUE;
+ 			}
+ 		}
+ 
  		/*
  		 * Start query context
  		 */
***************
*** 2117,2123 ****
--- 2130,2161 ----
  				break;
  
  			case 'Z':	/* ReadyForQuery */
+ 				/* if no Memcached connected, retry to connect it */
+ 				if (memcached_con->rc != MEMCACHED_SUCCESS)
+ 				{
+ 					memcached_connect();
+ 					if (memcached_con->rc != MEMCACHED_SUCCESS)
+ 					{
+ 						pool_error("Could not connect the Memcached");
+ 					}
+ 					else
+ 					{
+ 						/* convert SELECT query to md5 hash */
+ 						pool_md5_hash(frontend->buf2, strlen(frontend->buf2), md5_query);
+ 						/* set cache data. key md5 hash SELECT, value RowDescription, DataRow.. */
+ 						set_cache_on_memcached(frontend, md5_query, get_buf());
+ 					}
+ 				}
+ 				else
+ 				{
+ 					/* convert SELECT query to md5 hash */
+ 					pool_md5_hash(frontend->buf2, strlen(frontend->buf2), md5_query);
+ 					/* set cache data. key md5 hash SELECT, value RowDescription, DataRow.. */
+ 					set_cache_on_memcached(frontend, md5_query, get_buf());
+ 				}
  				status = ReadyForQuery(frontend, backend, 1);
+ 				/* initialize buf for caching on memory */
+ 				init_buf();
  				break;
  
  			case '1':	/* ParseComplete */
Index: pool_process_query.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v
retrieving revision 1.259
diff -c -r1.259 pool_process_query.c
*** pool_process_query.c	13 May 2011 06:53:05 -0000	1.259
--- pool_process_query.c	14 Jun 2011 13:11:23 -0000
***************
*** 107,112 ****
--- 107,115 ----
  	qcnt = 0;
  	state = 0;
  
+ 	/* initialize buf for caching on memory */
+ 	init_buf();
+ 
  	for (;;)
  	{
  		/* Are we requested to send reset queries? */
***************
*** 1163,1168 ****
--- 1166,1177 ----
  		query_cache_register(kind, frontend, backend->info->database, p1, len1);
  	}
  
+ 	/* save the received result on memory for each kind */
+ 	if (pool_config->memory_cache_enabled)
+ 	{
+ 		memqcache_register(kind, frontend, backend->info->database, p1, len1);
+ 	}
+ 
  	/* error response? */
  	if (kind == 'E')
  	{
/* -*-pgsql-c-*- */
/*
 * pgpool: a language independent connection pool server for PostgreSQL
 * written by Tatsuo Ishii
 *
 * Copyright (c) 2003-2010	PgPool Global Development Group
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby
 * granted, provided that the above copyright notice appear in all
 * copies and that both that copyright notice and this permission
 * notice appear in supporting documentation, and that the name of the
 * author not be used in advertising or publicity pertaining to
 * distribution of the software without specific, written prior
 * permission. The author makes no representations about the
 * suitability of this software for any purpose.  It is provided "as
 * is" without express or implied warranty.
 *
 * pool_memqcache.c: query cache on shmem or memcached
 *
 */

#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "pool.h"
#include "md5.h"
#include "pool_stream.h"
#include "pool_config.h"
#include "pool_proto_modules.h"
#include "parser/parsenodes.h"

typedef enum
{
	CACHE_FOUND, CACHE_NOT_FOUND, CACHE_ERROR
} CACHE_STATUS;

static size_t buflen = 0;
static size_t bufsize = 0;
static char *buf = NULL;

/* --------------------------------
 * memqcache_lookup() - lookup cache data on memory
 * returns POOL_CONTINUE, POOL_END, POOL_ERROR
`* --------------------------------
 */
POOL_STATUS memqcache_lookup(POOL_CONNECTION *frontend,
                             POOL_CONNECTION_POOL *backend,
                             Node *node)
{
    SelectStmt *select = (SelectStmt *)node;
    POOL_STATUS status = POOL_END;	/* cache not found */

    if (! (select->intoClause || select->lockingClause))
    {
        parsed_query = strdup(nodeToString(node));
        if (parsed_query == NULL)
        {
            pool_error("execute_memqcache_lookup: malloc failed");
            return POOL_ERROR;
        }

        /* if no Memcached connected, retry to connect it */
        if (memcached_con->rc != MEMCACHED_SUCCESS)
        {
        	memcached_connect();
        	if (memcached_con->rc != MEMCACHED_SUCCESS)
        	{
        		pool_error("Could not connect the Memcached");
        	}
        	else
        	{
        		/* convert SELECT query to md5 hash */
        		pool_md5_hash(parsed_query, strlen(parsed_query), md5_query);
        		/* look up SELECT query string */
        		status = get_cache_on_memcached(frontend, md5_query, buf, len);
        	}
        }
        else
        {
    		/* convert SELECT query to md5 hash */
    		pool_md5_hash(parsed_query, strlen(parsed_query), md5_query);
    		/* look up SELECT query string */
    		status = get_cache_on_memcached(frontend, md5_query, buf, len);
        }

        /* cache found */
        if (status == CACHE_FOUND)
        {
        	/* free */
        	free(parsed_query);
        	parsed_query = NULL;
        	free_parser();
        	status = POOL_CONTINUE;
        }
        /* cache not found */
        else if (status == CACHE_ERROR)
        {
            pool_error("pool_memqcache_lookup: memqcache lookup failed");
            status = POOL_ERROR;
        }
       else
       {
    	    pool_debug("pool_memqcache_lookup: memqcache not found");
    	    status = POOL_END;
        }
    }
    return status;
}

/* --------------------------------
 * set_cache_on_memcached() - set cache data on memcached
 * returns 1 on success, 0 otherwise
`* --------------------------------
 */
int set_cache_on_memcached(POOL_CONNECTION *frontend,
                           const char *md5_query,
                           const char *data)
{
	/* memory_cache_enabled "on", and method memcached */
    if (pool_config->memory_cache_enabled && pool_config->memqcache_method == "memcached")
    {
        return 0;
    }

	/* if no SELECT query string  */
    if (strlen(query) <= 0)
    {
        return 0;
    }

    pool_debug("set_cache_on_memcached: md5_query=%s ", md5_query);

	/* set cache data to memcached. key is md5 hash query */
    memcached_con->rc = memcached_set(memcached_con->memc, md5_query, strlen(md5_query), data, strlen(data), pool_config->memqcache_expire, 0);

	/* setting cache data on memcached is failed */
    if (memcached_con->rc != MEMCACHED_SUCCESS && memcached_con->rc != MEMCACHED_BUFFERED)
    {
        pool_error("set_cache_on_memcached: %s\n", memcached_strerror(memcached_con->memc, memcached_con->rc));
        return 0;
    }

    pool_debug("set_cache_on_memcached: succeeded.");

    return 1;
}

/* --------------------------------
 * get_cache_on_memcached() - get cache data on memcached
 * returns CACHE_FOUND on success, CACHE_ERROR otherwise
`* --------------------------------
 */
int get_cache_on_memcached(POOL_CONNECTION *frontend,
                           const char *md5_query,
                           char **buf,
                           size_t *len)
{
    uint32_t flags;
    char *ptr;

    /* memory_cache_enabled "on", and method memcached */
    if (pool_config->memory_cache_enabled && pool_config->memqcache_method == "memcached")
    {
        return CACHE_ERROR;
    }

	/* if no SELECT query string  */
    if (strlen(query) <= 0)
    {
        return CACHE_ERROR;
    }

	/* get cache data on memcached. key is md5 hash query */
    ptr = memcached_get(memcached_con->memc, md5_query, strlen(md5_query), len, &flags, &memcached_con->rc);

	/* getting cache data on memcached is failed */
    if (memcached_con->rc != MEMCACHED_SUCCESS)
    {
        pool_error("get_cache_on_memcached: %s", memcached_strerror(memcached_con->memc, memcached_con->rc));
        return CACHE_ERROR;
    }

    /* copy buf */
    memcpy(buf, ptr, *len);
    /* free */
    free(ptr);

    pool_debug("get_cache_on_memcached: md5_query=%s", md5_query);

    return CACHE_FOUND;
}

/* --------------------------------
 * delete_cache_on_memcached() - delete cache data on memcached
 * returns 1 on success, 0 otherwise
`* --------------------------------
 */
int delete_cache_on_memcached(const char *key)
{
	/* delete cache data on memcached. key is md5 hash query */
    memcached_con->rc= memcached_delete(memcached_con->memc, key, strlen(key), (time_t)0);

	/* delete cache data on memcached is failed */
    if (memcached_con->rc != MEMCACHED_SUCCESS && memcached_con->rc == MEMCACHED_BUFFERED) {
        pool_error ("delete_cache_on_memcached: %s\n", memcached_strerror(memcached_con->memc, memcached_con->rc));
        /* free */
        memcached_free(memcached_con->memc);
        return 0;
    }

    /* free */
    memcached_free(memcached_con->memc);
    return 1;
}

/* --------------------------------
 * memqcache_register() - delete cache data on memcached
`* --------------------------------
 */
void memqcache_register(char kind,
                        POOL_CONNECTION *frontend,
                        char *database,
                        char *data,
                        int data_len)
{
	int send_len;
	int result = 0;

    if (is_select_pgcatalog || is_select_for_update)
    {
		return;
    }

    if (kind == 'T' && parsed_query)
    {
        pool_debug("execute_memqcache_register: saving cache for query: \"%s\"", query);

        /* store data into the cache */
        add_buf(&kind, 1);
        send_len = htonl(data_len + sizeof(int));
        add_buf(&send_len, sizeof(int));
        add_buf(data, data_len);

        /* if no Memcached connected, retry to connect it */
        if (memcached_con->rc != MEMCACHED_SUCCESS)
        {
        	memcached_connect();
        	if (memcached_con->rc != MEMCACHED_SUCCESS)
        	{
        		pool_error("Could not connect the Memcached");
        	}
        	else
        	{
        		/* convert SELECT query to md5 hash */
        		pool_md5_hash(parsed_query, strlen(parsed_query), md5_query);
        		/* set tableOID from RowDescription and md5_query. key tableOID, value md5 hash */
        		result = set_cache_on_memcached(frontend, buf, md5_query);
        	}
        }
        else
        {
    		/* convert SELECT query to md5 hash */
    		pool_md5_hash(parsed_query, strlen(parsed_query), md5_query);
    		/* set tableOID from RowDescription and md5_query. key tableOID, value md5 hash */
    		result = set_cache_on_memcached(frontend, buf, md5_query);
        }

        if (result < 0)
        {
        	pool_error("memqcache_register: query cache registration failed");
        	return;
        }
    }
    else if (kind == 'D' || kind == 'C')
    {
    	/* store data into the cache */
		add_buf(&kind, 1);
		send_len = htonl(data_len + sizeof(int));
		add_buf(&send_len, sizeof(int));
		add_buf(data, data_len);

		pool_debug("memqcache_register: query cache saved");

		/* free */
		free(parsed_query);
		parsed_query = NULL;
	}

    return;
}

/* --------------------------------
 * add_buf() - add the data from Backend such as RowDescription, DataRow.. to buf
 * returns buf length on success, 0 otherwise
`* --------------------------------
 */
int add_buf(const char *s, size_t len)
{
    int pos=0;

    /* no memory_cache_enabled "on" */
    if (pool_config->memory_cache_enabled != "on")
    {
        return 0;
    }

    if (buflen + len >= bufsize) {
        size_t newbufsize = (buflen + len) + pool_config->memqcache_cache_block_size;
        pool_debug("add_buf: realloc buf bufsize=%d newbufsize=%d", bufsize, newbufsize);
        bufsize = newbufsize;
        buf = (char *)realloc(buf, bufsize);
    }

    /* add the data from Backend to buf one by one */
    for (pos=0 ; pos<len ; pos++)
    {
        buf[buflen+pos] = s[pos];
    }
    buflen += pos;

    pool_debug("add_buf: len=%d, total=%d bufsize=%d", len, buflen, bufsize);

    return buflen;
}

/* --------------------------------
 * get_buf() - get buf for caching on memory
 * returns buf
`* --------------------------------
 */
char* get_buf(void)
{
	return buf;
}

/* --------------------------------
 * init_buf() - initialize buf for caching on memory
`* --------------------------------
 */
void init_buf(void)
{
	buflen = 0;
	if (buf == NULL)
	{
		bufsize = pool_config->memqcache_cache_block_size;
		buf = (char *)malloc(bufsize);
	}
	memset(buf, 0, bufsize);
}
_______________________________________________
Pgpool-hackers mailing list
[email protected]
http://pgfoundry.org/mailman/listinfo/pgpool-hackers

Reply via email to