I set threadcount to 1. The behavior is the same.
When the first message is fired (OutcomeImportStartedEvent) a saga is
created, as expected. Debugging into Rhino ESB it hits this part of
the code (in DefaultServiceBus), which verifies to me that it is
detecting the event as a saga starting event (InitiatedBy). My
consumer is in the "consumers" collection (nothing in
"instanceConsumers" or "sagas" collections).
saga.Id = sagaMessage != null ?
sagaMessage.CorrelationId :
GuidCombGenerator.Generate();
return instanceConsumers
.Union(sagas)
.Union(consumers.Where(x => x != null))
.ToArray();
My consumer is instantiated which news up the state and everything
following runs as expected. It consumes all of the Orchestrates
messages and IsCompleted is set to true at the correct time. The saga
runs correctly from start to finish the first time through. I then
try to run a new saga, started once again by
OutcomeImportStartedEvent. The problem I see now however is that the
consumer and state are not "newed up", instead my consumer picks up
the saga starting event which has a state of IsCompleted = true, this
causes my consumer to be released from the container and then there is
nothing to consume my Orchestrates events. Do I need to reset the
state in my initiatedby consume function rather than in the
constructor?
On May 6, 1:56 pm, Corey Kaylor <[email protected]> wrote:
> Invoking a consumer does not require a subscription. It only requires that
> there is a consumer registered in the container at the endpoint that
> receives the message. With sagas you will get this error message if the
> Orchestrates<TMessage> is received and the saga has already been completed,
> OR the saga hasn't started yet. Based on your code sample it's possible
> that you are receiving OutcomeImportedEvent before the saga is started. You
> can possibly test if this is true by setting the threadCount to 1 and see
> if you still have the same problem.
>
>
>
>
>
>
>
> On Sun, May 6, 2012 at 12:28 PM, RyanL <[email protected]> wrote:
> > When the consumer marks the saga as complete, the following code is
> > called in DefaultServiceBus.cs (saga.IsCompleted is true).
>
> > if (saga.IsCompleted)
> > reflection.InvokeSagaPersisterComplete(persister,
> > saga);
> > else
> > reflection.InvokeSagaPersisterSave(persister, saga);
>
> > However, when I send another message to start a new saga (InitiatedBy)
> > it is consumed by the same consumer and when I inspect the State it
> > has not changed. After this first message is consumed, the consumer
> > is released from the container by this area of code in
> > DefaultServiceBus.cs:
>
> > var initiatedBy =
> > reflection.GetGenericTypeOf(typeof(InitiatedBy<>), msgType);
> > if (initiatedBy.IsInstanceOfType(saga) == false)
> > {
> > serviceLocator.Release(consumers[i]);
> > consumers[i] = null;
> > continue;
> > }
>
> > It is at this point future messages throw the error that there are no
> > consumers.
>
> > On May 6, 8:58 am, RyanL <[email protected]> wrote:
> > > I am creating a new saga, but I'm getting an error that there are no
> > > consumers, so it appears that the subscription is lost after one saga
> > > is completed. Here is my code that creates and publishes the saga
> > > events:
>
> > > var outcomes = retriever.GetFile(textReader).NewsItems;
> > > var sagaId = Guid.NewGuid();
>
> > > bus.Send(new OutcomeImportStartedEvent() { CorrelationId =
> > > sagaId, OutcomesToImport = outcomes.Count() });
> > > foreach (var o in outcomes)
> > > {
> > > var outcomeImportedEvent = new OutcomeImportedEvent()
> > > { Outcome = o, CorrelationId = sagaId };
> > > bus.Send(outcomeImportedEvent);
> > > }
>
> > > On May 6, 8:45 am, "Oren Eini (Ayende Rahien)" <[email protected]>
> > > wrote:
>
> > > > Ryan,
> > > > Each customer create a NEW saga.
>
> > > > On Sun, May 6, 2012 at 4:45 PM, RyanL <[email protected]> wrote:
> > > > > Also how does making the consumer go away make sense? Using
> > Starbucks
> > > > > as an example, this would mean you can only handle taking a single
> > > > > order. The next customer could not place an order.
>
> > > > > On May 5, 12:27 pm, Ryan Langton <[email protected]> wrote:
> > > > > > Ok, is there any way to implement this sort of behavior then?
> > > > > > Users are importing entities. I want an action to occur on each
> > entity
> > > > > > import and another action to occur when the import is 100%
> > complete. I
> > > > > > assumed sagas would be the best way to handle this. But I need to
> > allow
> > > > > > users to perform multiple imports as well as possible simultaneous
> > > > > imports
> > > > > > (multiple users importing at the same time - edge case). The
> > import
> > > > > files
> > > > > > can be quite large to where putting all of the entities in a single
> > > > > message
> > > > > > can easily exceed the limitations of msmq.
>
> > > > > > Just looking for ideas. Thanks for the response.
>
> > > > > > On Sat, May 5, 2012 at 6:40 AM, Oren Eini (Ayende Rahien) <
> > > > > [email protected]
>
> > > > > > > wrote:
> > > > > > > Once the saga is completed, it is just that, completed, done.
> > > > > > > This is the by design behavior.
>
> > > > > > > On Fri, May 4, 2012 at 10:17 PM, RyanL <[email protected]>
> > wrote:
>
> > > > > > >> I have a saga. Once it completes the saga consumer disappears
> > and
> > > > > > >> future messages get an error "Got Message X, but had no
> > consumers for
> > > > > > >> it". What do I do to leave the consumer / subscription active
> > and
> > > > > > >> just start a new saga? Here is my class:
>
> > > > > > >> public class OutcomeImportSagaLuceneIndexer :
> > > > > > >> ISaga<OutcomeImportState>,
>
> > > > > > >> InitiatedBy<OutcomeImportStartedEvent>,
>
> > > > > > >> Orchestrates<OutcomeImportedEvent>
> > > > > > >> {
> > > > > > >> private readonly ILuceneRepository<OutcomeListModel>
> > > > > > >> repository;
>
> > > > > > >> public
>
> > OutcomeImportSagaLuceneIndexer(ILuceneRepository<OutcomeListModel>
> > > > > > >> repository)
> > > > > > >> {
> > > > > > >> this.repository = repository;
> > > > > > >> State = new OutcomeImportState();
> > > > > > >> }
>
> > > > > > >> public void Consume(OutcomeImportStartedEvent message)
> > > > > > >> {
> > > > > > >> State.OutcomesToImport = message.OutcomesToImport;
> > > > > > >> }
>
> > > > > > >> public void Consume(OutcomeImportedEvent message)
> > > > > > >> {
>
> > repository.Upsert(MapToLuceneListRecord(message.Outcome));
> > > > > > >> State.OutcomesImported++;
> > > > > > >> if (State.OutcomesImported < State.OutcomesToImport)
> > > > > > >> return;
>
> > > > > > >> repository.RebuildIndex();
> > > > > > >> IsCompleted = true;
> > > > > > >> }
>
> > > > > > >> private static OutcomeListModel
> > > > > > >> MapToLuceneListRecord(OutcomeViewModel model)
> > > > > > >> {
> > > > > > >> return Mapper.Map<OutcomeViewModel,
> > > > > > >> OutcomeListModel>(model);
> > > > > > >> }
>
> > > > > > >> public Guid Id { get; set; }
> > > > > > >> public bool IsCompleted { get; set; }
> > > > > > >> public OutcomeImportState State { get; set; }
> > > > > > >> }
>
> > > > > > >> --
> > > > > > >> You received this message because you are subscribed to the
> > Google
> > > > > Groups
> > > > > > >> "Rhino Tools Dev" group.
> > > > > > >> To post to this group, send email to
> > [email protected]
> > > > > .
> > > > > > >> To unsubscribe from this group, send email to
> > > > > > >> [email protected].
> > > > > > >> For more options, visit this group at
> > > > > > >>http://groups.google.com/group/rhino-tools-dev?hl=en.
>
> > > > > > > --
> > > > > > > You received this message because you are subscribed to the
> > Google
> > > > > Groups
> > > > > > > "Rhino Tools Dev" group.
> > > > > > > To post to this group, send email to
> > [email protected].
> > > > > > > To unsubscribe from this group, send email to
> > > > > > > [email protected].
> > > > > > > For more options, visit this group at
> > > > > > >http://groups.google.com/group/rhino-tools-dev?hl=en.
>
> > > > > --
> > > > > You received this message because you are subscribed to the Google
> > Groups
> > > > > "Rhino Tools Dev" group.
> > > > > To post to this group, send email to
> > [email protected].
> > > > > To unsubscribe from this group, send email to
> > > > > [email protected].
> > > > > For more options, visit this group at
> > > > >http://groups.google.com/group/rhino-tools-dev?hl=en.
>
> > --
> > You received this message because you are subscribed to the Google Groups
> > "Rhino Tools Dev" group.
> > To post to this group, send email to [email protected].
> > To unsubscribe from this group, send email to
> > [email protected].
> > For more options, visit this group at
> >http://groups.google.com/group/rhino-tools-dev?hl=en.
--
You received this message because you are subscribed to the Google Groups
"Rhino Tools Dev" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to
[email protected].
For more options, visit this group at
http://groups.google.com/group/rhino-tools-dev?hl=en.