Hello, hackers
I have implemented a feature that caches query results in memcached.
I am sending the patch file and would appreciate any comments on it.
The main points of the implementation are as follows.
- Look up cached data in memcached, using the MD5 hash of the SELECT
statement as the key.
- Store data in memcached. The key is the MD5 hash of the SELECT statement;
the value is the RowDescription and DataRow messages received from the backend.
[Modified files]
- pool.h
- child.c
- pool_config.l
- pool_config.h
- pool_config.c
- pool_system.c
- pool_proto_modules.c
- pool_process_query.c
[New File]
- pool_memqcache.c
--
Regards
Masanori YAMAZAKI
Index: pool.h
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool.h,v
retrieving revision 1.90
diff -c -r1.90 pool.h
*** pool.h 2 May 2011 13:31:25 -0000 1.90
--- pool.h 14 Jun 2011 13:11:20 -0000
***************
*** 37,42 ****
--- 37,44 ----
#include <sys/types.h>
#include <limits.h>
+ #include <libmemcached/memcached.h>
+
#ifdef USE_SSL
#include <openssl/crypto.h>
#include <openssl/ssl.h>
***************
*** 223,228 ****
--- 225,236 ----
} POOL_CONNECTION_POOL;
typedef struct {
+ memcached_server_st *servers;
+ memcached_st *memc;
+ memcached_return rc;
+ } POOL_MEMCACHED_CONNECTION;
+
+ typedef struct {
SystemDBInfo *info;
PGconn *pgconn;
/* persistent connection to the system DB */
Index: child.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/child.c,v
retrieving revision 1.72
diff -c -r1.72 child.c
*** child.c 9 May 2011 23:37:14 -0000 1.72
--- child.c 14 Jun 2011 13:11:21 -0000
***************
*** 149,154 ****
--- 149,157 ----
/* initialize system db connection */
init_system_db_connection();
+ /* initialize Memcached connection */
+ init_memcached_connection();
+
/* initialize connection pool */
if (pool_init_cp())
{
***************
*** 1957,1962 ****
--- 1960,1980 ----
}
/*
+ * Initialize Memcached connection
+ */
+ static void init_memcached_connection(void)
+ {
+ if (pool_config->memory_cache_enabled)
+ {
+ memcached_connect();
+ if (memcached_con->rc != MEMCACHED_SUCCESS)
+ {
+ pool_error("Could not connect the Memcached");
+ }
+ }
+ }
+
+ /*
* Initialize my backend status.
* We copy the backend status to private area so that
* they are not changed while I am alive.
Index: pool_config.l
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_config.l,v
retrieving revision 1.56
diff -c -r1.56 pool_config.l
*** pool_config.l 6 May 2011 23:43:26 -0000 1.56
--- pool_config.l 14 Jun 2011 13:11:21 -0000
***************
*** 43,48 ****
--- 43,50 ----
static char *default_reset_query_list[] = {"ABORT", "DISCARD ALL"};
static char *default_black_function_list[] = {"nextval", "setval"};
+ POOL_MEMCACHED_CONNECTION *memcached_con;
+
typedef enum {
POOL_KEY = 1,
POOL_INTEGER,
***************
*** 211,216 ****
--- 213,227 ----
pool_config->pattc = 0;
pool_config->current_pattern_size = 0;
+ pool_config->memory_cache_enabled = "off";
+ pool_config->memqcache_method = "shmem";
+ pool_config->memqcache_memcached_host = "";
+ pool_config->memqcache_memcached_port = "";
+ pool_config->memqcache_total_size = "";
+ pool_config->memqcache_expire = "60";
+ pool_config->memqcache_maxcache = "";
+ pool_config->memqcache_cache_block_size = "8192";
+
res = gethostname(localhostname,sizeof(localhostname));
if(res !=0 )
{
***************
*** 1716,1721 ****
--- 1727,1841 ----
}
pool_config->relcache_expire = v;
}
+ else if (!strcmp(key, "memory_cache_enabled") &&
+ CHECK_CONTEXT(INIT_CONFIG|RELOAD_CONFIG, context))
+ {
+ int v = eval_logical(yytext);
+
+ if (v < 0)
+ {
+ pool_error("pool_config: invalid value %s for %s", yytext, key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memory_cache_enabled = v;
+ }
+ else if (!strcmp(key, "memqcache_method") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ char *str;
+
+ if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+ {
+ PARSE_ERROR();
+ fclose(fd);
+ return(-1);
+ }
+ str = extract_string(yytext, token);
+ if (str == NULL)
+ {
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_method = str;
+ }
+ else if (!strcmp(key, "memqcache_memcached_host") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ char *str;
+
+ if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+ {
+ PARSE_ERROR();
+ fclose(fd);
+ return(-1);
+ }
+ str = extract_string(yytext, token);
+ if (str == NULL)
+ {
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_memcached_host = str;
+ }
+ else if (!strcmp(key, "memqcache_memcached_port") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_memcached_port = v;
+ }
+ else if (!strcmp(key, "memqcache_total_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_total_size = v;
+ }
+ else if (!strcmp(key, "memqcache_expire") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_expire = v;
+ }
+ else if (!strcmp(key, "memqcache_maxcache") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_maxcache = v;
+ }
+ else if (!strcmp(key, "memqcache_cache_block_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_cache_block_size = v;
+ }
}
Index: pool_config.h
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_config.h,v
retrieving revision 1.13
diff -c -r1.13 pool_config.h
*** pool_config.h 30 Mar 2011 02:13:07 -0000 1.13
--- pool_config.h 14 Jun 2011 13:11:21 -0000
***************
*** 174,179 ****
--- 174,188 ----
RegPattern *lists_patterns; /* Precompiled regex patterns for black/white lists */
int pattc; /* number of regexp pattern */
int current_pattern_size; /* size of the regex pattern array */
+
+ int *memory_cache_enabled; /* if true, use the memory cache functionality, false by default */
+ char *memqcache_method; /* Cache store method. Either 'shmem'(shared memory) or 'memcached'. 'shmem' by default */
+ char *memqcache_memcached_host; /* Memcached host name. Mandatory if memqcache_method=memcached. */
+ int memqcache_memcached_port; /* Memcached port number. Mondatory if memqcache_method=memcached. */
+ int memqcache_total_size; /* Total memory size in bytes for storing memory cache. Mandatory if memqcache_method=shmem. */
+ int memqcache_expire; /* Memory cache entry life time specified in seconds. 60 by default. */
+ int memqcache_maxcache; /* Maximum SELECT result size in bytes. */
+ int memqcache_cache_block_size; /* Cache block size in bytes. 8192 by default */
} POOL_CONFIG;
typedef enum {
Index: pool_config.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_config.c,v
retrieving revision 1.60
diff -c -r1.60 pool_config.c
*** pool_config.c 6 May 2011 23:43:26 -0000 1.60
--- pool_config.c 14 Jun 2011 13:11:22 -0000
***************
*** 526,531 ****
--- 526,533 ----
static char *default_reset_query_list[] = {"ABORT", "DISCARD ALL"};
static char *default_black_function_list[] = {"nextval", "setval"};
+ POOL_MEMCACHED_CONNECTION *memcached_con;
+
typedef enum {
POOL_KEY = 1,
POOL_INTEGER,
***************
*** 1936,1941 ****
--- 1938,1952 ----
pool_config->pattc = 0;
pool_config->current_pattern_size = 0;
+ pool_config->memory_cache_enabled = "off";
+ pool_config->memqcache_method = "shmem";
+ pool_config->memqcache_memcached_host = "";
+ pool_config->memqcache_memcached_port = "";
+ pool_config->memqcache_total_size = "";
+ pool_config->memqcache_expire = "60";
+ pool_config->memqcache_maxcache = "";
+ pool_config->memqcache_cache_block_size = "8192";
+
res = gethostname(localhostname,sizeof(localhostname));
if(res !=0 )
{
***************
*** 3441,3446 ****
--- 3452,3566 ----
}
pool_config->relcache_expire = v;
}
+ else if (!strcmp(key, "memory_cache_enabled") &&
+ CHECK_CONTEXT(INIT_CONFIG|RELOAD_CONFIG, context))
+ {
+ int v = eval_logical(yytext);
+
+ if (v < 0)
+ {
+ pool_error("pool_config: invalid value %s for %s", yytext, key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memory_cache_enabled = v;
+ }
+ else if (!strcmp(key, "memqcache_method") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ char *str;
+
+ if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+ {
+ PARSE_ERROR();
+ fclose(fd);
+ return(-1);
+ }
+ str = extract_string(yytext, token);
+ if (str == NULL)
+ {
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_method = str;
+ }
+ else if (!strcmp(key, "memqcache_memcached_host") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ char *str;
+
+ if (token != POOL_STRING && token != POOL_UNQUOTED_STRING && token != POOL_KEY)
+ {
+ PARSE_ERROR();
+ fclose(fd);
+ return(-1);
+ }
+ str = extract_string(yytext, token);
+ if (str == NULL)
+ {
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_memcached_host = str;
+ }
+ else if (!strcmp(key, "memqcache_memcached_port") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_memcached_port = v;
+ }
+ else if (!strcmp(key, "memqcache_total_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_total_size = v;
+ }
+ else if (!strcmp(key, "memqcache_expire") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_expire = v;
+ }
+ else if (!strcmp(key, "memqcache_maxcache") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_maxcache = v;
+ }
+ else if (!strcmp(key, "memqcache_cache_block_size") && CHECK_CONTEXT(INIT_CONFIG, context))
+ {
+ int v = atoi(yytext);
+
+ if (token != POOL_INTEGER || v < 0)
+ {
+ pool_error("pool_config: %s must be equal or higher than 0 numeric value", key);
+ fclose(fd);
+ return(-1);
+ }
+ pool_config->memqcache_cache_block_size = v;
+ }
}
Index: pool_system.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_system.c,v
retrieving revision 1.7
diff -c -r1.7 pool_system.c
*** pool_system.c 1 Jun 2010 09:03:00 -0000 1.7
--- pool_system.c 14 Jun 2011 13:11:22 -0000
***************
*** 179,184 ****
--- 179,208 ----
}
/*
+ * memcached_connect:
+ * Connects memcached.
+ */
+ int memcached_connect (void)
+ {
+ memcached_con->memc = memcached_create(NULL);
+ memcached_con->servers = memcached_server_list_append(NULL,
+ pool_config->memqcache_memcached_host,
+ pool_config->memqcache_memcached_port,
+ &memcached_con->rc);
+
+ memcached_con->rc = memcached_server_push(memcached_con->memc, memcached_con->servers);
+ if (memcached_con->rc != MEMCACHED_SUCCESS)
+ {
+ pool_error("memcached_connect: %s\n", memcached_strerror(memcached_con->memc, memcached_con->rc));
+ return 1;
+ }
+
+ memcached_server_list_free(memcached_con->servers);
+
+ return 0;
+ }
+
+ /*
* pool_memset_system_db_info:
* Initializes distribution rules. Distribution rules are stored in
* System DB. So we have to execute query, and expand results on
Index: pool_proto_modules.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v
retrieving revision 1.98
diff -c -r1.98 pool_proto_modules.c
*** pool_proto_modules.c 5 Jun 2011 23:03:06 -0000 1.98
--- pool_proto_modules.c 14 Jun 2011 13:11:22 -0000
***************
*** 191,196 ****
--- 191,209 ----
}
}
+ if (pool_config->memory_cache_enabled
+ && IsA(node, SelectStmt)
+ && !(is_select_pgcatalog = IsSelectpgcatalog(node, backend)))
+ {
+ /* Look up cache data on memory */
+ if (execute_memqcache_lookup(frontend, backend, node) == POOL_CONTINUE)
+ {
+ pool_query_context_destroy(query_context);
+ pool_set_skip_reading_from_backends();
+ return POOL_CONTINUE;
+ }
+ }
+
/*
* Start query context
*/
***************
*** 2117,2123 ****
--- 2130,2161 ----
break;
case 'Z': /* ReadyForQuery */
+ /* if no Memcached connected, retry to connect it */
+ if (memcached_con->rc != MEMCACHED_SUCCESS)
+ {
+ memcached_connect();
+ if (memcached_con->rc != MEMCACHED_SUCCESS)
+ {
+ pool_error("Could not connect the Memcached");
+ }
+ else
+ {
+ /* convert SELECT query to md5 hash */
+ pool_md5_hash(frontend->buf2, strlen(frontend->buf2), md5_query);
+ /* set cache data. key md5 hash SELECT, value RowDescription, DataRow.. */
+ set_cache_on_memcached(frontend, md5_query, get_buf());
+ }
+ }
+ else
+ {
+ /* convert SELECT query to md5 hash */
+ pool_md5_hash(frontend->buf2, strlen(frontend->buf2), md5_query);
+ /* set cache data. key md5 hash SELECT, value RowDescription, DataRow.. */
+ set_cache_on_memcached(frontend, md5_query, get_buf());
+ }
status = ReadyForQuery(frontend, backend, 1);
+ /* initialize buf for caching on memory */
+ init_buf();
break;
case '1': /* ParseComplete */
Index: pool_process_query.c
===================================================================
RCS file: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v
retrieving revision 1.259
diff -c -r1.259 pool_process_query.c
*** pool_process_query.c 13 May 2011 06:53:05 -0000 1.259
--- pool_process_query.c 14 Jun 2011 13:11:23 -0000
***************
*** 107,112 ****
--- 107,115 ----
qcnt = 0;
state = 0;
+ /* initialize buf for caching on memory */
+ init_buf();
+
for (;;)
{
/* Are we requested to send reset queries? */
***************
*** 1163,1168 ****
--- 1166,1177 ----
query_cache_register(kind, frontend, backend->info->database, p1, len1);
}
+ /* save the received result on memory for each kind */
+ if (pool_config->memory_cache_enabled)
+ {
+ memqcache_register(kind, frontend, backend->info->database, p1, len1);
+ }
+
/* error response? */
if (kind == 'E')
{
/* -*-pgsql-c-*- */
/*
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
* Copyright (c) 2003-2010 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
* granted, provided that the above copyright notice appear in all
* copies and that both that copyright notice and this permission
* notice appear in supporting documentation, and that the name of the
* author not be used in advertising or publicity pertaining to
* distribution of the software without specific, written prior
* permission. The author makes no representations about the
* suitability of this software for any purpose. It is provided "as
* is" without express or implied warranty.
*
* pool_memqcache.c: query cache on shmem or memcached
*
*/
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include "pool.h"
#include "md5.h"
#include "pool_stream.h"
#include "pool_config.h"
#include "pool_proto_modules.h"
#include "parser/parsenodes.h"
/*
 * Result codes used by the cache lookup helpers in this file.
 */
typedef enum
{
    CACHE_FOUND,        /* the lookup returned an entry */
    CACHE_NOT_FOUND,    /* no entry is available for the key */
    CACHE_ERROR         /* the lookup itself failed */
} CACHE_STATUS;

/*
 * Accumulation buffer shared by add_buf(), get_buf() and init_buf().
 */
static size_t buflen = 0;   /* number of bytes currently stored in buf */
static size_t bufsize = 0;  /* number of bytes allocated for buf */
static char *buf = NULL;    /* start of the buffer, NULL until allocated */
/*
 * Prototype for the lookup helper defined further down, so that
 * memqcache_lookup() does not rely on an implicit declaration.
 */
int get_cache_on_memcached(POOL_CONNECTION *frontend,
                           const char *md5_query,
                           char **buf,
                           size_t *len);

/* --------------------------------
 * memqcache_lookup() - lookup cache data on memory
 *
 * The SELECT statement in "node" is serialized with nodeToString(),
 * hashed with pool_md5_hash(), and the hash is used as the memcached key.
 * Statements with an INTO or locking clause are never looked up.
 *
 * returns:
 *   POOL_CONTINUE  a cache entry was found
 *   POOL_ERROR     strdup() failed or the lookup reported CACHE_ERROR
 *   POOL_END       statement not eligible, memcached not reachable,
 *                  or no entry for the key
 *
 * Side effects: on every path except "found" the global parsed_query keeps
 * the serialized statement (memqcache_register() tests it later); on
 * "found" it is freed and reset to NULL and free_parser() is called.
 * --------------------------------
 */
POOL_STATUS memqcache_lookup(POOL_CONNECTION *frontend,
                             POOL_CONNECTION_POOL *backend,
                             Node *node)
{
    SelectStmt *select = (SelectStmt *) node;
    CACHE_STATUS found = CACHE_NOT_FOUND;   /* kept apart from POOL_STATUS on purpose */
    char *cached = NULL;                    /* payload returned by memcached, owned here */
    size_t cached_len = 0;

    /* SELECT ... INTO / FOR UPDATE|SHARE must always reach the backend */
    if (select->intoClause || select->lockingClause)
    {
        return POOL_END;
    }

    parsed_query = strdup(nodeToString(node));
    if (parsed_query == NULL)
    {
        pool_error("memqcache_lookup: malloc failed");
        return POOL_ERROR;
    }

    /* if no Memcached connected, retry to connect it (once) */
    if (memcached_con->rc != MEMCACHED_SUCCESS)
    {
        memcached_connect();
    }

    if (memcached_con->rc != MEMCACHED_SUCCESS)
    {
        /* cache unavailable: behave like a miss so the backend answers */
        pool_error("Could not connect the Memcached");
    }
    else
    {
        /* convert SELECT query to md5 hash */
        pool_md5_hash(parsed_query, strlen(parsed_query), md5_query);
        /* look up SELECT query string */
        found = (CACHE_STATUS) get_cache_on_memcached(frontend, md5_query,
                                                      &cached, &cached_len);
    }

    switch (found)
    {
        case CACHE_FOUND:
            /*
             * NOTE(review): the fetched payload (cached, cached_len) is not
             * forwarded to the frontend anywhere in this file; it is only
             * released here. Replaying it to the client still has to be
             * implemented -- TODO confirm where that is meant to happen.
             */
            free(cached);
            free(parsed_query);
            parsed_query = NULL;
            free_parser();
            return POOL_CONTINUE;

        case CACHE_ERROR:
            pool_error("memqcache_lookup: memqcache lookup failed");
            return POOL_ERROR;

        default:
            pool_debug("memqcache_lookup: memqcache not found");
            return POOL_END;
    }
}

/* --------------------------------
 * set_cache_on_memcached() - set cache data on memcached
 *
 * frontend   not used by this function
 * md5_query  NUL-terminated key
 * data       value to store
 *
 * Nothing is stored unless memory_cache_enabled is on and
 * memqcache_method is "memcached".
 *
 * returns 1 on success, 0 otherwise
 * --------------------------------
 */
int set_cache_on_memcached(POOL_CONNECTION *frontend,
                           const char *md5_query,
                           const char *data)
{
    size_t data_len;

    /* proceed only with memory_cache_enabled "on" and method memcached */
    if (!pool_config->memory_cache_enabled ||
        strcmp(pool_config->memqcache_method, "memcached") != 0)
    {
        return 0;
    }

    if (md5_query == NULL || data == NULL)
    {
        return 0;
    }

    /*
     * if no SELECT query string
     * NOTE(review): "query" is not declared in this file; presumably it is
     * the current query string -- verify against the caller.
     */
    if (strlen(query) == 0)
    {
        return 0;
    }

    /*
     * The accumulation buffer holds binary protocol data (add_buf() stores
     * htonl() lengths in it), so strlen() would stop at the first zero
     * byte. Use the tracked length when the caller hands us that buffer.
     */
    data_len = (data == buf) ? buflen : strlen(data);

    pool_debug("set_cache_on_memcached: md5_query=%s ", md5_query);

    /* set cache data to memcached. key is md5 hash query */
    memcached_con->rc = memcached_set(memcached_con->memc,
                                      md5_query, strlen(md5_query),
                                      data, data_len,
                                      pool_config->memqcache_expire, 0);

    /* setting cache data on memcached is failed */
    if (memcached_con->rc != MEMCACHED_SUCCESS &&
        memcached_con->rc != MEMCACHED_BUFFERED)
    {
        pool_error("set_cache_on_memcached: %s\n",
                   memcached_strerror(memcached_con->memc, memcached_con->rc));
        return 0;
    }

    pool_debug("set_cache_on_memcached: succeeded.");
    return 1;
}

/* --------------------------------
 * get_cache_on_memcached() - get cache data on memcached
 *
 * frontend   not used by this function
 * md5_query  NUL-terminated key
 * buf        out: on CACHE_FOUND receives the value returned by
 *            memcached_get(); the caller must free() it. Set to NULL
 *            otherwise.
 * len        out: length in bytes of *buf, 0 when nothing was found
 *
 * returns CACHE_FOUND on success, CACHE_NOT_FOUND when memcached has no
 * entry for the key, CACHE_ERROR otherwise
 * --------------------------------
 */
int get_cache_on_memcached(POOL_CONNECTION *frontend,
                           const char *md5_query,
                           char **buf,
                           size_t *len)
{
    uint32_t flags;
    char *ptr;
    memcached_return rc;

    if (md5_query == NULL || buf == NULL || len == NULL)
    {
        return CACHE_ERROR;
    }
    *buf = NULL;
    *len = 0;

    /* proceed only with memory_cache_enabled "on" and method memcached */
    if (!pool_config->memory_cache_enabled ||
        strcmp(pool_config->memqcache_method, "memcached") != 0)
    {
        return CACHE_ERROR;
    }

    /*
     * if no SELECT query string
     * NOTE(review): "query" is not declared in this file; presumably it is
     * the current query string -- verify against the caller.
     */
    if (strlen(query) == 0)
    {
        return CACHE_ERROR;
    }

    /* get cache data on memcached. key is md5 hash query */
    ptr = memcached_get(memcached_con->memc, md5_query, strlen(md5_query),
                        len, &flags, &rc);

    /*
     * A miss is a normal answer from a reachable server. The rest of the
     * code treats memcached_con->rc != MEMCACHED_SUCCESS as "not connected"
     * and reconnects, so a miss must not be recorded as a failure there.
     */
    if (rc == MEMCACHED_NOTFOUND)
    {
        memcached_con->rc = MEMCACHED_SUCCESS;
        free(ptr);
        *len = 0;
        return CACHE_NOT_FOUND;
    }

    memcached_con->rc = rc;

    /* getting cache data on memcached is failed */
    if (rc != MEMCACHED_SUCCESS)
    {
        pool_error("get_cache_on_memcached: %s",
                   memcached_strerror(memcached_con->memc, memcached_con->rc));
        free(ptr);
        *len = 0;
        return CACHE_ERROR;
    }

    /* success without a value: nothing usable to hand back */
    if (ptr == NULL)
    {
        *len = 0;
        return CACHE_NOT_FOUND;
    }

    /* hand the value to the caller instead of copying over the pointer */
    *buf = ptr;

    pool_debug("get_cache_on_memcached: md5_query=%s", md5_query);
    return CACHE_FOUND;
}
/* --------------------------------
 * delete_cache_on_memcached() - delete cache data on memcached
 *
 * key  NUL-terminated key of the entry to remove
 *
 * The result of memcached_delete() is stored in memcached_con->rc.
 * The memcached handle itself is left intact: it is shared by every other
 * cache operation in this file, so it must not be released here.
 *
 * returns 1 on success, 0 otherwise
 * --------------------------------
 */
int delete_cache_on_memcached(const char *key)
{
    if (key == NULL)
    {
        return 0;
    }

    /* delete cache data on memcached. key is md5 hash query */
    memcached_con->rc = memcached_delete(memcached_con->memc, key, strlen(key), (time_t) 0);

    /* delete cache data on memcached is failed */
    if (memcached_con->rc != MEMCACHED_SUCCESS &&
        memcached_con->rc != MEMCACHED_BUFFERED)
    {
        pool_error ("delete_cache_on_memcached: %s\n",
                    memcached_strerror(memcached_con->memc, memcached_con->rc));
        return 0;
    }

    return 1;
}
/* --------------------------------
 * memqcache_register() - collect backend result messages for caching
 *
 * kind      message kind received from the backend; only 'T', 'D' and
 *           'C' are handled, anything else is ignored
 * frontend  passed through to set_cache_on_memcached()
 * database  not used by this function
 * data      message payload
 * data_len  length of data in bytes
 *
 * Each handled message is appended to the accumulation buffer as
 * kind byte + 4-byte network-order length (payload + sizeof(int)) + payload.
 * Nothing is done when is_select_pgcatalog or is_select_for_update is set.
 * --------------------------------
 */
void memqcache_register(char kind,
                        POOL_CONNECTION *frontend,
                        char *database,
                        char *data,
                        int data_len)
{
    int send_len;
    int result = 0;

    if (is_select_pgcatalog || is_select_for_update)
    {
        return;
    }

    if (kind == 'T' && parsed_query)
    {
        pool_debug("memqcache_register: saving cache for query: \"%s\"", query);

        /* store data into the cache */
        add_buf(&kind, 1);
        send_len = htonl(data_len + sizeof(int));
        add_buf((const char *) &send_len, sizeof(int));
        add_buf(data, data_len);

        /* if no Memcached connected, retry to connect it (once) */
        if (memcached_con->rc != MEMCACHED_SUCCESS)
        {
            memcached_connect();
        }

        if (memcached_con->rc != MEMCACHED_SUCCESS)
        {
            pool_error("Could not connect the Memcached");
        }
        else
        {
            /* convert SELECT query to md5 hash */
            pool_md5_hash(parsed_query, strlen(parsed_query), md5_query);
            /*
             * NOTE(review): the intent appears to be "key tableOID, value
             * md5 hash", but the key passed here is the raw accumulation
             * buffer (the RowDescription message just added), not a table
             * OID, and the setter measures the key with strlen() -- confirm
             * the intended key before relying on this entry.
             */
            result = set_cache_on_memcached(frontend, buf, md5_query);
        }

        /* set_cache_on_memcached() reports failure as 0, never as < 0 */
        if (result == 0)
        {
            pool_error("memqcache_register: query cache registration failed");
            return;
        }
    }
    else if (kind == 'D' || kind == 'C')
    {
        /* store data into the cache */
        add_buf(&kind, 1);
        send_len = htonl(data_len + sizeof(int));
        add_buf((const char *) &send_len, sizeof(int));
        add_buf(data, data_len);

        pool_debug("memqcache_register: query cache saved");

        /* free; later 'D'/'C' messages see NULL and free(NULL) is a no-op */
        free(parsed_query);
        parsed_query = NULL;
    }
}
/* --------------------------------
 * add_buf() - add the data from Backend such as RowDescription, DataRow.. to buf
 *
 * s    bytes to append (may contain zero bytes)
 * len  number of bytes to append
 *
 * The buffer grows by the needed amount plus memqcache_cache_block_size
 * whenever the new data would not fit. On allocation failure the existing
 * buffer content is left untouched.
 *
 * returns buf length on success, 0 otherwise (cache disabled, NULL input
 * or out of memory)
 * --------------------------------
 */
int add_buf(const char *s, size_t len)
{
    /* memory_cache_enabled is a flag, not a string: test its truth value */
    if (!pool_config->memory_cache_enabled)
    {
        return 0;
    }

    if (s == NULL)
    {
        return 0;
    }

    /* nothing to append */
    if (len == 0)
    {
        return (int) buflen;
    }

    if (buflen + len >= bufsize)
    {
        size_t newbufsize = (buflen + len) + pool_config->memqcache_cache_block_size;
        char *newbuf;

        pool_debug("add_buf: realloc buf bufsize=%lu newbufsize=%lu",
                   (unsigned long) bufsize, (unsigned long) newbufsize);

        /* keep the old pointer until realloc is known to have succeeded */
        newbuf = realloc(buf, newbufsize);
        if (newbuf == NULL)
        {
            pool_error("add_buf: realloc failed");
            return 0;
        }
        buf = newbuf;
        bufsize = newbufsize;
    }

    /* add the data from Backend to buf */
    memcpy(buf + buflen, s, len);
    buflen += len;

    pool_debug("add_buf: len=%lu, total=%lu bufsize=%lu",
               (unsigned long) len, (unsigned long) buflen, (unsigned long) bufsize);
    return (int) buflen;
}
/* --------------------------------
 * get_buf() - accessor for the accumulation buffer used for caching
 *
 * returns the current buffer pointer (NULL if it was never allocated);
 * the buffer stays owned by this file
 * --------------------------------
 */
char *get_buf(void)
{
    char *current = buf;

    return current;
}
/* --------------------------------
 * init_buf() - initialize buf for caching on memory
 *
 * Resets the used length to 0. On first use the buffer is allocated with
 * memqcache_cache_block_size bytes; an existing buffer is kept at its
 * current size. The whole buffer is then zero-filled.
 *
 * If the allocation fails (or the configured block size is not positive)
 * the buffer stays unallocated with size 0.
 * --------------------------------
 */
void init_buf(void)
{
    buflen = 0;

    if (buf == NULL)
    {
        bufsize = 0;

        /* nothing sensible to allocate */
        if (pool_config->memqcache_cache_block_size <= 0)
        {
            return;
        }

        buf = malloc((size_t) pool_config->memqcache_cache_block_size);
        if (buf == NULL)
        {
            /* do not fall through to memset() with a NULL pointer */
            pool_error("init_buf: malloc failed");
            return;
        }
        bufsize = (size_t) pool_config->memqcache_cache_block_size;
    }

    memset(buf, 0, bufsize);
}
_______________________________________________
Pgpool-hackers mailing list
[email protected]
http://pgfoundry.org/mailman/listinfo/pgpool-hackers