Hiya,

I've written some code to push logfile writing into an external process,
freeing up the main squid process from the potentially blocking
stdio writes.

This code isn't ready to be merged into squid-2.6. Its meant more as a
prototype. I haven't even reviewed it for 'correctness' besides running
it under a reasonable load (200req/sec) for half a day.

the todo list:

* finish making stuff configurable
* rework the socket IO scheduling code to try and hold onto a buffer until
  its full or 1 second has passed. It might not really matter but it'll be
  interesting to profile it at very high (>1500/sec) request rates.
* figure out what to do about rotate (as it stands, this code only re-opens
  the logfiles and therefore pushes logfile maintainence into an external
  script.)

It has the cute side-effect of allowing people to finally write plugins to
easily throw squid logs into an external program - eg mysql logging, or
something to log over TCP to a central logging server, etc.



Adrian

Index: src/Makefile.am
===================================================================
RCS file: /server/cvs-server/squid/squid/src/Makefile.am,v
retrieving revision 1.51
diff -u -r1.51 Makefile.am
--- src/Makefile.am     11 Jun 2006 17:06:25 -0000      1.51
+++ src/Makefile.am     16 Jul 2006 13:41:40 -0000
@@ -44,6 +44,8 @@
 UNLINKD = 
 endif
 
+LOGFILE_DAEMON = logfile_daemon
+
 if ENABLE_PINGER
 PINGER = pinger
 else
@@ -84,6 +86,7 @@
 
 EXTRA_PROGRAMS = \
        unlinkd \
+       logfile_daemon \
        pinger \
        dnsserver
 
@@ -96,6 +99,7 @@
 libexec_PROGRAMS = \
        $(PINGER) \
        $(DNSSERVER) \
+       $(LOGFILE_DAEMON) \
        $(UNLINKD)
 
 cf_gen_SOURCES = cf_gen.c defines.h debug.c
@@ -257,6 +261,11 @@
 unlinkd-daemon.o: unlinkd.c
        $(COMPILE) -DUNLINK_DAEMON -c $(srcdir)/unlinkd.c -o $@
 
+logfile_daemon_SOURCES = logfile_daemon.c
+logfile_daemon.o: logfile_daemon.c
+       $(COMPILE) -c $(srcdir)/logfile_daemon.c -o $@
+
+
 pinger_SOURCES = \
        pinger.c \
        debug.c
@@ -307,6 +316,7 @@
 DEFAULT_SWAP_DIR        = $(localstatedir)/cache
 DEFAULT_PINGER         = $(libexecdir)/`echo pinger | sed 
'$(transform);s/$$/$(EXEEXT)/'`
 DEFAULT_UNLINKD                = $(libexecdir)/`echo unlinkd | sed 
'$(transform);s/$$/$(EXEEXT)/'`
+DEFAULT_LOGFILE_DAEMON = $(libexecdir)/`echo logfile_daemon | sed 
'$(transform);s/$$/$(EXEEXT)/'`
 DEFAULT_DISKD          = $(libexecdir)/`echo diskd_daemon | sed 
'$(transform);s/$$/$(EXEEXT)/'`
 DEFAULT_ICON_DIR       = $(datadir)/icons
 DEFAULT_ERROR_DIR      = $(datadir)/errors/@ERR_DEFAULT_LANGUAGE@
@@ -353,6 +363,7 @@
        [EMAIL PROTECTED]@%$(DEFAULT_MIME_TABLE)%g;\
        [EMAIL PROTECTED]@%$(DEFAULT_DNSSERVER)%g;\
        [EMAIL PROTECTED]@%$(DEFAULT_UNLINKD)%g;\
+       [EMAIL PROTECTED]@%$(DEFAULT_LOGFILE_DAEMON)%g;\
        [EMAIL PROTECTED]@%$(DEFAULT_PINGER)%g;\
        [EMAIL PROTECTED]@%$(DEFAULT_DISKD)%g;\
        [EMAIL PROTECTED]@%$(DEFAULT_CACHE_LOG)%g;\
