I modified my OutcomeImportStartedEvent consumer to reset both the state
and the IsCompleted flag. With that change everything works correctly. It
doesn't feel right, but it works.

public void Consume(OutcomeImportStartedEvent message)
{
    State = new OutcomeImportState
    {
        OutcomesToImport = message.OutcomesToImport,
        OutcomesImported = 0
    };
    IsCompleted = false;
}
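
To illustrate why the reset matters, here is a stripped-down sketch that does not use Rhino ESB at all. ImportSagaSketch, Start and Imported are hypothetical stand-ins for the saga class and its two Consume methods, and the sketch assumes the same consumer instance is handed back for the second import. That matches what I saw while debugging, but I have not confirmed it against the Rhino ESB source.

```csharp
using System;

// Hypothetical stand-in for the saga state used in this thread.
public class OutcomeImportState
{
    public int OutcomesToImport { get; set; }
    public int OutcomesImported { get; set; }
}

// Simplified stand-in for the saga: no Rhino ESB types, only the two handlers.
public class ImportSagaSketch
{
    public bool IsCompleted { get; set; }
    public OutcomeImportState State { get; set; }

    public ImportSagaSketch()
    {
        State = new OutcomeImportState();
    }

    // Plays the role of Consume(OutcomeImportStartedEvent).
    public void Start(int outcomesToImport)
    {
        // Reset everything so a reused instance behaves like a fresh one.
        State = new OutcomeImportState
        {
            OutcomesToImport = outcomesToImport,
            OutcomesImported = 0
        };
        IsCompleted = false;
    }

    // Plays the role of Consume(OutcomeImportedEvent).
    public void Imported()
    {
        State.OutcomesImported++;
        if (State.OutcomesImported >= State.OutcomesToImport)
            IsCompleted = true;
    }
}

public static class Program
{
    public static void Main()
    {
        // Assumption: one instance is reused for both imports.
        var saga = new ImportSagaSketch();

        saga.Start(2);
        saga.Imported();
        saga.Imported();
        Console.WriteLine(saga.IsCompleted);            // True: first import finished

        saga.Start(3);
        Console.WriteLine(saga.IsCompleted);            // False: flag was reset
        Console.WriteLine(saga.State.OutcomesImported); // 0: counter was reset
    }
}
```

If the Start method only assigned OutcomesToImport, as my original Consume did, the second run would begin with IsCompleted still true and the counter still at 2, which is the situation I kept hitting.
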
On May 6, 6:43 pm, RyanL <[email protected]> wrote:
> I set threadcount to 1. The behavior is the same.
> When the first message is fired (OutcomeImportStartedEvent) a saga is
> created, as expected. Debugging into Rhino ESB it hits this part of
> the code (in DefaultServiceBus), which verifies to me that it is
> detecting the event as a saga starting event (InitiatedBy). My
> consumer is in the "consumers" collection (nothing in
> "instanceConsumers" or "sagas" collections).
>
>     saga.Id = sagaMessage != null ?
>         sagaMessage.CorrelationId :
>         GuidCombGenerator.Generate();
>
>     return instanceConsumers
>         .Union(sagas)
>         .Union(consumers.Where(x => x != null))
>         .ToArray();
>
> My consumer is instantiated, which news up the state, and everything
> following runs as expected. It consumes all of the Orchestrates
> messages and IsCompleted is set to true at the correct time. The saga
> runs correctly from start to finish the first time through. I then
> try to run a new saga, started once again by
> OutcomeImportStartedEvent. The problem I see now, however, is that the
> consumer and state are not "newed up". Instead, my consumer picks up
> the saga starting event while it still has IsCompleted = true. This
> causes my consumer to be released from the container, and then there is
> nothing to consume my Orchestrates events. Do I need to reset the
> state in my InitiatedBy Consume method rather than in the
> constructor?
>
> On May 6, 1:56 pm, Corey Kaylor <[email protected]> wrote:
>
> > Invoking a consumer does not require a subscription. It only requires that
> > there is a consumer registered in the container at the endpoint that
> > receives the message. With sagas you will get this error message if the
> > Orchestrates<TMessage> is received and the saga has already been completed,
> > OR the saga hasn't started yet. Based on your code sample it's possible
> > that you are receiving OutcomeImportedEvent before the saga is started. You
> > can test whether this is true by setting the threadCount to 1 and seeing
> > if you still have the same problem.
>
> > On Sun, May 6, 2012 at 12:28 PM, RyanL <[email protected]> wrote:
> > > When the consumer marks the saga as complete, the following code is
> > > called in DefaultServiceBus.cs (saga.IsCompleted is true).
>
> > >     if (saga.IsCompleted)
> > >         reflection.InvokeSagaPersisterComplete(persister, saga);
> > >     else
> > >         reflection.InvokeSagaPersisterSave(persister, saga);
>
> > > However, when I send another message to start a new saga (InitiatedBy)
> > > it is consumed by the same consumer and when I inspect the State it
> > > has not changed. After this first message is consumed, the consumer
> > > is released from the container by this area of code in
> > > DefaultServiceBus.cs:
>
> > >     var initiatedBy = reflection.GetGenericTypeOf(typeof(InitiatedBy<>), msgType);
> > >     if (initiatedBy.IsInstanceOfType(saga) == false)
> > >     {
> > >         serviceLocator.Release(consumers[i]);
> > >         consumers[i] = null;
> > >         continue;
> > >     }
>
> > > It is at this point future messages throw the error that there are no
> > > consumers.
>
> > > On May 6, 8:58 am, RyanL <[email protected]> wrote:
> > > > I am creating a new saga, but I'm getting an error that there are no
> > > > consumers, so it appears that the subscription is lost after one saga
> > > > is completed. Here is my code that creates and publishes the saga
> > > > events:
>
> > > > >     var outcomes = retriever.GetFile(textReader).NewsItems;
> > > > >     var sagaId = Guid.NewGuid();
>
> > > > >     bus.Send(new OutcomeImportStartedEvent()
> > > > >         { CorrelationId = sagaId, OutcomesToImport = outcomes.Count() });
> > > > >     foreach (var o in outcomes)
> > > > >     {
> > > > >         var outcomeImportedEvent = new OutcomeImportedEvent()
> > > > >             { Outcome = o, CorrelationId = sagaId };
> > > > >         bus.Send(outcomeImportedEvent);
> > > > >     }
>
> > > > On May 6, 8:45 am, "Oren Eini (Ayende Rahien)" <[email protected]>
> > > > wrote:
>
> > > > > Ryan,
> > > > > > Each customer creates a NEW saga.
>
> > > > > On Sun, May 6, 2012 at 4:45 PM, RyanL <[email protected]> wrote:
> > > > > > > Also how does making the consumer go away make sense? Using Starbucks
> > > > > > > as an example, this would mean you can only handle taking a single
> > > > > > > order. The next customer could not place an order.
>
> > > > > > On May 5, 12:27 pm, Ryan Langton <[email protected]> wrote:
> > > > > > > > Ok, is there any way to implement this sort of behavior then?
> > > > > > > > Users are importing entities. I want an action to occur on each entity
> > > > > > > > import and another action to occur when the import is 100% complete. I
> > > > > > > > assumed sagas would be the best way to handle this. But I need to allow
> > > > > > > > users to perform multiple imports as well as possible simultaneous imports
> > > > > > > > (multiple users importing at the same time - edge case). The import files
> > > > > > > > can be quite large, to the point where putting all of the entities in a
> > > > > > > > single message can easily exceed the limitations of MSMQ.
>
> > > > > > > Just looking for ideas. Thanks for the response.
>
> > > > > > > > On Sat, May 5, 2012 at 6:40 AM, Oren Eini (Ayende Rahien) <[email protected]> wrote:
>
> > > > > > > > > Once the saga is completed, it is just that: completed, done.
> > > > > > > > > This behavior is by design.
>
> > > > > > > > > On Fri, May 4, 2012 at 10:17 PM, RyanL <[email protected]> wrote:
>
> > > > > > > > >> I have a saga. Once it completes the saga consumer disappears and
> > > > > > > > >> future messages get an error "Got Message X, but had no consumers for
> > > > > > > > >> it". What do I do to leave the consumer / subscription active and
> > > > > > > > >> just start a new saga? Here is my class:
>
> > > > > > > > >> public class OutcomeImportSagaLuceneIndexer :
> > > > > > > > >>     ISaga<OutcomeImportState>,
> > > > > > > > >>     InitiatedBy<OutcomeImportStartedEvent>,
> > > > > > > > >>     Orchestrates<OutcomeImportedEvent>
> > > > > > > > >> {
> > > > > > > > >>     private readonly ILuceneRepository<OutcomeListModel> repository;
>
> > > > > > > > >>     public OutcomeImportSagaLuceneIndexer(ILuceneRepository<OutcomeListModel> repository)
> > > > > > > > >>     {
> > > > > > > > >>         this.repository = repository;
> > > > > > > > >>         State = new OutcomeImportState();
> > > > > > > > >>     }
>
> > > > > > > > >>     public void Consume(OutcomeImportStartedEvent message)
> > > > > > > > >>     {
> > > > > > > > >>         State.OutcomesToImport = message.OutcomesToImport;
> > > > > > > > >>     }
>
> > > > > > > > >>     public void Consume(OutcomeImportedEvent message)
> > > > > > > > >>     {
> > > > > > > > >>         repository.Upsert(MapToLuceneListRecord(message.Outcome));
> > > > > > > > >>         State.OutcomesImported++;
> > > > > > > > >>         if (State.OutcomesImported < State.OutcomesToImport)
> > > > > > > > >>             return;
>
> > > > > > > > >>         repository.RebuildIndex();
> > > > > > > > >>         IsCompleted = true;
> > > > > > > > >>     }
>
> > > > > > > > >>     private static OutcomeListModel MapToLuceneListRecord(OutcomeViewModel model)
> > > > > > > > >>     {
> > > > > > > > >>         return Mapper.Map<OutcomeViewModel, OutcomeListModel>(model);
> > > > > > > > >>     }
>
> > > > > > > > >>     public Guid Id { get; set; }
> > > > > > > > >>     public bool IsCompleted { get; set; }
> > > > > > > > >>     public OutcomeImportState State { get; set; }
> > > > > > > > >> }
>
> > > > > > > >> --
> > > > > > > >> You received this message because you are subscribed to the
> > > Google
> > > > > > Groups
> > > > > > > >> "Rhino Tools Dev" group.
> > > > > > > >> To post to this group, send email to
> > > [email protected]
> > > > > > .
> > > > > > > >> To unsubscribe from this group, send email to
> > > > > > > >> [email protected].
> > > > > > > >> For more options, visit this group at
> > > > > > > >>http://groups.google.com/group/rhino-tools-dev?hl=en.