Peter Xu <[email protected]> writes:

> On Fri, Dec 26, 2025 at 06:19:16PM -0300, Fabiano Rosas wrote:
>> Make channel.c deal only with QIOChannel objects. Move any handling of
>> QEMUFile into migration.c. To achieve this in a clean way:
>> 
>> 1) Define a migration_outgoing_setup, analogous to
>> migration_incoming_setup, responsible for creating the QEMUFile from
>> the QIOChannel.
>> 
>> 2) Increase the scope of migration_incoming_setup to create not only
>> the main channel, but all the others as well. That is currently being
>> done at migration_ioc_process, so move the code.
>> 
>> 3) Adjust RDMA code to pass in the QIOChannel and remove some of the
>> usage of QEMUFile.
>> 
>> Signed-off-by: Fabiano Rosas <[email protected]>
>> ---
>>  migration/channel.c   | 21 ++++++-----
>>  migration/migration.c | 88 ++++++++++++++++++++++---------------------
>>  migration/migration.h |  6 ++-
>>  migration/multifd.c   |  7 ++--
>>  migration/multifd.h   |  2 +-
>>  migration/rdma.c      | 28 ++++----------
>>  6 files changed, 73 insertions(+), 79 deletions(-)
>> 
>> diff --git a/migration/channel.c b/migration/channel.c
>> index 7243b99108..af6c2cc76e 100644
>> --- a/migration/channel.c
>> +++ b/migration/channel.c
>> @@ -14,7 +14,6 @@
>>  #include "channel.h"
>>  #include "tls.h"
>>  #include "migration.h"
>> -#include "qemu-file.h"
>>  #include "trace.h"
>>  #include "qapi/error.h"
>>  #include "io/channel-tls.h"
>> @@ -34,6 +33,7 @@ void migration_channel_process_incoming(QIOChannel *ioc)
>>  {
>>      MigrationIncomingState *mis = migration_incoming_get_current();
>>      Error *local_err = NULL;
>> +    uint8_t ch;
>>  
>>      trace_migration_set_incoming_channel(
>>          ioc, object_get_typename(OBJECT(ioc)));
>> @@ -42,9 +42,16 @@ void migration_channel_process_incoming(QIOChannel *ioc)
>>          migration_tls_channel_process_incoming(ioc, &local_err);
>>      } else {
>>          migration_ioc_register_yank(ioc);
>> -        migration_ioc_process_incoming(ioc, &local_err);
>> -    }
>> +        ch = migration_ioc_process_incoming(ioc, &local_err);
>> +        if (!ch) {
>> +            goto out;
>> +        }
>>  
>> +        if (migration_incoming_setup(ioc, ch, &local_err)) {
>> +            migration_incoming_process();
>> +        }
>> +    }
>> +out:
>>      if (local_err) {
>>          error_report_err(local_err);
>>          migrate_set_state(&mis->state, mis->state, MIGRATION_STATUS_FAILED);
>> @@ -75,14 +82,8 @@ void migration_channel_connect(MigrationState *s, 
>> QIOChannel *ioc)
>>          return;
>>      }
>>  
>> -    QEMUFile *f = qemu_file_new_output(ioc);
>> -
>>      migration_ioc_register_yank(ioc);
>> -
>> -    qemu_mutex_lock(&s->qemu_file_lock);
>> -    s->to_dst_file = f;
>> -    qemu_mutex_unlock(&s->qemu_file_lock);
>> -
>> +    migration_outgoing_setup(ioc);
>>      migration_connect(s);
>>  }
>>  
>> diff --git a/migration/migration.c b/migration/migration.c
>> index 5c6c76f110..677581b5a5 100644
>> --- a/migration/migration.c
>> +++ b/migration/migration.c
>> @@ -92,7 +92,7 @@ enum mig_rp_message_type {
>>  };
>>  
>>  /* Migration channel types */
>> -enum { CH_MAIN, CH_MULTIFD, CH_POSTCOPY };
>> +enum { CH_NONE, CH_MAIN, CH_MULTIFD, CH_POSTCOPY };
>>  
>>  /* When we add fault tolerance, we could have several
>>     migrations at once.  For now we don't need to add
>> @@ -934,17 +934,48 @@ out:
>>      migrate_incoming_unref_outgoing_state();
>>  }
>>  
>> -/**
>> - * migration_incoming_setup: Setup incoming migration
>> - * @f: file for main migration channel
>> - */
>> -static void migration_incoming_setup(QEMUFile *f)
>> +static bool migration_has_main_and_multifd_channels(void);
>> +
>> +bool migration_incoming_setup(QIOChannel *ioc, uint8_t channel, Error 
>> **errp)
>>  {
>>      MigrationIncomingState *mis = migration_incoming_get_current();
>> +    QEMUFile *f;
>>  
>> -    assert(!mis->from_src_file);
>> -    mis->from_src_file = f;
>> -    qemu_file_set_blocking(f, false, &error_abort);
>> +    switch (channel) {
>> +    case CH_MAIN:
>> +        f = qemu_file_new_input(ioc);
>> +        assert(!mis->from_src_file);
>> +        mis->from_src_file = f;
>> +        qemu_file_set_blocking(f, false, &error_abort);
>> +        break;
>> +
>> +    case CH_MULTIFD:
>> +        if (!multifd_recv_new_channel(ioc, errp)) {
>> +            return false;
>> +        }
>> +        break;
>> +
>> +    case CH_POSTCOPY:
>> +        assert(!mis->postcopy_qemufile_dst);
>> +        f = qemu_file_new_input(ioc);
>> +        postcopy_preempt_new_channel(mis, f);
>> +        return false;
>> +
>> +    default:
>> +        g_assert_not_reached();
>> +    }
>> +
>> +    return migration_has_main_and_multifd_channels();
>> +}
>> +
>> +void migration_outgoing_setup(QIOChannel *ioc)
>> +{
>> +    MigrationState *s = migrate_get_current();
>> +    QEMUFile *f = qemu_file_new_output(ioc);
>> +
>> +    qemu_mutex_lock(&s->qemu_file_lock);
>> +    s->to_dst_file = f;
>> +    qemu_mutex_unlock(&s->qemu_file_lock);
>>  }
>>  
>>  /* Returns true if recovered from a paused migration, otherwise false */
>> @@ -990,12 +1021,6 @@ void migration_incoming_process(void)
>>      qemu_coroutine_enter(co);
>>  }
>>  
>> -void migration_fd_process_incoming(QEMUFile *f)
>> -{
>> -    migration_incoming_setup(f);
>> -    migration_incoming_process();
>> -}
>> -
>>  static bool migration_has_main_and_multifd_channels(void)
>>  {
>>      MigrationIncomingState *mis = migration_incoming_get_current();
>> @@ -1012,12 +1037,10 @@ static bool 
>> migration_has_main_and_multifd_channels(void)
>>      return true;
>>  }
>>  
>> -void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
>> +uint8_t migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
>>  {
>>      MigrationIncomingState *mis = migration_incoming_get_current();
>> -    Error *local_err = NULL;
>> -    QEMUFile *f;
>> -    uint8_t channel;
>> +    uint8_t channel = CH_NONE;
>>      uint32_t channel_magic = 0;
>>      int ret = 0;
>>  
>> @@ -1036,7 +1059,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, 
>> Error **errp)
>>              ret = migration_channel_read_peek(ioc, (void *)&channel_magic,
>>                                                sizeof(channel_magic), errp);
>>              if (ret != 0) {
>> -                return;
>> +                goto out;
>>              }
>>  
>>              channel_magic = be32_to_cpu(channel_magic);
>> @@ -1051,7 +1074,6 @@ void migration_ioc_process_incoming(QIOChannel *ioc, 
>> Error **errp)
>>                  channel = CH_MAIN;
>>              } else {
>>                  error_setg(errp, "unknown channel magic: %u", 
>> channel_magic);
>> -                return;
>>              }
>>          } else if (mis->from_src_file && migrate_multifd()) {
>>              /*
>> @@ -1063,33 +1085,13 @@ void migration_ioc_process_incoming(QIOChannel *ioc, 
>> Error **errp)
>>              channel = CH_MAIN;
>>          } else {
>>              error_setg(errp, "non-peekable channel used without multifd");
>> -            return;
>>          }
>>      } else {
>>          assert(migrate_postcopy_preempt());
>>          channel = CH_POSTCOPY;
>>      }
>> -
>> -    if (channel == CH_MAIN) {
>> -        f = qemu_file_new_input(ioc);
>> -        migration_incoming_setup(f);
>> -    } else if (channel == CH_MULTIFD) {
>> -        /* Multiple connections */
>> -        multifd_recv_new_channel(ioc, &local_err);
>> -        if (local_err) {
>> -            error_propagate(errp, local_err);
>> -            return;
>> -        }
>> -    } else if (channel == CH_POSTCOPY) {
>> -        assert(!mis->postcopy_qemufile_dst);
>> -        f = qemu_file_new_input(ioc);
>> -        postcopy_preempt_new_channel(mis, f);
>> -        return;
>> -    }
>> -
>> -    if (migration_has_main_and_multifd_channels()) {
>> -        migration_incoming_process();
>> -    }
>> +out:
>> +    return channel;
>>  }
>>  
>>  /**
>> diff --git a/migration/migration.h b/migration/migration.h
>> index f340cd518d..d2b82cf54f 100644
>> --- a/migration/migration.h
>> +++ b/migration/migration.h
>> @@ -526,8 +526,10 @@ struct MigrationState {
>>  void migrate_set_state(MigrationStatus *state, MigrationStatus old_state,
>>                         MigrationStatus new_state);
>>  
>> -void migration_fd_process_incoming(QEMUFile *f);
>> -void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp);
>> +void migration_outgoing_setup(QIOChannel *ioc);
>> +bool migration_incoming_setup(QIOChannel *ioc, uint8_t channel, Error 
>> **errp);
>> +
>> +uint8_t migration_ioc_process_incoming(QIOChannel *ioc, Error **errp);
>>  void migration_incoming_process(void);
>>  
>>  bool  migration_has_all_channels(void);
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index 3fb1a07ba9..c6639dbab5 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -1521,7 +1521,7 @@ bool multifd_recv_all_channels_created(void)
>>   * Try to receive all multifd channels to get ready for the migration.
>>   * Sets @errp when failing to receive the current channel.
>>   */
>> -void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>> +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>>  {
>>      MultiFDRecvParams *p;
>>      Error *local_err = NULL;
>> @@ -1536,7 +1536,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error 
>> **errp)
>>                                      "failed to receive packet"
>>                                      " via multifd channel %d: ",
>>                                      
>> qatomic_read(&multifd_recv_state->count));
>> -            return;
>> +            return false;
>>          }
>>          trace_multifd_recv_new_channel(id);
>>      } else {
>> @@ -1549,7 +1549,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error 
>> **errp)
>>                     id);
>>          multifd_recv_terminate_threads(error_copy(local_err));
>>          error_propagate(errp, local_err);
>> -        return;
>> +        return false;
>>      }
>>      p->c = ioc;
>>      object_ref(OBJECT(ioc));
>> @@ -1558,4 +1558,5 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error 
>> **errp)
>>      qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
>>                         QEMU_THREAD_JOINABLE);
>>      qatomic_inc(&multifd_recv_state->count);
>> +    return true;
>>  }
>> diff --git a/migration/multifd.h b/migration/multifd.h
>> index 9b6d81e7ed..89a395aef2 100644
>> --- a/migration/multifd.h
>> +++ b/migration/multifd.h
>> @@ -42,7 +42,7 @@ int multifd_recv_setup(Error **errp);
>>  void multifd_recv_cleanup(void);
>>  void multifd_recv_shutdown(void);
>>  bool multifd_recv_all_channels_created(void);
>> -void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>> +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>>  void multifd_recv_sync_main(void);
>>  int multifd_send_sync_main(MultiFDSyncReq req);
>>  bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
>> diff --git a/migration/rdma.c b/migration/rdma.c
>> index 596a1aba0b..7bee871e2b 100644
>> --- a/migration/rdma.c
>> +++ b/migration/rdma.c
>> @@ -384,7 +384,6 @@ struct QIOChannelRDMA {
>>      QIOChannel parent;
>>      RDMAContext *rdmain;
>>      RDMAContext *rdmaout;
>> -    QEMUFile *file;
>>      bool blocking; /* XXX we don't actually honour this yet */
>>  };
>>  
>> @@ -3836,32 +3835,20 @@ static void qio_channel_rdma_register_types(void)
>>  
>>  type_init(qio_channel_rdma_register_types);
>>  
>> -static QEMUFile *rdma_new_input(RDMAContext *rdma)
>> +static QIOChannel *rdma_new_ioc(RDMAContext *rdma)
>>  {
>>      QIOChannelRDMA *rioc = 
>> QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>>  
>> -    rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
>> -    rioc->rdmain = rdma;
>> -    rioc->rdmaout = rdma->return_path;
>> -
>> -    return rioc->file;
>> -}
>> -
>> -static QEMUFile *rdma_new_output(RDMAContext *rdma)
>> -{
>> -    QIOChannelRDMA *rioc = 
>> QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>> -
>> -    rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
>>      rioc->rdmaout = rdma;
>>      rioc->rdmain = rdma->return_path;
>
> Likely it was overlooked rdmaout/rdmain was set in reverse order in these
> two functions.  I gave it a quick run on rdma and it was indeed broken
> starting from this patch.
>

I'll remember to test rdma next time, thanks for catching this.

> The goal of the change looks reasonable in general otherwise, said that,
> maybe there's way to split the patch somehow?
>

Yes, no problem.

Reply via email to