Hi,

This continues on from my last patch for prepared statements.

This patch makes flow-export use bulk (multi-row) inserts. I have
been talking with the helpful people on the MySQL IRC channel. Once
I had written this, what used to take 80 min to load into the
database now takes 20 min. It is a huge and very noticeable
improvement.

Compare format3 with this format6 (be careful of the differing
field types).

There is a #define, BULK_INSERT_COUNT, that you can tweak to try
to get the most out of this patch.

Too small and you spend your life iterating a loop, as before;
too big and you may exceed some (network) I/O limit (fragmenting the data).

I have yet to determine the optimum value for my system; if you
have any ideas, let me know.

Thorben

Note: this depends on the previous patches.
--- a/src/flow-export.c	2005-10-11 08:56:28.000000000 +0100
+++ b/src/flow-export.c	2005-10-11 08:59:09.000000000 +0100
@@ -58,8 +58,12 @@
 #define DB_DEFAULT_DBUSER "netflow"
 #define DB_DEFAULT_DBPWD "netflow"
 
+/* The most fields in a flow */
 #define MAX_FIELD_COUNT 33
 
+/* Tune this to your needs for best speed */
+#define BULK_INSERT_COUNT 100
+
 int fmt_xfields_bind(u_int64 xfield, MYSQL_BIND *bind, struct fts3rec_all *cur);
 
 #endif /* MYSQL */
