Hello,
This is a 'preview' patch for PARQ passive queueing. It is meant for
review only. And not for CVS inclusion. Allthough it ran very stable the
last few hours. I am still searching for a segfault I got a few days
ago.
If anyone got a clue what I did wrong, I'd like to hear.
Currently working:
Multiple queues (Depending on the number of upload slots you got):
(Seperate queus for small and large files)
In HTTP reply it sends: ETA, position.
What doesn't:
Doesn't send yet: Queue size and a correct ID.
Send QUEUE when IP changes.
Save queue when host shuts down
Active queueing.
With regards,
Jeroen
Index: src/downloads.h
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/downloads.h,v
retrieving revision 1.64
diff -u -r1.64 downloads.h
--- src/downloads.h 22 Feb 2003 14:43:31 -0000 1.64
+++ src/downloads.h 7 Mar 2003 21:51:51 -0000
@@ -30,8 +30,6 @@
#include "fileinfo.h"
#include "header.h"
-#define PARQ_MAX_ID_LENGTH 40
-
/*
* We keep a list of all the downloads queued per GUID+IP:port (host). Indeed
* some broken clients (e.g. Morpheus) share the same GUID, so we cannot
@@ -75,8 +73,18 @@
guint32 attrs;
};
+/*
+ * PARQ Version information
+ */
+
+#define PARQ_VERSION_MAJOR 1
+#define PARQ_VERSION_MINOR 0
+
+#define PARQ_MAX_ID_LENGTH 40
+
+
/*
- * Download dependent queuing status.
+ * Download dependent queuing status. This will be moved to parq.c eventually
*/
struct dl_queued {
guint position; /* Current position in the queue */
@@ -86,6 +94,11 @@
guint retry_delay; /* Interval between new attempt */
gchar ID[PARQ_MAX_ID_LENGTH+1]; /* PARQ Queue ID, +1 for trailing NUL */
};
+
+/*
+ * End of PARQ declaration
+ */
+
struct download {
gchar error_str[256]; /* Used to sprintf() error strings with vars */
Index: src/gnet.h
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/gnet.h,v
retrieving revision 1.38
diff -u -r1.38 gnet.h
--- src/gnet.h 6 Mar 2003 20:24:42 -0000 1.38
+++ src/gnet.h 7 Mar 2003 21:51:59 -0000
@@ -515,6 +515,9 @@
guint32 bps; /* Current transfer rate */
guint32 avg_bps; /* Average transfer rate */
time_t last_update;
+
+ guint parq_position;
+ guint parq_ETA;
} gnet_upload_status_t;
typedef struct gnet_upload_info {
@@ -545,6 +548,7 @@
#define GTA_UL_WAITING 5 /* Waiting new HTTP request */
#define GTA_UL_ABORTED 6 /* Upload removed during operation */
#define GTA_UL_CLOSED 7 /* Upload removed while waiting */
+#define GTA_UL_QUEUED 8 /* Upload is queued */
/*
* State inspection macros.
Index: src/main.c
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/main.c,v
retrieving revision 1.137
diff -u -r1.137 main.c
--- src/main.c 7 Mar 2003 04:30:51 -0000 1.137
+++ src/main.c 7 Mar 2003 21:52:02 -0000
@@ -54,6 +54,7 @@
#include "move.h"
#include "extensions.h"
#include "inet.h"
+#include "parq.h"
#include "adns.h"
#include "crc.h"
@@ -242,6 +243,7 @@
shell_timer(now);
#endif
download_timer(now); /* Download timeouts */
+ parq_upload_timer(now); /* PARQ upload timeouts/removal */
upload_timer(now); /* Upload timeouts */
}
socket_timer(now); /* Expire inactive sockets */
@@ -396,6 +398,7 @@
share_init();
dmesh_init(); /* Muse be done BEFORE download_init() */
download_init();
+ parq_upload_queue_init();
upload_init();
#ifdef USE_REMOTE_SHELL
shell_init();
Index: src/parq.c
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/parq.c,v
retrieving revision 1.7
diff -u -r1.7 parq.c
--- src/parq.c 1 Mar 2003 09:03:52 -0000 1.7
+++ src/parq.c 7 Mar 2003 21:52:09 -0000
@@ -25,6 +25,7 @@
#include "common.h" /* For -DUSE_DMALLOC */
+#include <glib.h>
#include "parq.h"
#include "ioheader.h"
#include "sockets.h"
@@ -35,6 +36,73 @@
#define PARQ_RETRY_SAFETY 40 /* 40 seconds before lifetime */
#define PARQ_TIMER_BY_POS 30 /* 30 seconds for each queue position */
+#define PARQ_UL_RETRY_DELAY 300 /* 5 minutes timeout. FIXME: Don't set hard! */
+
+#define PARQ_UL_LARGE_SIZE (300*1024*1024)
+ /* 1 ul: 0 < q1
+ * 2 ul: 0 < q1 < 300, 300 < q2 < oo
+ * 3 ul: 0 < q1 < 150, 150 < q2 < 300, 300 < q2 < oo
+ */
+
+guint parq_upload_slots = 0;
+guint parq_max_upload_size = 2000;
+gboolean parq_upload_enabled = TRUE;
+
+static GList *ul_parqs = NULL;
+GHashTable *ul_all_parq_by_IP_and_Name = NULL;
+
+/* Holds status of current queue. */
+struct ul_queue_info {
+ GList *by_Position; /* Queued items sorted on position. Newest is
+ added to the end. */
+
+ gint size; /* Number of entries in current list */
+
+ gboolean active; /* Set to false when the number of upload slots
+ was decreased but the queue still contained
+ queued items. This queue shall be removed when
+ all queued items are finished / removed. */
+ gint active_uploads;
+};
+
+/* Contains the queued upload */
+struct ul_queued {
+ guint position; /* Current position in the queue */
+ guint ETA; /* Expected time in seconds till an upload slot is
+ reached */
+
+ time_t expire; /* Max interval before loosing queue position */
+ time_t enter; /* Time upload entered parq */
+ time_t updated; /* Time last upload request was sent */
+
+ gchar ID[PARQ_MAX_ID_LENGTH]; /* PARQ identifier */
+
+ gchar *IP_and_name;
+
+ guint32 file_size; /* Needed to recalculate ETA */
+
+ struct ul_queue_info *queue; /* In which queue this entry is listed */
+};
+
+static void parq_upload_free(struct ul_queued *parq_ul);
+static struct ul_queued *parq_upload_create(gnutella_upload_t *u);
+static struct ul_queue_info *parq_upload_which_queue(gnutella_upload_t *u);
+static struct ul_queue_info *parq_upload_new_queue();
+static void parq_upload_free_queue(struct ul_queue_info *queue);
+static void parq_upload_update_ETA(struct ul_queue_info *which_ul_queue);
+static struct ul_queued *parq_upload_find(gnutella_upload_t *u);
+static gboolean parq_upload_continue(gnutella_upload_t *u, guint free_slots);
+static void parq_upload_decrease_all_after(struct ul_queued *cur_parq_ul);
+
+/******************************************************************************/
+/******************************************************************************/
+/*** ***/
+/*** Generic non PARQ specific functions ***/
+/*** ***/
+/******************************************************************************/
+/******************************************************************************/
+
+
/*
* get_header_version
*
@@ -57,15 +125,12 @@
gchar *get_header_value(
gchar *const s, gchar const *const attribute, gint *length)
{
- gchar *lowercase_header = s;
+ gchar *header = s;
gchar *end;
gboolean found_right_attribute = FALSE;
gboolean found_equal_sign = FALSE;
- size_t attrlen;
- gchar e;
- gchar b;
- gchar es;
+ size_t attrlen;
g_assert(s != NULL);
g_assert(attribute != NULL);
@@ -79,14 +144,18 @@
*/
do {
- lowercase_header = strcasestr(lowercase_header, attribute);
+ gchar e;
+ gchar b;
+ gchar es;
+
+ header = strcasestr(header, attribute);
- if (lowercase_header == NULL)
+ if (header == NULL)
return NULL;
- e = lowercase_header[attrlen]; /* End char after attribute */
+ e = header[attrlen]; /* End char after attribute */
- if (lowercase_header == s) {
+ if (header == s) {
/*
* This is actually the first value of the header. And it
* started at position '0'. Which is the same as were
@@ -95,7 +164,7 @@
found_right_attribute = e == ' ' || e == '=' || e == '\0';
} else {
- b = *(lowercase_header - 1); /* Character before attribute */
+ b = *(header - 1); /* Character before attribute */
found_right_attribute = (
b == ';' || b == ',' || b == ':' || b == ' '
) && (
@@ -111,7 +180,7 @@
* char is an '='. So we need to move ahead anyway.
*/
- lowercase_header += attrlen;
+ header += attrlen;
if (found_right_attribute) {
@@ -121,14 +190,14 @@
* If we don't, we didn't find a valid attribute.
*/
- es = *lowercase_header;
+ es = *header;
do {
found_right_attribute = es == '=' || es == ' ' || es == '\0';
found_equal_sign = es == '=';
if (!found_equal_sign)
- es = *(++lowercase_header); /* Skip spaces */
+ es = *(++header); /* Skip spaces */
} while (!found_equal_sign && found_right_attribute && es != '\0');
@@ -155,11 +224,11 @@
}
} while (!found_right_attribute);
- g_assert(lowercase_header != NULL);
+ g_assert(header != NULL);
g_assert(found_equal_sign);
- g_assert(*lowercase_header == '=');
+ g_assert(*header == '=');
- lowercase_header++; /* Skip the '=' sign */
+ header++; /* Skip the '=' sign */
/*
* If we need to compute the length of the attribute's value, look for
@@ -169,21 +238,31 @@
if (length != NULL) {
*length = 0;
- end = strchr(lowercase_header, ';'); /* PARQ style */
+ end = strchr(header, ';'); /* PARQ style */
if (end == NULL)
- end = strchr(lowercase_header, ','); /* Active queuing style */
+ end = strchr(header, ','); /* Active queuing style */
/*
* If we couldn't find a delimiter, then this value is the last one.
*/
*length = (end == NULL) ?
- strlen(lowercase_header) : end - lowercase_header;
+ strlen(header) : end - header;
}
- return lowercase_header;
+ return header;
}
+
+/******************************************************************************/
+/******************************************************************************/
+/*** ***/
+/*** The following section contains download PARQ functions ***/
+/*** ***/
+/******************************************************************************/
+/******************************************************************************/
+
+
/*
* parq_download_retry_active_queued
*
@@ -264,9 +343,9 @@
if (!get_header_version(buf, &major, &minor)) {
/*
- * Could not retreive queueing version. It could be 0.1 but there is no
- * way to tell for certain
- */
+ * Could not retreive queueing version. It could be 0.1 but there is
+ * no way to tell for certain
+ */
major = 0;
minor = 1;
}
@@ -275,7 +354,8 @@
d->server->parq_version.minor = minor;
switch (major) {
- case 0: /* Active queueing */
+ case 0: /* Active queueing */
+ g_assert(buf != NULL);
d->queue_status.ID[0] = '\0';
value = get_header_value(buf, "pollMin", NULL);
@@ -297,7 +377,7 @@
}
return FALSE;
}
-
+
value = get_header_value(buf, "lifetime", NULL);
d->queue_status.lifetime = value == NULL ? 0 : get_integer(value);
@@ -401,3 +481,814 @@
d->queue_status.position, d->queue_status.ID);
}
+
+/******************************************************************************/
+/******************************************************************************/
+/*** ***/
+/*** The following section contains upload queueing ***/
+/*** ***/
+/******************************************************************************/
+/******************************************************************************/
+
+/*
+ * parq_upload_free
+ *
+ * removes an parq_ul from the parq list and frees all its memory
+ */
+static void parq_upload_free(struct ul_queued *parq_ul)
+{
+ g_assert(parq_ul != NULL);
+ g_assert(parq_ul->IP_and_name != NULL);
+ g_assert(parq_ul->queue != NULL);
+ g_assert(parq_ul->queue->size > 0);
+ g_assert(parq_ul->queue->by_Position != NULL);
+
+ /* Remove the current queued item from all lists */
+ parq_ul->queue->by_Position =
+ g_list_remove(parq_ul->queue->by_Position, parq_ul);
+
+ /* Do not allow empty lists */
+ if (g_list_length(parq_ul->queue->by_Position) - 1 == 0) {
+ g_list_free(parq_ul->queue->by_Position);
+ parq_ul->queue->by_Position = NULL;
+ }
+
+ g_hash_table_remove(ul_all_parq_by_IP_and_Name, parq_ul->IP_and_name);
+
+ /*
+ * Queued upload is now removed from all lists. So queue size can be
+ * safely decreased and new ETAs can be calculate.
+ */
+ parq_ul->queue->size--;
+
+ parq_upload_update_ETA(parq_ul->queue);
+
+ /*
+ * If the current queue is not active anymore (ie it should be removed
+ * as soon as the queue is empty) and there are no more queued items
+ * in the queue, remove the queue
+ */
+ if (!parq_ul->queue->active && parq_ul->queue->size == 0) {
+ parq_upload_free_queue(parq_ul->queue);
+ }
+
+ /* Free the memory used by the current queued item */
+ g_free(parq_ul->IP_and_name);
+ parq_ul->IP_and_name = NULL;
+
+ wfree(parq_ul, sizeof(*parq_ul));
+ parq_ul = NULL;
+}
+
+/*
+ * parq_upload_create
+ *
+ * Creates a new upload structure and prefills some values. Returns a pointer to
+ * the newly created ul_queued structure.
+ */
+static struct ul_queued *parq_upload_create(gnutella_upload_t *u)
+{
+ time_t now = time((time_t *) NULL);
+ struct ul_queued *parq_ul = NULL;
+ struct ul_queued *parq_ul_prev = NULL;
+ struct ul_queue_info *parq_ul_queue = NULL;
+
+ guint rw = 0;
+ guint ETA = 0;
+ gchar buf[1024];
+ GList *l;
+
+ g_assert(u != NULL);
+ g_assert(ul_all_parq_by_IP_and_Name != NULL);
+
+ parq_ul_queue = parq_upload_which_queue(u);
+ g_assert(parq_ul_queue != NULL);
+
+ /* Locate the previous queued item so we can calculate the ETA */
+ l = g_list_last(parq_ul_queue->by_Position);
+ if (l != NULL)
+ parq_ul_prev = (struct ul_queued *) l->data;
+
+ if (parq_ul_prev != NULL) {
+ ETA = parq_ul_prev->ETA;
+ if (bw_http_out != 0 && bws_out_enabled) {
+ ETA += parq_ul_prev->file_size / bw_http_out;
+ } else {
+ printf("PARQ UL Q %d/%d: Could not calculate ETA\r\n",
+ g_list_position(ul_parqs,
+ g_list_find(ul_parqs, parq_ul_prev->queue)),
+ g_list_length(ul_parqs) - 1);
+
+ // FIXME
+ /* Pessimistic: 1 bytes / sec */
+ ETA += parq_ul_prev->file_size;
+ }
+ }
+
+ /* Create new parq_upload item */
+ parq_ul = walloc(sizeof(*parq_ul));
+ g_assert(parq_ul != NULL);
+
+ /* Create identifier to find upload again later. IP + Filename */
+ rw = gm_snprintf(buf, sizeof(buf), "%d %s", u->ip, u->name);
+
+ /* Allocate memory to save IP and name string + NULL character*/
+ parq_ul->IP_and_name = g_malloc(strlen(buf) + sizeof('\0'));
+ g_assert(parq_ul->IP_and_name != NULL);
+
+ /* Fill parq_ul structure */
+ parq_ul->position = ++parq_ul_queue->size;
+ parq_ul->ETA = ETA;
+ parq_ul->enter = now;
+ parq_ul->updated = now;
+ parq_ul->expire = now + PARQ_UL_RETRY_DELAY;
+ parq_ul->file_size = u->file_size;
+ parq_ul->queue = parq_ul_queue;
+
+ strcpy(parq_ul->IP_and_name, buf);
+ strcpy(parq_ul->ID, "NA");
+
+ /* Save into hash table so we can find the current parq ul later */
+ g_hash_table_insert(ul_all_parq_by_IP_and_Name, parq_ul->IP_and_name,
+ parq_ul);
+
+ parq_ul_queue->by_Position =
+ g_list_append(parq_ul_queue->by_Position, parq_ul);
+
+ g_assert(parq_ul != NULL);
+ g_assert(parq_ul->position > 0);
+ g_assert(parq_ul->ID != NULL);
+ g_assert(parq_ul->IP_and_name != NULL);
+ g_assert(parq_ul->queue != NULL);
+ g_assert(parq_ul->queue->by_Position != NULL);
+ g_assert(parq_ul->queue->by_Position->data != NULL);
+
+ return parq_ul;
+}
+
+/*
+ * parq_upload_which_queue
+ *
+ * Looks up in which queue the current upload should be placed and if the queue
+ * doesn't exist yet it will be created.
+ * Returns a pointer to the queue in which the upload should be queued.
+ */
+static struct ul_queue_info *parq_upload_which_queue(gnutella_upload_t *u)
+{
+ struct ul_queue_info *queue;
+ guint size = 0;
+ guint slot = 0;
+
+ size = PARQ_UL_LARGE_SIZE;
+
+ /*
+ * Determine in which queue the upload should be placed. Upload queues:
+ * 300 < size < oo
+ * 150 < size < 300
+ * 75 < size < 150
+ * 0 < size < 75
+ * Smallest: PARQ_UL_LARGE_SIZE / 2^(parq_upload_slots-1)
+ */
+
+ for(slot = 1 ; slot <= parq_upload_slots; slot++) {
+ if (u->file_size > size || slot >= parq_upload_slots)
+ break;
+ size = size / 2;
+ }
+
+ while (g_list_length(ul_parqs) < parq_upload_slots) {
+ queue = parq_upload_new_queue();
+ }
+
+ queue = (struct ul_queue_info *) g_list_nth_data(ul_parqs, slot - 1);
+
+ g_assert(queue != NULL);
+ g_assert(queue->active == TRUE);
+
+ return queue;
+}
+
+/*
+ * parq_upload_new_queue
+ *
+ * Creates a new ul_queue_info structure and places it in the ul_parqs
+ * linked list.
+ */
+static struct ul_queue_info *parq_upload_new_queue()
+{
+ struct ul_queue_info *queue = NULL;
+
+ queue = walloc(sizeof(*queue));
+ g_assert(queue != NULL);
+
+ queue->size = 0;
+ queue->active = TRUE;
+ queue->by_Position = NULL;
+ queue->active_uploads = 0;
+
+ // FIXME: Should be sorted by 'filesize' ?
+ ul_parqs = g_list_append(ul_parqs, queue);
+
+// if (dbg)
+ printf("PARQ UL: Created new queue %d\r\n",
+ g_list_position(ul_parqs, g_list_find(ul_parqs, queue)) + 1);
+
+ g_assert(ul_parqs != NULL);
+ g_assert(ul_parqs->data != NULL);
+ g_assert(queue != NULL);
+
+ return queue;
+}
+
+/*
+ * parq_upload_free_queue
+ *
+ * Frees the queue from memory and the ul_parqs linked list
+ */
+static void parq_upload_free_queue(struct ul_queue_info *queue)
+{
+ g_assert(queue != NULL);
+ g_assert(ul_parqs != NULL);
+
+ /* Never ever remove a queue which is in use and/or marked as active */
+ g_assert(queue->size == 0);
+ g_assert(queue->active_uploads == 0);
+ g_assert(queue->active == FALSE);
+
+// if (dbg)
+ printf("PARQ UL: Removing inactive queue %d\r\b",
+ g_list_position(ul_parqs, g_list_find(ul_parqs, queue)) + 1);
+
+ /* Remove queue from all lists */
+ ul_parqs = g_list_remove(ul_parqs, queue);
+
+ /* Free memory */
+ wfree(queue, sizeof(*queue));
+ queue = NULL;
+}
+
+/*
+ * parq_upload_update_ETA
+ *
+ * Updates the ETA of all queued items in the given queue
+ */
+static void parq_upload_update_ETA(struct ul_queue_info *which_ul_queue)
+{
+ GList *l;
+ guint ETA = 0;
+
+ /* Cycle through the current queue linked list */
+ for (l = which_ul_queue->by_Position; l; l = g_list_next(l)) {
+ struct ul_queued *parq_ul = (struct ul_queued *) l->data;
+
+ g_assert(parq_ul != NULL);
+
+ parq_ul->ETA = ETA;
+
+ /* Recalculate ETA */
+ if (bw_http_out != 0 && bws_out_enabled) {
+ ETA += parq_ul->file_size / bw_http_out;
+ } else {
+ // FIXME, should use average bandwith here
+ /* Pessimistic: 1 bytes / sec */
+ ETA += parq_ul->file_size;
+ }
+ }
+}
+
+/*
+ * parq_upload_find
+ *
+ * Finds an upload if available in the upload queue.
+ * returns NULL if upload could not be found.
+ */
+static struct ul_queued *parq_upload_find(gnutella_upload_t *u)
+{
+ guint rw;
+ gchar buf[1024];
+
+ g_assert(u != NULL);
+ g_assert(ul_all_parq_by_IP_and_Name != NULL);
+
+ rw = gm_snprintf(buf, sizeof(buf),
+ "%d %s", u->ip, u->name);
+
+ return g_hash_table_lookup(ul_all_parq_by_IP_and_Name, buf);
+}
+
+/*
+ * parq_upload_queue_init
+ *
+ * Initialises the upload queue for PARQ.
+ */
+void parq_upload_queue_init()
+{
+ ul_all_parq_by_IP_and_Name = g_hash_table_new(g_str_hash, g_str_equal);
+
+ (void) parq_upload_new_queue();
+
+ g_assert(ul_all_parq_by_IP_and_Name != NULL);
+}
+
+/*
+ * parq_upload_timer
+ *
+ * Removes any PARQ uploads which show no activity.
+ */
+void parq_upload_timer(time_t now)
+{
+ GList *queues;
+ GList *dl;
+ GSList *sl;
+ GSList *remove = NULL;
+ static guint print_q_size = 0;
+
+ for (queues = ul_parqs ; queues != NULL; queues = queues->next) {
+ struct ul_queue_info *queue = (struct ul_queue_info *) queues->data;
+
+ for (dl = queue->by_Position; dl != NULL; dl = dl->next) {
+ struct ul_queued *parq_ul = (struct ul_queued *) dl->data;
+
+ if (parq_ul == NULL)
+ break;
+
+ if (parq_ul->expire < now && parq_ul->position > 0) {
+// if (dbg)
+ printf("PARQ UL Q %d/%d (%3d/%3d): Timeout:'%s'\n\r",
+ g_list_position(ul_parqs,
+ g_list_find(ul_parqs, parq_ul->queue)) + 1,
+ g_list_length(ul_parqs),
+ parq_ul->position,
+ parq_ul->queue->size,
+ parq_ul->IP_and_name);
+ /*
+ * Mark for removal. Can't remove now as we are still using the
+ * ul_parq_by_Position linked list. (prepend is probably the
+ * fastest function
+ */
+ remove = g_slist_prepend(remove, parq_ul);
+ }
+ }
+ }
+
+ for (sl = remove; sl != NULL; sl = sl->next) {
+ struct ul_queued *parq_ul = (struct ul_queued *) sl->data;
+
+ /* Move uploads after the current upload up one slot */
+ parq_upload_decrease_all_after(parq_ul);
+
+ parq_upload_free(parq_ul);
+ }
+
+ g_slist_free(remove);
+
+ /* Dump queue info every 10 seconds to the console */
+ if (print_q_size++ >= 10) {
+ print_q_size = 0;
+
+ printf(">>>>>\r\n");
+
+ for (queues = ul_parqs ; queues != NULL; queues = queues->next) {
+ struct ul_queue_info *queue = (struct ul_queue_info *) queues->data;
+
+ printf("PARQ UL: Queue %d/%d contains %d items, "
+ "%d uploading, queue is marked %s \r\n",
+ g_list_position(ul_parqs, g_list_find(ul_parqs, queue)) + 1,
+ g_list_length(ul_parqs),
+ queue->size,
+ queue->active_uploads,
+ queue->active ? "active" : "inactive");
+ }
+
+ printf("<<<<<\r\n");
+ }
+}
+
+/*
+ * parq_upload_queue_full
+ *
+ * Returns true if parq cannot hold any more uploads
+ */
+gboolean parq_upload_queue_full(gnutella_upload_t *u)
+{
+ struct ul_queued *parq_ul = NULL;
+
+ if (!parq_upload_enabled)
+ return TRUE;
+
+ parq_ul = parq_upload_find(u);
+
+ if (parq_ul == NULL)
+ return FALSE;
+ else
+ return parq_ul->queue->size >= parq_max_upload_size;
+}
+
+/*
+ * parq_upload_queued
+ *
+ * Wether the current upload is already queued.
+ */
+gboolean parq_upload_queued(gnutella_upload_t *u)
+{
+ if (!parq_upload_enabled)
+ return FALSE;
+
+ return parq_upload_lookup_position(u) != (guint) -1;
+}
+
+/*
+ * parq_upload_continue
+ *
+ * Returns true if the current upload is allowed to get an upload slot.
+ */
+static gboolean parq_upload_continue(gnutella_upload_t *u, guint free_slots)
+{
+ guint position = 0;
+ GList *l = NULL;
+ guint needed = 0;
+ struct ul_queued *parq_ul = NULL;
+
+ /*
+ * If there are no free upload slots the queued upload isn't allowed an
+ * upload slot anyway. So we might just as well abort here
+ */
+ if (free_slots == 0)
+ return FALSE;
+
+ position = parq_upload_lookup_position(u);
+
+ /*
+ * If the current queued position is too large to receive an upload slot
+ * we don't need to continue.
+ */
+ if (position > free_slots)
+ return FALSE;
+
+ parq_ul = parq_upload_find(u);
+ g_assert(parq_ul != NULL);
+
+ /*
+ * FIXME: If the number of upload slots have been decreased, an old queue
+ * FIXME: may still exist. What to do with those uploads? Should we make
+ * FIXME: sure those uploads are served first? Those uploads should take
+ * FIXME: less time too upload anyway, as they _must_ be smaller.
+ * FIXME
+ * FIXME: Something like this?:
+ *
+ l = g_list_last(ul_parqs);
+ {
+ struct ul_queue_info *queue = (struct ul_queue_info *) l->data;
+ if (!queue->active) {
+ if (parq_ul->queue->active)
+ return FALSE;
+ }
+ }
+ */
+
+ /*
+ * If the current position = 1 and the current queue hasn't already an item
+ * uploading than the upload is allowed by default.
+ */
+ if (position == 1 && parq_ul->queue->active_uploads == 0)
+ return TRUE;
+
+ /* If position is zero the download is already downloading */
+ g_assert(position != 0);
+
+ /* See wether the other queues are allowed to get an upload position */
+ for (l = ul_parqs; l ; l = l->next) {
+ struct ul_queue_info *queue = (struct ul_queue_info *) l->data;
+
+ printf("PARQ UL: Q %d/%d Active: %d\n\r",
+ g_list_position(ul_parqs, g_list_find(ul_parqs, queue)) + 1,
+ g_list_length(ul_parqs),
+ queue->active_uploads);
+
+ if (g_list_length(queue->by_Position) > 1 &&
+ queue->active_uploads == 0)
+ needed++;
+ }
+
+ return needed < free_slots;
+}
+
+/*
+ * parq_upload_request
+ *
+ * Looks up an upload in the queue, to see if it is already queued, if it isn't
+ * it will be added.
+ * If the download may continue, true is returned. False otherwise (which
+ * probably means the upload is queued).
+ */
+gboolean parq_upload_request(gnutella_upload_t *u, header_t *header,
+ guint upload_slots, guint used_slots)
+{
+ struct ul_queued *parq_ul = NULL;
+ time_t now = time((time_t *) NULL);
+
+ g_assert(u != NULL);
+ g_assert(header != NULL);
+
+ /*
+ * If parqueueing is disabled. The upload is allowed to continue when there
+ * is a free upload slot.
+ */
+ if (!parq_upload_enabled)
+ return used_slots < upload_slots;
+
+ parq_upload_slots = upload_slots;
+
+ parq_ul = parq_upload_find(u);
+
+ if (parq_ul == NULL) {
+
+ /*
+ * Current upload is not queued yet. If the queue isn't full yet, always
+ * add the upload in the queue.
+ */
+
+ if (parq_upload_queue_full(u))
+ return FALSE;
+
+ parq_ul = parq_upload_create(u);
+
+ g_assert(parq_ul != NULL);
+
+// if (dbg)
+ printf("PARQ UL Q %d/%d (%3d/%3d) ETA: %s Added: '%s'\r\n",
+ g_list_position(ul_parqs,
+ g_list_find(ul_parqs, parq_ul->queue)) + 1,
+ g_list_length(ul_parqs),
+ parq_ul->position,
+ parq_ul->queue->size,
+ short_time(parq_upload_lookup_ETA(u)),
+ parq_ul->IP_and_name);
+ }
+ g_assert(parq_ul != NULL);
+
+ parq_ul->updated = now;
+ parq_ul->expire = now + PARQ_UL_RETRY_DELAY;
+
+ /*
+ * Client was already downloading a segment, segment was finished and
+ * just did a follow up request.
+ */
+ if (parq_ul->position == 0) {
+ return TRUE;
+ }
+
+ /*
+ * Check wether the current upload is allowed to get an upload slot. If so
+ * move other queued items after the current item up one position in the
+ * queue
+ */
+ if (parq_upload_continue(u, upload_slots - used_slots)) {
+ return TRUE;
+ } else {
+ u->parq_status = TRUE;
+ return FALSE;
+ }
+}
+
+void parq_upload_busy(gnutella_upload_t *u)
+{
+ struct ul_queued *parq_ul = NULL;
+
+ u->parq_status = 0;
+
+ parq_ul = parq_upload_find(u);
+
+ g_assert(parq_ul != NULL);
+
+ if (parq_ul->position == 0)
+ return;
+
+ parq_ul->queue->active_uploads++;
+ parq_ul->position = 0; // Mark as uploading
+ parq_upload_decrease_all_after(parq_ul);
+}
+
+void parq_upload_add(gnutella_upload_t *u)
+{
+ /*
+ * Cosmetic. Not used at the moment. gnutella_upload_t structure probably
+ * isn't complete yet at this moment
+ */
+}
+
+/*
+ * parq_upload_remove
+ *
+ * When an upload is removed this function should be called so parq
+ * knows the current upload status of an upload.
+ */
+void parq_upload_remove(gnutella_upload_t *u)
+{
+ struct ul_queued *parq_ul = NULL;
+
+ g_assert(u != NULL);
+
+ /*
+ * Avoid removing an upload which is being removed because we are returning
+ * a busy (503), in which case the upload got queued
+ */
+
+ if (u->parq_status) {
+ u->parq_status = 0;
+ return;
+ }
+
+ parq_ul = parq_upload_find(u);
+
+ /* If parq_ul = NULL, than the upload didn't get a slot in the PARQ. */
+ if (parq_ul == NULL)
+ return;
+
+ if (parq_ul->position == 0 && u->keep_alive && u->status == GTA_UL_WAITING) {
+ printf("**** PARQ UL Q %d/%d: Not removed, waiting for new request\r\n",
+ g_list_position(ul_parqs,
+ g_list_find(ul_parqs, parq_ul->queue)) + 1,
+ g_list_length(ul_parqs));
+ return;
+ }
+
+ printf("**** Status: %d, keep alive: %d '%s'\r\n", u->status, u->keep_alive, u->name);
+// if (dbg)
+ printf("PARQ UL Q %d/%d: Upload finished or removed from uploads\r\n",
+ g_list_position(ul_parqs,
+ g_list_find(ul_parqs, parq_ul->queue)) + 1,
+ g_list_length(ul_parqs));
+
+
+ if (parq_ul->position == 0)
+ parq_ul->queue->active_uploads--;
+ else
+ parq_upload_decrease_all_after(parq_ul);
+
+ g_assert(parq_ul->queue->active_uploads >= 0);
+
+ parq_upload_free(parq_ul);
+}
+
+/*
+ * parq_upload_add_header
+ *
+ * Adds X-Queued status to an queud upload in the HTTP header.
+ */
+void parq_upload_add_header(gchar *buf, gint *retval, gpointer arg)
+{
+ gint rw = 0;
+ gint length = *retval;
+ gchar lbuf[1024];
+ struct upload_http_cb *a = (struct upload_http_cb *) arg;
+
+ g_assert(buf != NULL);
+ g_assert(retval != NULL);
+ g_assert(a->u != NULL);
+
+ if (!parq_upload_enabled)
+ return;
+
+ if (parq_upload_queued(a->u)) {
+ guint neededlength = gm_snprintf(lbuf, length,
+ "X-Queue: %d.%d\r\n"
+ "X-Queued: position=%d; ID=%s; ETA=%d\r\n",
+ PARQ_VERSION_MAJOR, PARQ_VERSION_MINOR,
+ parq_upload_lookup_position(a->u),
+ parq_upload_lookup_id(a->u),
+ parq_upload_lookup_ETA(a->u));
+
+ if (neededlength < length) {
+ rw = gm_snprintf(lbuf, length,
+ "X-Queue: %d.%d\r\n"
+ "X-Queued: position=%d; ID=%s; ETA=%d\r\n",
+ PARQ_VERSION_MAJOR, PARQ_VERSION_MINOR,
+ parq_upload_lookup_position(a->u),
+ parq_upload_lookup_id(a->u),
+ parq_upload_lookup_ETA(a->u));
+ }
+ }
+
+ *retval = rw;
+}
+
+/*
+ * parq_upload_lookup_position
+ *
+ * Returns the current queueing position of an upload. Returns a value of
+ * (guint) -1 if not found.
+ */
+guint parq_upload_lookup_position(gnutella_upload_t *u)
+{
+ struct ul_queued *parq_ul = NULL;
+
+ g_assert(u != NULL);
+
+ parq_ul = parq_upload_find(u);
+
+ if (parq_ul != NULL) {
+ return parq_ul->position;
+ } else {
+ return (guint) -1;
+ }
+}
+
+/*
+ * parq_upload_lookup_id
+ *
+ * Returns the current ID of the upload.
+ */
+gchar* parq_upload_lookup_id(gnutella_upload_t *u)
+{
+ struct ul_queued *parq_ul = NULL;
+
+ g_assert(u != NULL);
+
+ parq_ul = parq_upload_find(u);
+
+ if ( parq_ul != NULL)
+ return parq_ul->ID;
+ else
+ return NULL;
+}
+
+/*
+ * parq_upload_lookup_ETA
+ *
+ * Returns the Estimated Time of Arrival for an upload slot for a given upload.
+ */
+guint parq_upload_lookup_ETA(gnutella_upload_t *u)
+{
+ struct ul_queued *parq_ul;
+
+ parq_ul = parq_upload_find(u);
+
+ /* If parq_ul == NULL the current upload isn't queued and ETA is unknown */
+ if (parq_ul != NULL)
+ return parq_ul->ETA;
+ else
+ return (guint) -1;
+}
+
+/*
+ * parq_upload_queue_size
+ *
+ * Returns the current upload queue size.
+ */
+guint parq_upload_queue_size(gnutella_upload_t *u)
+{
+ struct ul_queued *parq_ul = NULL;
+
+ /*
+ * There can be multiple queues. Find the queue in which the upload is
+ * queued.
+ */
+
+ parq_ul = parq_upload_find(u);
+
+ if (parq_ul != NULL) {
+ g_assert(parq_ul->queue != NULL);
+ g_assert(parq_max_upload_size > parq_ul->queue->size);
+
+ return parq_ul->queue->size;
+ } else {
+ /* No queue created yet */
+ return 0;
+ }
+}
+
+/* parq_upload_decrease_all_after
+ *
+ * Decreases the position of all queued items after the given queued item.
+ */
+static void parq_upload_decrease_all_after(struct ul_queued *cur_parq_ul)
+{
+ GList *l;
+
+ g_assert(cur_parq_ul != NULL);
+ g_assert(cur_parq_ul->queue != NULL);
+ g_assert(cur_parq_ul->queue->by_Position != NULL);
+ g_assert(cur_parq_ul->queue->size > 0);
+
+ l = g_list_find(cur_parq_ul->queue->by_Position, cur_parq_ul);
+ l = g_list_next(l); /* Decrease _after_ current parq */
+
+ /*
+ * Cycle through list and decrease all positions by one. Position should
+ * never reach 0 which would mean the queued item is currently uploading
+ */
+ for (; l; l = g_list_next(l)) {
+ struct ul_queued *parq_ul = (struct ul_queued *) l->data;
+
+ g_assert(parq_ul != NULL);
+
+ parq_ul->position--;
+
+ g_assert(parq_ul->position != 0);
+ }
+}
+
+/*
+# vim:ts=4:sw=4
+*/
Index: src/parq.h
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/parq.h,v
retrieving revision 1.2
diff -u -r1.2 parq.h
--- src/parq.h 2 Feb 2003 23:28:11 -0000 1.2
+++ src/parq.h 7 Mar 2003 21:52:10 -0000
@@ -28,13 +28,7 @@
#include "header.h"
#include "downloads.h"
-
-/*
- * PARQ Version information
- */
-
-#define PARQ_VERSION_MAJOR 1
-#define PARQ_VERSION_MINOR 0
+#include "uploads.h"
/*
* Public interface.
@@ -45,7 +39,22 @@
gboolean parq_download_parse_queue_status(struct download *d, header_t *header);
gboolean parq_download_is_active_queued(struct download *d);
void parq_download_add_header(
- gchar *buf, gint len, gint *rw, struct download *d);
+gchar *buf, gint len, gint *rw, struct download *d);
+void parq_upload_queue_init();
+void parq_upload_timer(time_t now);
+void parq_upload_add_header(gchar *buf, gint *retval, gpointer arg);
+gboolean parq_upload_request(gnutella_upload_t *u, header_t *header,
+ guint upload_slots, guint used_slots);
+guint parq_upload_lookup_position(gnutella_upload_t *u);
+gchar* parq_upload_lookup_id(gnutella_upload_t *u);
+guint parq_upload_lookup_ETA(gnutella_upload_t *u);
+gboolean parq_upload_queue_full(gnutella_upload_t *u);
+guint parq_upload_queue_size(gnutella_upload_t *u);
+gboolean parq_upload_queued(gnutella_upload_t *u);
+void parq_upload_remove(gnutella_upload_t *u);
+void parq_upload_add(gnutella_upload_t *u);
+void parq_upload_busy(gnutella_upload_t *u);
+
#endif /* _parq_h_ */
Index: src/uploads.c
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/uploads.c,v
retrieving revision 1.115
diff -u -r1.115 uploads.c
--- src/uploads.c 7 Mar 2003 02:46:15 -0000 1.115
+++ src/uploads.c 7 Mar 2003 21:52:26 -0000
@@ -49,6 +49,7 @@
#include "nodes.h"
#include "ioheader.h"
#include "ban.h"
+#include "parq.h"
#include "settings.h"
@@ -75,16 +76,6 @@
guint32 count_uploads = 0;
/*
- * This structure is used for HTTP status printing callbacks.
- */
-struct upload_http_cb {
- gnutella_upload_t *u; /* Upload being ACK'ed */
- time_t now; /* Current time */
- time_t mtime; /* File modification time */
- struct shared_file *sf;
-};
-
-/*
* This structure is the key used in the mesh_info hash table to record
* when we last sent mesh information to some IP about a given file
* (identified by its SHA1).
@@ -199,7 +190,6 @@
if (UPLOAD_IS_COMPLETE(u))
continue; /* Complete, no timeout possible */
-
/*
* Check for timeouts.
*/
@@ -249,7 +239,7 @@
u->status = push ? GTA_UL_PUSH_RECEIVED : GTA_UL_HEADERS;
u->last_update = time((time_t *) 0);
u->file_desc = -1;
-
+
/*
* Record pending upload in the GUI.
*/
@@ -267,6 +257,8 @@
* Add upload to the GUI
*/
upload_fire_upload_added(u);
+
+ u->parq_status = 0;
return u;
}
@@ -537,6 +529,19 @@
}
/*
+ * If the download got queued, also add the queueing information
+ * --JA, 07/02/2003
+ */
+ if (parq_upload_queued(u)) {
+ cb_arg.u = u;
+ cb_arg.sf = sf;
+
+ hev[hevcnt].he_type = HTTP_EXTRA_CALLBACK;
+ hev[hevcnt].he_cb = parq_upload_add_header;
+ hev[hevcnt++].he_arg = &cb_arg;
+ }
+
+ /*
* If this is a pushed upload, and we are not firewalled, then tell
* them they can reach us directly by outputting an X-Host line.
*/
@@ -560,7 +565,7 @@
hev[hevcnt].he_cb = upload_http_sha1_add;
hev[hevcnt++].he_arg = &cb_arg;
}
-
+
http_send_status(u->socket, code, hevcnt ? hev : NULL, hevcnt, reason);
u->error_sent = code;
@@ -596,6 +601,12 @@
gchar *logreason;
gchar errbuf[1024];
+ /*
+ * Signal PARQ about a upload which is possibly ready.
+ * -- JA, 06/03/'03
+ */
+ gint previous_running = running_uploads;
+
if (reason) {
gm_vsnprintf(errbuf, sizeof(errbuf), reason, ap);
errbuf[sizeof(errbuf) - 1] = '\0'; /* May be truncated */
@@ -657,11 +668,12 @@
if (!UPLOAD_IS_COMPLETE(u))
registered_uploads--;
- if (!UPLOAD_IS_COMPLETE(u) && !UPLOAD_IS_CONNECTING(u))
+ if (!UPLOAD_IS_COMPLETE(u) && !UPLOAD_IS_CONNECTING(u)) {
running_uploads--;
- else if (u->keep_alive && UPLOAD_IS_CONNECTING(u))
+ } else if (u->keep_alive && UPLOAD_IS_CONNECTING(u)) {
running_uploads--;
-
+ }
+
/*
* If we were sending data, and we have not accounted the download yet,
* then update the stats, not marking the upload as completed.
@@ -678,6 +690,10 @@
upload_fire_upload_info_changed(u);
}
+
+ if (previous_running != running_uploads)
+ parq_upload_remove(u);
+
upload_fire_upload_removed(u, reason ? errbuf : NULL);
upload_free_resources(u);
@@ -2001,24 +2017,32 @@
* wait for a download slot. It would be a pity for them to get
* a slot and be told about the mismatch only then.
* --RAM, 15/12/2001
- */
-
- if (running_uploads > max_uploads) {
-
+ *
+ * Althought the uploads slots are full, we could try to queue
+ * the download in PARQ. If this also fails, than the requesting client
+ * is out of luck.
+ * --JA, 05/02/2003
+ *
+ */
+ if (!parq_upload_request(u, header, max_uploads, running_uploads - 1)) {
+// running_uploads > max_uploads ||
+// !parq_upload_continue(u, max_uploads - running_uploads + 1)
+// ) {
/*
- * Support for bandwith-dependent number of upload slots.
- * The upload bandwith limitation has to be enabled, otherwise
- * we can not be sure that we have reasonable values for the
- * outgoing bandwith set.
- * --TF 30/05/2002
- *
- * NB: if max_uploads is 0, then we disable sharing, period.
- *
- * Require that BOTH the average and "instantaneous" usage be
- * lower than the minimum to trigger the override. This will
- * make it more robust when bandwidth stealing is enabled.
- * --RAM, 27/01/2003
- */
+ * Support for bandwith-dependent number of upload slots.
+ * The upload bandwith limitation has to be enabled, otherwise
+ * we can not be sure that we have reasonable values for the
+ * outgoing bandwith set.
+ * --TF 30/05/2002
+ *
+ * NB: if max_uploads is 0, then we disable sharing, period.
+ *
+ * Require that BOTH the average and "instantaneous" usage be
+ * lower than the minimum to trigger the override. This will
+ * make it more robust when bandwidth stealing is enabled.
+ * --RAM, 27/01/2003
+ *
+ */
if (
max_uploads &&
@@ -2027,18 +2051,34 @@
bsched_pct(bws.out) < ul_usage_min_percentage &&
bsched_avg_pct(bws.out) < ul_usage_min_percentage
) {
+ /*
+ * PARQ FIXME: This is probably going to give problems with
+ * PARQ
+ * -- JA 5/03/2003
+ */
if (dbg > 4)
- printf("Overriden slot limit because u/l b/w used at %d%% "
- "(minimum set to %d%%)\n",
+ printf("Overriden slot limit because u/l b/w used at "
+ "%d%% (minimum set to %d%%)\n",
bsched_avg_pct(bws.out), ul_usage_min_percentage);
- } else {
- upload_error_remove(u, reqfile,
- 503, "Too many uploads (%d max)", max_uploads);
+ } else {
+ if (parq_upload_queue_full(u)) {
+ upload_error_remove(u, reqfile,
+ 503, "Too many uploads (%d max) Queue full",
+ max_uploads);
+ } else {
+ upload_error_remove(u, reqfile, 503,
+ "Too many uploads (%d max) Queued at: %d, ETA: %s",
+ max_uploads,
+ parq_upload_lookup_position(u),
+ short_time(parq_upload_lookup_ETA(u)));
+ }
return;
}
}
}
+ parq_upload_busy(u);
+
/*
* Do we have to keep the connection after this request?
*/
@@ -2315,6 +2355,7 @@
} else {
registered_uploads--;
running_uploads--;
+ parq_upload_remove(u);
}
upload_remove(u, NULL);
@@ -2415,6 +2456,14 @@
si->avg_bps = 1;
si->last_update = u->last_update;
+ /*
+ * PARQ cached information.
+ * -- JA, 06/02/2003
+ */
+// si->parq_position = parq_upload_lookup_position(u),
+// si->parq_ETA = parq_upload_lookup_ETA(u);
+
+
if (u->bio) {
si->bps = bio_bps(u->bio);
si->avg_bps = bio_avg_bps(u->bio);
@@ -2424,5 +2473,8 @@
si->avg_bps = (u->pos - u->skip) / (u->last_update - u->start_date);
if (si->avg_bps == 0)
si->avg_bps++;
+
+
+
}
Index: src/uploads.h
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/uploads.h,v
retrieving revision 1.25
diff -u -r1.25 uploads.h
--- src/uploads.h 7 Mar 2003 02:42:55 -0000 1.25
+++ src/uploads.h 7 Mar 2003 21:52:26 -0000
@@ -32,7 +32,7 @@
#include "bsched.h"
struct gnutella_node;
-
+
typedef struct upload {
gnet_upload_t upload_handle;
@@ -71,7 +71,19 @@
gboolean keep_alive; /* Keep HTTP connection? */
gboolean push;
gboolean accounted; /* True when upload was accounted for */
+
+ gboolean parq_status;
} gnutella_upload_t;
+
+/*
+ * This structure is used for HTTP status printing callbacks.
+ */
+struct upload_http_cb {
+ gnutella_upload_t *u; /* Upload being ACK'ed */
+ time_t now; /* Current time */
+ time_t mtime; /* File modification time */
+ struct shared_file *sf;
+};
/*
* Global Data
Index: src/uploads_gui.c
===================================================================
RCS file: /cvsroot/gtk-gnutella/gtk-gnutella-current/src/uploads_gui.c,v
retrieving revision 1.14
diff -u -r1.14 uploads_gui.c
--- src/uploads_gui.c 7 Mar 2003 03:20:44 -0000 1.14
+++ src/uploads_gui.c 7 Mar 2003 21:52:29 -0000
@@ -178,6 +178,19 @@
return "No output yet..."; /* Never wrote anything yet */
switch(u->status) {
+ /*
+ * Status: GTA_UL_QUEUED. When PARQ is enabled, and all upload slots are
+ * full an upload is placed into the PARQ-upload. We probably want to
+ * display this information
+ * -- JA, 06/02/2003
+ */
+ case GTA_UL_QUEUED:
+ gm_snprintf(tmpstr, sizeof(tmpstr),
+ "Queued (slot %d / %d) ETA: %ds",
+ u->parq_position,
+ parq_upload_queue_size(),
+ u->parq_ETA);
+ break;
case GTA_UL_ABORTED:
return "Transmission aborted";
case GTA_UL_CLOSED: