On 2017年06月08日 17:16, wang.yong...@zte.com.cn wrote:

>> From: Wang Yong <wang.yong...@zte.com.cn>

>>

>> Process pactkets in the IOThread which arrived over the socket.

>> we use qio_channel_set_aio_fd_handler to set the handlers on the

>> IOThread AioContext.then the packets from the primary and the secondary

>> are processed in the IOThread.

>> Finally remove the colo-compare thread using the IOThread instead.

>>

>> Signed-off-by: Wang Yong<wang.yong...@zte.com.cn>

>> Signed-off-by: Wang Guang<wang.guan...@zte.com.cn>

>> ---

>> net/colo-compare.c | 133 ++++++++++++++++++++++++++++++++++++-----------------

>>   net/colo.h         |   1 +

>>   2 files changed, 91 insertions(+), 43 deletions(-)

>>

>> diff --git a/net/colo-compare.c b/net/colo-compare.c

>> index b0942a4..e3af791 100644

>> --- a/net/colo-compare.c

>> +++ b/net/colo-compare.c

>> @@ -29,6 +29,7 @@

>>   #include "qemu/sockets.h"

>>   #include "qapi-visit.h"

>>   #include "net/colo.h"

>> +#include "io/channel.h"

>>   #include "sysemu/iothread.h"

>>

>>   #define TYPE_COLO_COMPARE "colo-compare"

>> @@ -82,11 +83,6 @@ typedef struct CompareState {

>>       GQueue conn_list;

>>       /* hashtable to save connection */

>>       GHashTable *connection_track_table;

>> -    /* compare thread, a thread for each NIC */

>> -    QemuThread thread;

>> -

>> -    GMainContext *worker_context;

>> -    GMainLoop *compare_loop;

>>

>>       /*compare iothread*/

>>       IOThread *iothread;

>> @@ -95,6 +91,14 @@ typedef struct CompareState {

>>       QEMUTimer *packet_check_timer;

>>   } CompareState;

>>

>> +typedef struct {

>> +    Chardev parent;

>> +    QIOChannel *ioc; /*I/O channel */


>We probably don't want to manipulate the char backend's internal io channel.

>All we need here is to access the frontend API (char-fe.c), I believe, and

>hide the internal implementation.

char-fd.c ?


char-fe.c, for sure, which is the frontend of the chardev.

These APIs can only watch events in the QEMU main thread, not in the IOThread.

I had to use the qio_channel_socket_set_aio_fd_handler function to

monitor the char event in the IOThread, so the io channel is used here.


The point is not to touch the internal structure of the chardev (such as ioc), but instead to extend one of its helpers, e.g. qemu_chr_fe_set_handlers(), and let it set the aio handlers:

->qio_channel_socket_set_aio_fd_handler

   ->aio_set_fd_handler


Thanks


>> +} CompareChardev;

>> +

>> +#define COMPARE_CHARDEV(obj)         \

>> +    OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)

>> +

>>   typedef struct CompareClass {

>>       ObjectClass parent_class;

>>   } CompareClass;

>> @@ -107,6 +111,12 @@ enum {

>>   static int compare_chr_send(CharBackend *out,

>>                               const uint8_t *buf,

>>                               uint32_t size);

>> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,

>> +                                    AioContext *ctx,

>> +                                    IOCanReadHandler *fd_can_read,

>> +                                    IOReadHandler *fd_read,

>> +                                    IOEventHandler *fd_event,

>> +                                    void *opaque);

>>

>>   static gint seq_sorter(Packet *a, Packet *b, gpointer data)

>>   {

>> @@ -534,6 +544,30 @@ err:

>>       return ret < 0 ? ret : -EIO;

>>   }

>>

>> +static void compare_chr_read(void *opaque)

