* Yang Hongyang (yan...@cn.fujitsu.com) wrote:
> This filter is to buffer/release packets, this feature can be used
> when using MicroCheckpointing, or other Remus like VM FT solutions, you
> can also use it to simulate the network delay.
> It has an interval option, if supplied, this filter will release
> packets by interval.
> 
> Usage:
>  -netdev tap,id=bn0
>  -netfilter buffer,id=f0,netdev=bn0,chain=in,interval=1000
> 

Can I just check I understand what the 'interval' does - is that
you release one packet from the filter every 'interval' us ?

Is there anything that limits the size of the filter?

Dave

> NOTE:
>  the scale of interval is microsecond.
> 
> Signed-off-by: Yang Hongyang <yan...@cn.fujitsu.com>
> ---
>  net/Makefile.objs   |   1 +
>  net/filter-buffer.c | 160 
> ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  net/filter.c        |   2 +
>  net/filters.h       |  17 ++++++
>  qapi-schema.json    |  18 +++++-
>  5 files changed, 197 insertions(+), 1 deletion(-)
>  create mode 100644 net/filter-buffer.c
>  create mode 100644 net/filters.h
> 
> diff --git a/net/Makefile.objs b/net/Makefile.objs
> index 914aec0..5fa2f97 100644
> --- a/net/Makefile.objs
> +++ b/net/Makefile.objs
> @@ -14,3 +14,4 @@ common-obj-$(CONFIG_SLIRP) += slirp.o
>  common-obj-$(CONFIG_VDE) += vde.o
>  common-obj-$(CONFIG_NETMAP) += netmap.o
>  common-obj-y += filter.o
> +common-obj-y += filter-buffer.o
> diff --git a/net/filter-buffer.c b/net/filter-buffer.c
> new file mode 100644
> index 0000000..7f2b050
> --- /dev/null
> +++ b/net/filter-buffer.c
> @@ -0,0 +1,160 @@
> +/*
> + * Copyright (c) 2015 FUJITSU LIMITED
> + * Author: Yang Hongyang <yan...@cn.fujitsu.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#include "net/filter.h"
> +#include "net/queue.h"
> +#include "filters.h"
> +#include "qemu-common.h"
> +#include "qemu/error-report.h"
> +#include "qemu/main-loop.h"
> +#include "qemu/timer.h"
> +#include "qemu/iov.h"
> +
> +typedef struct FILTERBUFFERState {
> +    NetFilterState nf;
> +    NetQueue *incoming_queue;
> +    NetQueue *inflight_queue;
> +    QEMUBH *flush_bh;
> +    int64_t interval;
> +    QEMUTimer release_timer;
> +} FILTERBUFFERState;
> +
> +static void packet_send_completed(NetClientState *nc, ssize_t len)
> +{
> +    return;
> +}
> +
> +static void filter_buffer_flush(NetFilterState *nf)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +    NetQueue *queue = s->inflight_queue;
> +    NetPacket *packet;
> +
> +    while (queue && !QTAILQ_EMPTY(&queue->packets)) {
> +        packet = QTAILQ_FIRST(&queue->packets);
> +        QTAILQ_REMOVE(&queue->packets, packet, entry);
> +        queue->nq_count--;
> +
> +        qemu_net_queue_send(packet->sender->peer->incoming_queue,
> +                            packet->sender,
> +                            packet->flags,
> +                            packet->data,
> +                            packet->size,
> +                            packet->sent_cb);
> +
> +        /*
> +         * now that we pass the packet to sender->peer->incoming_queue, we
> +         * don't care the reture value here, because the peer's queue will
> +         * take care of this packet
> +         */
> +        g_free(packet);
> +    }
> +
> +    g_free(queue);
> +    s->inflight_queue = NULL;
> +}
> +
> +static void filter_buffer_flush_bh(void *opaque)
> +{
> +    FILTERBUFFERState *s = opaque;
> +    NetFilterState *nf = &s->nf;
> +    filter_buffer_flush(nf);
> +}
> +
> +static void filter_buffer_release_one(NetFilterState *nf)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +
> +    /* flush inflight packets */
> +    if (s->inflight_queue) {
> +        filter_buffer_flush(nf);
> +    }
> +
> +    s->inflight_queue = s->incoming_queue;
> +    s->incoming_queue = qemu_new_net_queue(nf);
> +    qemu_bh_schedule(s->flush_bh);
> +}
> +
> +static void filter_buffer_release_timer(void *opaque)
> +{
> +    FILTERBUFFERState *s = opaque;
> +    filter_buffer_release_one(&s->nf);
> +    timer_mod(&s->release_timer,
> +              qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
> +}
> +
> +/* filter APIs */
> +static ssize_t filter_buffer_receive_iov(NetFilterState *nf,
> +                                         NetClientState *sender,
> +                                         unsigned flags,
> +                                         const struct iovec *iov,
> +                                         int iovcnt)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +    NetQueue *queue = s->incoming_queue;
> +
> +    qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt,
> +                              packet_send_completed);
> +    return iov_size(iov, iovcnt);
> +}
> +
> +static void filter_buffer_cleanup(NetFilterState *nf)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +
> +    if (s->interval) {
> +        timer_del(&s->release_timer);
> +    }
> +
> +    /* flush inflight packets */
> +    filter_buffer_flush(nf);
> +    /* flush incoming packets */
> +    s->inflight_queue = s->incoming_queue;
> +    s->incoming_queue = NULL;
> +    filter_buffer_flush(nf);
> +
> +    if (s->flush_bh) {
> +        qemu_bh_delete(s->flush_bh);
> +        s->flush_bh = NULL;
> +    }
> +    return;
> +}
> +
> +
> +static NetFilterInfo net_filter_buffer_info = {
> +    .type = NET_FILTER_OPTIONS_KIND_BUFFER,
> +    .size = sizeof(FILTERBUFFERState),
> +    .receive_iov = filter_buffer_receive_iov,
> +    .cleanup = filter_buffer_cleanup,
> +};
> +
> +int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
> +                           int chain, NetClientState *netdev, Error **errp)
> +{
> +    NetFilterState *nf;
> +    FILTERBUFFERState *s;
> +    const NetFilterBufferOptions *bufferopt;
> +
> +    assert(opts->kind == NET_FILTER_OPTIONS_KIND_BUFFER);
> +    bufferopt = opts->buffer;
> +
> +    nf = qemu_new_net_filter(&net_filter_buffer_info,
> +                             netdev, "buffer", name, chain);
> +    s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +    s->incoming_queue = qemu_new_net_queue(nf);
> +    s->flush_bh = qemu_bh_new(filter_buffer_flush_bh, s);
> +    s->interval = bufferopt->has_interval ? bufferopt->interval : 0;
> +    if (s->interval) {
> +        timer_init_us(&s->release_timer, QEMU_CLOCK_VIRTUAL,
> +                      filter_buffer_release_timer, s);
> +        timer_mod(&s->release_timer,
> +                  qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
> +    }
> +
> +    return 0;
> +}
> diff --git a/net/filter.c b/net/filter.c
> index 1ae9344..79351f3 100644
> --- a/net/filter.c
> +++ b/net/filter.c
> @@ -17,6 +17,7 @@
>  
>  #include "net/filter.h"
>  #include "net/net.h"
> +#include "filters.h"
>  
>  static QTAILQ_HEAD(, NetFilterState) net_filters;
>  
> @@ -137,6 +138,7 @@ typedef int (NetFilterInit)(const NetFilterOptions *opts,
>  
>  static
>  NetFilterInit * const net_filter_init_fun[NET_FILTER_OPTIONS_KIND_MAX] = {
> +    [NET_FILTER_OPTIONS_KIND_BUFFER] = net_init_filter_buffer,
>  };
>  
>  static int net_filter_init1(const NetFilter *netfilter, Error **errp)
> diff --git a/net/filters.h b/net/filters.h
> new file mode 100644
> index 0000000..3b546db
> --- /dev/null
> +++ b/net/filters.h
> @@ -0,0 +1,17 @@
> +/*
> + * Copyright (c) 2015 FUJITSU LIMITED
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_NET_FILTERS_H
> +#define QEMU_NET_FILTERS_H
> +
> +#include "net/net.h"
> +#include "net/filter.h"
> +
> +int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
> +                           int chain, NetClientState *netdev, Error **errp);
> +
> +#endif /* QEMU_NET_FILTERS_H */
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 9d97c21..e51bb59 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -2584,6 +2584,21 @@
>  { 'command': 'netfilter_del', 'data': {'id': 'str'} }
>  
>  ##
> +# @NetFilterBufferOptions
> +#
> +# a netbuffer filter for network backend.
> +#
> +# @interval: #optional release packets by interval, if no interval supplied,
> +#            will release packets when filter_buffer_release_all been called.
> +#            scale: microsecond
> +#
> +# Since 2.5
> +##
> +{ 'struct': 'NetFilterBufferOptions',
> +  'data': {
> +    '*interval':     'int64' } }
> +
> +##
>  # @NetFilterOptions
>  #
>  # A discriminated record of network filters.
> @@ -2592,7 +2607,8 @@
>  #
>  ##
>  { 'union': 'NetFilterOptions',
> -  'data': { } }
> +  'data': {
> +    'buffer':     'NetFilterBufferOptions'} }
>  
>  ##
>  # @NetFilter
> -- 
> 1.9.1
> 
> 
--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK

Reply via email to