Index: src/logfile.c
===================================================================
RCS file: /server/cvs-server/squid/squid/src/logfile.c,v
retrieving revision 1.16
diff -u -r1.16 logfile.c
--- src/logfile.c       5 Jun 2006 21:06:34 -0000       1.16
+++ src/logfile.c       16 Jul 2006 13:41:41 -0000
@@ -34,7 +34,7 @@
 
 #include "squid.h"
 
-static void logfileWriteWrapper(Logfile * lf, const void *buf, size_t len);
+#define        LOGFILE_BUFSZ   32768
 
 #if HAVE_SYSLOG
 
@@ -48,6 +48,7 @@
     int value;
 } syslog_symbol_t;
 
+
 static int
 syslog_ntoa(const char *s)
 {
@@ -115,15 +116,156 @@
 #define PRIORITY_MASK (LOG_ERR | LOG_WARNING | LOG_NOTICE | LOG_INFO | 
LOG_DEBUG)
 #endif /* HAVE_SYSLOG */
 
+/* Internal code */
+static void
+logfileNewBuffer(Logfile *lf)
+{
+       logfile_buffer_t *b;
+
+       debug(50, 3) ("logfileNewBuffer: %s: new buffer\n", lf->path);
+
+       b = xcalloc(1, sizeof(logfile_buffer_t));
+       assert(b != NULL);
+       b->buf = xcalloc(1, LOGFILE_BUFSZ);
+       assert(b->buf != NULL);
+       b->size = LOGFILE_BUFSZ;
+       b->written_len = 0;
+       b->len = 0;
+       b->full = 0;
+       dlinkAddTail(b, &b->node, &lf->bufs);
+       lf->nbufs++;
+}
+
+static void
+logfileFreeBuffer(Logfile *lf, logfile_buffer_t *b)
+{
+       assert(b != NULL);
+       dlinkDelete(&b->node, &lf->bufs);
+       lf->nbufs --;
+       xfree(b->buf);
+       xfree(b);
+}
+
+static void
+logfileHandleWrite(int fd, void *data)
+{
+       Logfile *lf = (Logfile *) data;
+       int ret;
+       logfile_buffer_t *b;
+
+       /*
+        * We'll try writing the first entry until its done - if we
+        * get a partial write then we'll re-schedule until its completed.
+        * Its naive but it'll do for now.
+        */
+       b = lf->bufs.head->data;
+       assert (b != NULL);
+       lf->flush_pending = 0;
+
+       ret = FD_WRITE_METHOD(lf->wfd, b->buf + b->written_len, b->len - 
b->written_len);
+       debug(50, 3) ("logfileHandleWrite: %s: write returned %d\n", lf->path, 
ret);
+       if (ret < 0) {
+               if (ignoreErrno(errno)) {
+                       /* something temporary */
+                       goto reschedule;
+               }
+               debug(50, 1) ("logfileHandleWrite: %s: error writing (%s)\n", 
lf->path, xstrerror());
+               /* XXX should handle this better */
+               fatal("I don't handle this error well!");
+       }
+       if (ret == 0) {
+               /* error? */
+               debug(50, 1) ("logfileHandleWrite: %s: wrote 0 bytes?\n", 
lf->path);
+               /* XXX should handle this better */
+               fatal("I don't handle this error well!");
+       }
+
+       /* ret > 0, so something was written */
+       b->written_len += ret;
+       assert(b->written_len <= b->len);
+       if (b->written_len == b->len) {
+               /* written the whole buffer! */
+               logfileFreeBuffer(lf, b);
+               b = NULL;
+       }
+       /* Is there more to write? */
+       if (lf->bufs.head == NULL) {
+               goto finish;
+       }
+       /* there is, so schedule more */
+
+reschedule:
+       commSetSelect(lf->wfd, COMM_SELECT_WRITE, logfileHandleWrite, lf, 0);
+       lf->flush_pending = 1;
+finish:
+       return;
+}
+
+static void
+logfileQueueWrite(void *data)
+{
+       Logfile *lf = (Logfile *) data;
+       eventAdd("logfile_flush", logfileQueueWrite, lf, 1.0, 1);
+       if (lf->flush_pending || lf->bufs.head == NULL) {
+               return;
+       }
+       /* Ok, schedule a write-event */
+       commSetSelect(lf->wfd, COMM_SELECT_WRITE, logfileHandleWrite, lf, 0);
+       lf->flush_pending = 1;
+       debug(5, 2) ("logfileQueueWrite: %s: queue depth = %d\n", lf->path, 
lf->nbufs);
+}
+
+
+static void
+logfileAppend(Logfile *lf, char *buf, int len)
+{
+       logfile_buffer_t *b;
+       int s;
+
+       /* Is there a buffer? If not, create one */
+       if (lf->bufs.head == NULL) {
+               logfileNewBuffer(lf);
+       }
+       debug(50, 3) ("logfileAppend: %s: appending %d bytes\n", lf->path, len);
+       /* Copy what can be copied */
+       while (len > 0) {
+               b = lf->bufs.tail->data;
+               debug(50, 3) ("logfileAppend: current buffer has %d of %d bytes 
before append\n",  b->len, b->size);
+               s = XMIN(len, (b->size - b->len));
+               xmemcpy(b->buf + b->len, buf, s);
+               len = len - s;
+               buf = buf + s;
+               b->len = b->len + s;
+               assert(b->len <= LOGFILE_BUFSZ);
+               assert(len >= 0);
+               if (len > 0) {
+                       logfileNewBuffer(lf);
+               }
+       }
+#if 0
+       if (lf->bufs.head != NULL) {
+               logfileQueueWrite(lf);
+       }
+#endif
+}
+
+
+/* External code */
+
+CBDATA_TYPE(Logfile);
 Logfile *
 logfileOpen(const char *path, size_t bufsz, int fatal_flag)
 {
-    Logfile *lf = xcalloc(1, sizeof(*lf));
+    Logfile *lf;
+    const char * args[3];
+    CBDATA_INIT_TYPE(Logfile);
+    lf = cbdataAlloc(Logfile);
     xstrncpy(lf->path, path, MAXPATHLEN);
 #if HAVE_SYSLOG
     if (strcmp(path, "syslog") == 0 || strncmp(path, "syslog:", 7) == 0) {
        lf->flags.syslog = 1;
-       lf->fd = -1;
+       lf->rfd = -1;
+       lf->wfd = -1;
        if (path[6] != '\0') {
            const char *priority = path + 7;
            char *facility = (char *) strchr(priority, '|');
@@ -138,29 +280,16 @@
     } else
 #endif
     {
-       int fd = file_open(path, O_WRONLY | O_CREAT | O_TEXT);
-       if (DISK_ERROR == fd) {
-           if (ENOENT == errno && fatal_flag) {
-               fatalf("Cannot open '%s' because\n"
-                   "\tthe parent directory does not exist.\n"
-                   "\tPlease create the directory.\n", path);
-           } else if (EACCES == errno && fatal_flag) {
-               fatalf("Cannot open '%s' for writing.\n"
-                   "\tThe parent directory must be writeable by the\n"
-                   "\tuser '%s', which is the cache_effective_user\n"
-                   "\tset in squid.conf.", path, Config.effectiveUser);
-           } else {
-               debug(50, 1) ("logfileOpen: %s: %s\n", path, xstrerror());
-               safe_free(lf);
-               return NULL;
-           }
-       }
-       lf->fd = fd;
-       if (bufsz > 0) {
-           lf->buf = xmalloc(bufsz);
-           lf->bufsz = bufsz;
-       }
+        args[0] = "logfile_daemon";
+        args[1] = path;
+        args[2] = NULL;
+        lf->lpid = ipcCreate(IPC_STREAM, 
"/home/adrian/work/squid/squid-2.6/src/logfile_daemon", args, "logfile_daemon", 
&lf->rfd, &lf->wfd);
+        if (lf->lpid < 0)
+           fatal("Couldn't start logfile helper");
     }
+    lf->nbufs = 0;
+    eventAdd("logfile_flush", logfileQueueWrite, lf, 1.0, 1);
+
     if (fatal_flag)
        lf->flags.fatal = 1;
     return lf;
@@ -170,50 +299,20 @@
 logfileClose(Logfile * lf)
 {
     logfileFlush(lf);
-    if (lf->fd >= 0)
-       file_close(lf->fd);
-    if (lf->buf)
-       xfree(lf->buf);
-    xfree(lf);
+    fd_close(lf->rfd);
+    fd_close(lf->wfd); /* Should kill the logfile process! */
+    eventDelete(logfileQueueWrite, lf);
+    cbdataFree(lf);
 }
 
 void
 logfileRotate(Logfile * lf)
 {
-#ifdef S_ISREG
-    struct stat sb;
-#endif
-    int i;
-    char from[MAXPATHLEN];
-    char to[MAXPATHLEN];
-    assert(lf->path);
-    if (lf->flags.syslog)
-       return;
-#ifdef S_ISREG
-    if (stat(lf->path, &sb) == 0)
-       if (S_ISREG(sb.st_mode) == 0)
-           return;
-#endif
     debug(0, 1) ("logfileRotate: %s\n", lf->path);
-    /* Rotate numbers 0 through N up one */
-    for (i = Config.Log.rotateNumber; i > 1;) {
-       i--;
-       snprintf(from, MAXPATHLEN, "%s.%d", lf->path, i - 1);
-       snprintf(to, MAXPATHLEN, "%s.%d", lf->path, i);
-       xrename(from, to);
-    }
-    /* Rotate the current log to .0 */
-    logfileFlush(lf);
-    file_close(lf->fd);                /* always close */
-    if (Config.Log.rotateNumber > 0) {
-       snprintf(to, MAXPATHLEN, "%s.%d", lf->path, 0);
-       xrename(lf->path, to);
-    }
-    /* Reopen the log.  It may have been renamed "manually" */
-    lf->fd = file_open(lf->path, O_WRONLY | O_CREAT | O_TEXT);
-    if (DISK_ERROR == lf->fd && lf->flags.fatal) {
-       debug(50, 1) ("logfileRotate: %s: %s\n", lf->path, xstrerror());
-       fatalf("Cannot open %s: %s", lf->path, xstrerror());
+    if (lf->lpid > 0) {
+       if (kill(lf->lpid, SIGHUP) < 0) {
+           debug(0, 1) ("logfileRotate: %s: couldn't contact logfile_daemon 
process: %s\n", lf->path, xstrerror());
+       }
     }
 }
 
@@ -226,22 +325,8 @@
        return;
     }
 #endif
-    if (0 == lf->bufsz) {
-       /* buffering disabled */
-       logfileWriteWrapper(lf, buf, len);
-       return;
-    }
-    if (lf->offset > 0 && lf->offset + len > lf->bufsz)
-       logfileFlush(lf);
-    if (len > lf->bufsz) {
-       /* too big to fit in buffer */
-       logfileWriteWrapper(lf, buf, len);
-       return;
-    }
-    /* buffer it */
-    xmemcpy(lf->buf + lf->offset, buf, len);
-    lf->offset += len;
-    assert(lf->offset <= lf->bufsz);
+    /* Append this data to the end buffer; create a new one if needed */
+    logfileAppend(lf, buf, len);
 }
 
 void
@@ -277,25 +362,5 @@
 void
 logfileFlush(Logfile * lf)
 {
-    if (0 == lf->offset)
-       return;
-    logfileWriteWrapper(lf, lf->buf, (size_t) lf->offset);
-    lf->offset = 0;
-}
-
-/*
- * Aborts with fatal message if write() returns something other
- * than its length argument.
- */
-static void
-logfileWriteWrapper(Logfile * lf, const void *buf, size_t len)
-{
-    int s;
-    s = FD_WRITE_METHOD(lf->fd, buf, len);
-    fd_bytes(lf->fd, s, FD_WRITE);
-    if (s == len)
-       return;
-    if (!lf->flags.fatal)
-       return;
-    fatalf("logfileWrite: %s: %s\n", lf->path, xstrerror());
+    /* XXX unimplemented for now */
 }
Index: src/structs.h
===================================================================
RCS file: /server/cvs-server/squid/squid/src/structs.h,v
retrieving revision 1.491
diff -u -r1.491 structs.h
--- src/structs.h       5 Jul 2006 06:52:12 -0000       1.491
+++ src/structs.h       16 Jul 2006 13:41:53 -0000
@@ -553,6 +553,7 @@
 #if USE_UNLINKD
        char *unlinkd;
 #endif
+       char *logfile_daemon;
        char *diskd;
 #if USE_SSL
        char *ssl_password;
@@ -2409,17 +2410,27 @@
     } shm;
 };
 
+struct _logfile_buffer {
+    char *buf;
+    int size;
+    int len;
+    int written_len;
+    int full;
+    dlink_node node;
+};
+
 struct _Logfile {
-    int fd;
+    int rfd, wfd;
     char path[MAXPATHLEN];
-    char *buf;
-    size_t bufsz;
-    ssize_t offset;
+    dlink_list bufs;
+    int nbufs;
     struct {
        unsigned int fatal;
        unsigned int syslog;
     } flags;
     int syslog_priority;
+    int flush_pending;
+    pid_t lpid;
 };
 
 struct _logformat {
Index: src/typedefs.h
===================================================================
RCS file: /server/cvs-server/squid/squid/src/typedefs.h,v
retrieving revision 1.148
diff -u -r1.148 typedefs.h
--- src/typedefs.h      5 Jul 2006 06:52:12 -0000       1.148
+++ src/typedefs.h      16 Jul 2006 13:41:54 -0000
@@ -213,6 +213,7 @@
 typedef struct _storefs_entry storefs_entry_t;
 typedef struct _storerepl_entry storerepl_entry_t;
 typedef struct _diskd_queue diskd_queue;
+typedef struct _logfile_buffer logfile_buffer_t;
 typedef struct _Logfile Logfile;
 typedef struct _logformat_token logformat_token;
 typedef struct _logformat logformat;
Index: src/cf.data.pre
===================================================================
RCS file: /server/cvs-server/squid/squid/src/cf.data.pre,v
retrieving revision 1.357
diff -u -r1.357 cf.data.pre
--- src/cf.data.pre     9 Jul 2006 14:39:31 -0000       1.357
+++ src/cf.data.pre     16 Jul 2006 13:42:18 -0000
@@ -1632,6 +1632,14 @@
        Specify the location of the executable for file deletion process.
 DOC_END
 
+NAME: logfile_daemon_program
+TYPE: string
+DEFAULT: @DEFAULT_LOGFILE_DAEMON@
+LOC: Config.Program.logfile_daemon
+DOC_START
+       Specify the location of the executable for writing to logfiles.
+DOC_END
+
 NAME: pinger_program
 TYPE: string
 DEFAULT: @DEFAULT_PINGER@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <signal.h>


#define	LOGFILE_BUF_LEN		65536

int do_rotate = 0;

void
signal_hup(int unused)
{
	do_rotate = 1;
}


int
main(int argc, char *argv[])
{
	int t;
	int fd, r;
	char buf[LOGFILE_BUF_LEN];

	if (argc < 2) {
		printf("Error: usage: %s <logfile>\n", argv[0]);
		exit(1);
	}

	fd = open(argv[1], O_WRONLY |  O_APPEND | O_CREAT, 0644);
	if (fd < 0) {
		perror("open");
		exit(1);
	}

	signal(SIGHUP, signal_hup);

	setbuf(stdin, NULL);
	setbuf(stdout, NULL);
	close(2);
	t = open("/dev/null", O_RDWR);
	assert(t > -1);
	dup2(t, 2);

	while( (r = read(0, buf, LOGFILE_BUF_LEN)) > 0) {
		if (write(fd, buf, r) != r) {
			perror("write");
			break;
		}
		fsync(fd);

		/* Check for rotate (which is really just 'reopen') */
		if (do_rotate == 1) {
			do_rotate = 0;
			close(fd); fd = -1;
			fd = open(argv[1], O_WRONLY |  O_APPEND | O_CREAT, 0644);
			if (fd < 0) {
				perror("open");
				exit(1);
			}
		}
	}
	close(fd); fd = -1;
	exit(0);
}

Reply via email to