>> +{

>> +    Chardev *chr = opaque;

>> +    uint8_t buf[CHR_READ_BUF_LEN];

>> +    int len, size;

>> +    int max_size;

>> +

>> +    max_size = qemu_chr_be_can_write(chr);

>> +    if (max_size <= 0) {

>> +        return;

>> +    }

>> +

>> +    len = sizeof(buf);

>> +    if (len > max_size) {

>> +        len = max_size;

>> +    }

>> + size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, len);

>> +    if (size == 0) {

>> +        return;

>> +    } else if (size > 0) {

>> +        qemu_chr_be_write(chr, buf, size);

>> +    }

>> +}

>> +

>>   static int compare_chr_can_read(void *opaque)

>>   {

>>       return COMPARE_READ_LEN_MAX;

>> @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)

>>

>>       ret = net_fill_rstate(&s->pri_rs, buf, size);

>>       if (ret == -1) {

>> -        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,

>> -                                 NULL, NULL, true);

>> +  compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,

>> +                                    NULL, NULL, NULL, NULL);

>>           error_report("colo-compare primary_in error");

>>       }

>>   }

>> @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)

>>

>>       ret = net_fill_rstate(&s->sec_rs, buf, size);

>>       if (ret == -1) {

>> -        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,

>> -                                 NULL, NULL, true);

>> +  compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,

>> +                                    NULL, NULL, NULL, NULL);

>>           error_report("colo-compare secondary_in error");

>>       }

>>   }

>> @@ -605,34 +639,57 @@ static void colo_compare_timer_del(CompareState *s)

>>       }

>>   }

>>

>> -static void *colo_compare_thread(void *opaque)

>> -{

>> -    CompareState *s = opaque;

>> -

>> -    s->worker_context = g_main_context_new();

>> -

>> -    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,

>> - compare_pri_chr_in, NULL, s, s->worker_context, true);

>> -    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,

>> - compare_sec_chr_in, NULL, s, s->worker_context, true);

>> -

>> -    s->compare_loop = g_main_loop_new(s->worker_context, FALSE);

>> -

>> -    g_main_loop_run(s->compare_loop);

>> -

>> -    g_main_loop_unref(s->compare_loop);

>> -    g_main_context_unref(s->worker_context);

>> -    return NULL;

>> -}

>>

>>   static void colo_compare_iothread(CompareState *s)

>>   {

>>       object_ref(OBJECT(s->iothread));

>>       s->ctx = iothread_get_aio_context(s->iothread);

>>

>> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,

>> +                    compare_chr_can_read,

>> +                    compare_pri_chr_in,

>> +                    NULL,

>> +                    s);

>> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,

>> +                    compare_chr_can_read,

>> +                    compare_sec_chr_in,

>> +                    NULL,

>> +                    s);

>> +

>>       colo_compare_timer_init(s);

>>   }

>>

>> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,

>> +                                    AioContext *ctx,

>> +                                    IOCanReadHandler *fd_can_read,

>> +                                    IOReadHandler *fd_read,

>> +                                    IOEventHandler *fd_event,

>> +                                    void *opaque)

>> +{

>> +    CompareChardev *s;

>> +

>> +    if (!b->chr) {

>> +        return;

>> +    }

>> +    s = COMPARE_CHARDEV(b->chr);

>> +    if (!s->ioc) {

>> +        return;

>> +    }


>So this is hacky; you can refer to how vhost-user validates the udp socket char

>backend.

I will investigate.


Thanks


>> +

>> +    b->chr_can_read = fd_can_read;

>> +    b->chr_read = fd_read;

>> +    b->chr_event = fd_event;

>> +    b->opaque = opaque;

>> +    remove_fd_in_watch(b->chr);

>> +

>> +    if (b->chr_read) {

>> +        qio_channel_set_aio_fd_handler(s->ioc, ctx,

>> +                                compare_chr_read, NULL, b->chr);

>> +    } else {

>> + qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, NULL);


>So instead of doing such a hack, how about passing an AioContext * instead

>of a GMainContext * to qemu_chr_fe_set_handlers?

IOThread AioContext -> GSource -> GMainContext is NULL.

If we still use qemu_chr_fe_set_handlers, it will use the QEMU main thread's GMainContext,

and the I/O will still be processed in the QEMU main thread.

So I wrapped this in a function (compare_chr_set_aio_fd_handlers) to monitor the char fd in the IOThread.



As above, we should do this inside char-fe.c, not here.

Thanks

Thanks


>Thanks


>> +    }