@@ -92,6 +96,7 @@
 struct options {
   char dbaseURI[256];
    char dbtag[128];
+  int dblock;
   u_int32 cflowd_mask;
   u_int64 ft_mask;
   u_long records;
@@ -142,11 +147,12 @@
 
   opt.cflowd_mask = 0xFFFFFFFFL;
   opt.ft_mask = 0xFFFFFFFFFFFFFFFFLL;
+  opt.dblock = 0;
 
   /* profile */
   ftprof_start (&ftp);
 
-  while ((i = getopt(argc, argv, "h?d:f:m:t:u:")) != -1)
+  while ((i = getopt(argc, argv, "h?d:f:lm:t:u:")) != -1)
 
     switch (i) {
 
@@ -164,6 +170,10 @@
       exit (0);
       break;
 
+    case 'l':
+      opt.dblock = 1;
+      break;
+
     case 'm': /* cflowd mask */
       if (isalpha((int)optarg[0])) {
         ascii_mask = 1;
@@ -983,19 +993,19 @@
 #ifdef HAVE_MYSQL
 	struct fts3rec_offsets fo;
 	struct ftver ftv;
-	struct fts3rec_all cur;
-	char fields[1024], query[3*1024];
+	struct fts3rec_all flowrec[BULK_INSERT_COUNT];
+	char fields[1024], query[BULK_INSERT_COUNT*MAX_FIELD_COUNT*2+1024], query_v[128];
 	char *rec;
 	char *db_host, *db_name, *db_table, *db_user, *db_pwd, *db_tmp, *tmp;
 	int db_port;
-	int len,count,i;
+	int len,count,i,j;
 
 	MYSQL *mysql;
   
 	MYSQL_STMT    *stmt;
-	MYSQL_BIND    bind[MAX_FIELD_COUNT+1];
+	MYSQL_BIND    bind[BULK_INSERT_COUNT*(MAX_FIELD_COUNT+1)];
 	char *tag_field, *tag_value;
-	int tag_length;
+	unsigned long tag_length;
 	
 	db_host = DB_DEFAULT_DBHOST;
 	db_name = DB_DEFAULT_DBNAME;
@@ -1018,13 +1028,11 @@
 		db_table = strsep(&tmp, ":");
 		db_port = atoi(db_tmp);
 
-		if (!db_user || !db_pwd || !db_host || !db_tmp || !db_name || !db_table) {
-			fterr_warnx("Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
-			return -1;
-		}
+		if (!db_user || !db_pwd || !db_host || !db_tmp || !db_name || !db_table)
+			fterr_errx(1,"Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
 
 	} /* dbaseURI */
-  
+
 	ftio_get_ver(ftio, &ftv);
 
 	fts3rec_compute_offsets(&fo, &ftv);
@@ -1034,37 +1042,44 @@
 
 	/* generate the field names once */
 	len = fmt_xfields_type(fields, opt->ft_mask);
-	
-	memset(bind, 0, sizeof(bind));
-	count = fmt_xfields_bind(opt->ft_mask, bind, &cur);
 
 	/* parse tag string*/
+	tag_field = NULL;
 	if (strlen(opt->dbtag)) {
 		tmp = opt->dbtag;
 		
 		tag_field = strsep(&tmp, "=");
 		tag_value = tmp;
 		
-		if (!tag_field || !tag_value) {
-			fterr_warnx("DB Tag format error, expecting Field=Value.");
-			return -1;
-		}
+		if (!tag_field || !tag_value)
+			fterr_errx(1,"DB Tag format error, expecting Field=Value.");
 		
-		if ((len + strlen(tag_field)+1) >= sizeof(fields)) {
-			fterr_warnx("Buffer too small for tag field.");
-			return -1;
-		}
+		if ((len + strlen(tag_field)+1) >= sizeof(fields))
+			fterr_errx(1,"Buffer too small for tag field.");
 		
 		strcat(fields, ",");
 		strcat(fields, tag_field);
 		
 		tag_length = strlen(tag_value);
-		bind[count].buffer_type= MYSQL_TYPE_STRING;
-		bind[count].buffer = tag_value;
-		bind[count].buffer_length = tag_length;
-		bind[count].is_null= 0;
-		bind[count].length = &tag_length;
-		count++;
+	}
+	
+	/*bind flowrec fields to mysql */
+	i = 0;
+	memset(bind, 0, sizeof(bind));
+	for (j=0; j<BULK_INSERT_COUNT; j++) {
+		
+		i += fmt_xfields_bind(opt->ft_mask, &bind[i], &flowrec[j]);
+
+		if (tag_field) {
+			bind[i].buffer_type = MYSQL_TYPE_STRING;
+			bind[i].buffer = tag_value;
+			bind[i].buffer_length = tag_length;
+			bind[i].is_null= 0;
+			bind[i].length = &tag_length;
+			i++;
+		}
+
+		if (0==j)  count = i;
 	}
 	
 	
@@ -1073,48 +1088,89 @@
 	if (!(mysql = mysql_init(mysql)))
 		fterr_errx(1, "mysql_init(): failed");
 
-	if (mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "simple"))
-		fterr_errx(1, "mysql_options(): %s", mysql_error(mysql));
-
-	if (mysql_real_connect(mysql, db_host, db_user, db_pwd, 
-	db_name, db_port, NULL, 0) == NULL) 
+	if (mysql_real_connect(mysql, db_host, db_user, db_pwd, db_name, db_port, NULL, 0) == NULL) 
 		fterr_errx(1,"mysql_real_connect(): %s\n", mysql_error(mysql));
 
 	if (!(stmt = mysql_stmt_init(mysql)))
-		fterr_errx(1, "mysql_stmt_init(): failed");
+		fterr_errx(1, "mysql_stmt_init(): %s\n", mysql_error(mysql));
 	
-	query[0] = 0;
-	strcat(query, "INSERT INTO ");
-	strcat(query, db_table);
-	strcat (query, "(");
-	strcat (query, fields);
-	strcat (query, ") VALUES (");
+
+	/*One set of inserts*/
+	query_v[0] = 0;
+	strcat(query_v, "(");
 	for (i=0; i<count; i++) {
-		strcat(query, "?");
-		if (i+1<count) strcat(query, ", ");
+		strcat(query_v, "?");
+		if (i+1<count) strcat(query_v, ",");
 	}
-	strcat (query, ")");
-	
+	strcat(query_v, ")");
+
+	/* x BULK_INSERT_COUNT insert query */
+	snprintf(query, sizeof(query), "INSERT INTO %s (%s) VALUES ", db_table, fields);
+	for (i=0; i<BULK_INSERT_COUNT; i++) {
+		strncat(query, query_v, sizeof(query) - strlen(query));
+		if (i+1<BULK_INSERT_COUNT) strncat(query, ",", sizeof(query) - strlen(query));
+	}
+
 	if (mysql_stmt_prepare(stmt, query, strlen(query)))
 		fterr_errx(1,"mysql_stmt_prepare(): %s\n", mysql_stmt_error(stmt));
 
 	if (mysql_stmt_bind_param(stmt, bind))
 		fterr_errx(1,"mysql_stmt_bind_param(): %s\n", mysql_stmt_error(stmt));
-		
+	
+	if (opt->dblock) {
+		/*Get a write lock. speed up MyISAM inserts, not best for InnoDB*/
+		snprintf(query, sizeof(query), "LOCK TABLE %s WRITE;", db_table);
+		if (mysql_real_query(mysql, query, strlen(query)) != 0) 
+			fterr_warnx("mysql_real_query(): %s", mysql_error(mysql));
+	}
+	
   /* foreach flow */
+	count = 0;
 	while ((rec = ftio_read(ftio))) {
+		
+		fill_rec_all(opt->ft_mask, &flowrec[count], rec, &fo);
 
-		fill_rec_all(opt->ft_mask, &cur, rec, &fo);
+		if (BULK_INSERT_COUNT-1 == count) {
 
-		if (mysql_stmt_execute(stmt)) 
-			fterr_warnx("mysql_stmt_execute(): %s", mysql_stmt_error(stmt));
+			if (mysql_stmt_execute(stmt)) 
+				fterr_errx(1,"mysql_stmt_execute(): %s", mysql_stmt_error(stmt));
 
+			count = 0;
+		}
+		else count++;
 
 		++opt->records;
 
 	} /* while */
 
-  /* close database */
+	/* insert remainder */
+	if (0 < count) {
+	
+		snprintf(query, sizeof(query), "INSERT INTO %s (%s) VALUES ", db_table, fields);
+		for (i=0; i<count; i++) {
+			strncat(query, query_v, sizeof(query) - strlen(query));
+			if (i+1<count) strncat(query, ",", sizeof(query) - strlen(query));
+		}
+
+		if (mysql_stmt_prepare(stmt, query, strlen(query)))
+			fterr_errx(1,"mysql_stmt_prepare(): %s\n", mysql_stmt_error(stmt));
+
+		if (mysql_stmt_bind_param(stmt, bind))
+			fterr_errx(1,"mysql_stmt_bind_param(): %s\n", mysql_stmt_error(stmt));
+
+		if (mysql_stmt_execute(stmt)) 
+			fterr_errx(1,"mysql_stmt_execute(): %s", mysql_stmt_error(stmt));
+
+	}
+
+	if (opt->dblock) {
+		/*Release our locks*/
+		snprintf(query, sizeof(query), "UNLOCK TABLES;");
+		if (mysql_real_query(mysql, query, strlen(query)) != 0) 
+			fterr_warnx("mysql_real_query(): %s", mysql_error(mysql));
+	}
+
+	/* close database */
 	mysql_stmt_close(stmt);
 	mysql_close(mysql);
 
_______________________________________________
Flow-tools mailing list
[EMAIL PROTECTED]
http://mailman.splintered.net/mailman/listinfo/flow-tools

Reply via email to