>> +}

>> +

>>   static char *compare_get_pri_indev(Object *obj, Error **errp)

>>   {

>>       CompareState *s = COLO_COMPARE(obj);

>> @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)

>>   {

>>       CompareState *s = COLO_COMPARE(uc);

>>       Chardev *chr;

>> -    char thread_name[64];

>> -    static int compare_id;

>>

>> if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {

>>           error_setg(errp, "colo compare needs 'primary_in' ,"

>> @@ -776,12 +831,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)

>> g_free,

>> connection_destroy);

>>

>> -    sprintf(thread_name, "colo-compare %d", compare_id);

>> -    qemu_thread_create(&s->thread, thread_name,

>> -                       colo_compare_thread, s,

>> -                       QEMU_THREAD_JOINABLE);

>> -    compare_id++;

>> -

>>       colo_compare_iothread(s);

>>

>>       return;

>> @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)

>>   {

>>       CompareState *s = COLO_COMPARE(obj);

>>

>> - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,

>> -                             s->worker_context, true);

>> - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,

>> -                             s->worker_context, true);

>> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,

>> +                                    NULL, NULL, NULL, NULL);

>> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,

>> +                                    NULL, NULL, NULL, NULL);

>> +

>>       qemu_chr_fe_deinit(&s->chr_out);

>>       colo_compare_timer_del(s);

>>

>> -    g_main_loop_quit(s->compare_loop);

>> -    qemu_thread_join(&s->thread);

>> -

>>       /* Release all unhandled packets after compare thead exited */

>>       g_queue_foreach(&s->conn_list, colo_flush_packets, s);

>>

>> diff --git a/net/colo.h b/net/colo.h

>> index 7c524f3..936dea1 100644

>> --- a/net/colo.h

>> +++ b/net/colo.h

>> @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable *connection_track_table,

>> void connection_hashtable_reset(GHashTable *connection_track_table);

>>   Packet *packet_new(const void *data, int size);

>>   void packet_destroy(void *opaque, void *user_data);

>> +void remove_fd_in_watch(Chardev *chr);

>>

>>   #endif /* QEMU_COLO_PROXY_H */





原始邮件
*发件人:*<jasow...@redhat.com>;
*收件人:*王勇10170530;<zhang.zhanghaili...@huawei.com>;<zhangchen.f...@cn.fujitsu.com>;
*抄送人:*<lizhij...@cn.fujitsu.com>;<qemu-devel@nongnu.org>;王广10165992;
*日 期 :*2017年06月07日 16:35
*主 题 :**Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread ofthe primary*




On 2017年06月05日 18:44, Yong Wang wrote:
> From: Wang Yong <wang.yong...@zte.com.cn>
>
> Process pactkets in the IOThread which arrived over the socket.
> we use qio_channel_set_aio_fd_handler to set the handlers on the
> IOThread AioContext.then the packets from the primary and the secondary
> are processed in the IOThread.
> Finally remove the colo-compare thread using the IOThread instead.
>
> Signed-off-by: Wang Yong<wang.yong...@zte.com.cn>
> Signed-off-by: Wang Guang<wang.guan...@zte.com.cn>
> ---
>   net/colo-compare.c | 133 
++++++++++++++++++++++++++++++++++++-----------------
>   net/colo.h         |   1 +
>   2 files changed, 91 insertions(+), 43 deletions(-)
>
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index b0942a4..e3af791 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -29,6 +29,7 @@
>   #include "qemu/sockets.h"
>   #include "qapi-visit.h"
>   #include "net/colo.h"
> +#include "io/channel.h"
>   #include "sysemu/iothread.h"
>
>   #define TYPE_COLO_COMPARE "colo-compare"
> @@ -82,11 +83,6 @@ typedef struct CompareState {
>       GQueue conn_list;
>       /* hashtable to save connection */
>       GHashTable *connection_track_table;
> -    /* compare thread, a thread for each NIC */
> -    QemuThread thread;
> -
> -    GMainContext *worker_context;
> -    GMainLoop *compare_loop;
>
>       /*compare iothread*/
>       IOThread *iothread;
> @@ -95,6 +91,14 @@ typedef struct CompareState {
>       QEMUTimer *packet_check_timer;
>   } CompareState;
>
> +typedef struct {
> +    Chardev parent;
> +    QIOChannel *ioc; /*I/O channel */

We probably don't want to manipulate the char backend's internal io channel.
All we need here is to access the frontend API (char-fe.c), I believe, and
hide the internal implementation.

> +} CompareChardev;
> +
> +#define COMPARE_CHARDEV(obj)                                     \
> +    OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET)
> +
>   typedef struct CompareClass {
>       ObjectClass parent_class;
>   } CompareClass;
> @@ -107,6 +111,12 @@ enum {
>   static int compare_chr_send(CharBackend *out,
>                               const uint8_t *buf,
>                               uint32_t size);
> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
> +                                    AioContext *ctx,
> +                                    IOCanReadHandler *fd_can_read,
> +                                    IOReadHandler *fd_read,
> +                                    IOEventHandler *fd_event,
> +                                    void *opaque);
>
>   static gint seq_sorter(Packet *a, Packet *b, gpointer data)
>   {
> @@ -534,6 +544,30 @@ err:
>       return ret < 0 ? ret : -EIO;
>   }
>
> +static void compare_chr_read(void *opaque)
> +{
> +    Chardev *chr = opaque;
> +    uint8_t buf[CHR_READ_BUF_LEN];
> +    int len, size;
> +    int max_size;
> +
> +    max_size = qemu_chr_be_can_write(chr);
> +    if (max_size <= 0) {
> +        return;
> +    }
> +
> +    len = sizeof(buf);
> +    if (len > max_size) {
> +        len = max_size;
> +    }
> +    size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, len);
> +    if (size == 0) {
> +        return;
> +    } else if (size > 0) {
> +        qemu_chr_be_write(chr, buf, size);
> +    }
> +}
> +
>   static int compare_chr_can_read(void *opaque)
>   {
>       return COMPARE_READ_LEN_MAX;
> @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, const 
uint8_t *buf, int size)
>
>       ret = net_fill_rstate(&s->pri_rs, buf, size);
>       if (ret == -1) {
> -        qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL,
> -                                 NULL, NULL, true);
> +        compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> +                                    NULL, NULL, NULL, NULL);
>           error_report("colo-compare primary_in error");
>       }
>   }
> @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, const 
uint8_t *buf, int size)
>
>       ret = net_fill_rstate(&s->sec_rs, buf, size);
>       if (ret == -1) {
> -        qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL,
> -                                 NULL, NULL, true);
> +        compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> +                                    NULL, NULL, NULL, NULL);
>           error_report("colo-compare secondary_in error");
>       }
>   }
> @@ -605,34 +639,57 @@ static void colo_compare_timer_del(CompareState *s)
>       }
>   }
>
> -static void *colo_compare_thread(void *opaque)
> -{
> -    CompareState *s = opaque;
> -
> -    s->worker_context = g_main_context_new();
> -
> -    qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
> -                          compare_pri_chr_in, NULL, s, s->worker_context, 
true);
> -    qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
> -                          compare_sec_chr_in, NULL, s, s->worker_context, 
true);
> -
> -    s->compare_loop = g_main_loop_new(s->worker_context, FALSE);
> -
> -    g_main_loop_run(s->compare_loop);
> -
> -    g_main_loop_unref(s->compare_loop);
> -    g_main_context_unref(s->worker_context);
> -    return NULL;
> -}
>
>   static void colo_compare_iothread(CompareState *s)
>   {
>       object_ref(OBJECT(s->iothread));
>       s->ctx = iothread_get_aio_context(s->iothread);
>
> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> +                    compare_chr_can_read,
> +                    compare_pri_chr_in,
> +                    NULL,
> +                    s);
> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> +                    compare_chr_can_read,
> +                    compare_sec_chr_in,
> +                    NULL,
> +                    s);
> +
>       colo_compare_timer_init(s);
>   }
>
> +static void compare_chr_set_aio_fd_handlers(CharBackend *b,
> +                                    AioContext *ctx,
> +                                    IOCanReadHandler *fd_can_read,
> +                                    IOReadHandler *fd_read,
> +                                    IOEventHandler *fd_event,
> +                                    void *opaque)
> +{
> +    CompareChardev *s;
> +
> +    if (!b->chr) {
> +        return;
> +    }
> +    s = COMPARE_CHARDEV(b->chr);
> +    if (!s->ioc) {
> +        return;
> +    }

So this is hacky; you can refer to how vhost-user validates the udp socket char
backend.

> +
> +    b->chr_can_read = fd_can_read;
> +    b->chr_read = fd_read;
> +    b->chr_event = fd_event;
> +    b->opaque = opaque;
> +    remove_fd_in_watch(b->chr);
> +
> +    if (b->chr_read) {
> +        qio_channel_set_aio_fd_handler(s->ioc, ctx,
> +                                compare_chr_read, NULL, b->chr);
> +    } else {
> +        qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, NULL);

So instead of doing such a hack, how about passing an AioContext * instead
of a GMainContext * to qemu_chr_fe_set_handlers?

Thanks

> +    }
> +}
> +
>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(obj);
> @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable *uc, 
Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(uc);
>       Chardev *chr;
> -    char thread_name[64];
> -    static int compare_id;
>
>       if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
>           error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -776,12 +831,6 @@ static void colo_compare_complete(UserCreatable *uc, 
Error **errp)
>                                                         g_free,
>                                                         connection_destroy);
>
> -    sprintf(thread_name, "colo-compare %d", compare_id);
> -    qemu_thread_create(&s->thread, thread_name,
> -                       colo_compare_thread, s,
> -                       QEMU_THREAD_JOINABLE);
> -    compare_id++;
> -
>       colo_compare_iothread(s);
>
>       return;
> @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj)
>   {
>       CompareState *s = COLO_COMPARE(obj);
>
> -    qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
> -                             s->worker_context, true);
> -    qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
> -                             s->worker_context, true);
> +    compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx,
> +                                    NULL, NULL, NULL, NULL);
> +    compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx,
> +                                    NULL, NULL, NULL, NULL);
> +
>       qemu_chr_fe_deinit(&s->chr_out);
>       colo_compare_timer_del(s);
>
> -    g_main_loop_quit(s->compare_loop);
> -    qemu_thread_join(&s->thread);
> -
>       /* Release all unhandled packets after compare thead exited */
>       g_queue_foreach(&s->conn_list, colo_flush_packets, s);
>
> diff --git a/net/colo.h b/net/colo.h
> index 7c524f3..936dea1 100644
> --- a/net/colo.h
> +++ b/net/colo.h
> @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable 
*connection_track_table,
>   void connection_hashtable_reset(GHashTable *connection_track_table);
>   Packet *packet_new(const void *data, int size);
>   void packet_destroy(void *opaque, void *user_data);
> +void remove_fd_in_watch(Chardev *chr);
>
>   #endif /* QEMU_COLO_PROXY_H */





Reply via email to