Hi Martin

I'm sorry for the delayed response. I'm not sure whether this is a YARN
problem rather than a Samza issue. From the log messages it looks like Samza
shuts down correctly, but the container is still in the "RUNNING" state.
Here's the log output:

11:07:08,552 samza.container.SamzaContainer INFO : Entering run loop.
11:07:08,791 system.kafka.KafkaSystemProducer INFO : Creating a new
producer for system kafka.
12:26:08,832 samza.container.RunLoop INFO : Shutdown has now been requested
by tasks: Set(Partition [partition=0])
12:26:08,833 samza.container.RunLoop INFO : Shutdown requested.
12:26:08,833 samza.container.SamzaContainer INFO : Shutting down.
12:26:08,834 samza.container.SamzaContainer INFO : Shutting down consumer
multiplexer.
12:26:08,835 system.kafka.BrokerProxy INFO : Shutting down BrokerProxy for
minion08.ifi.uzh.ch:9092
12:26:08,901 system.kafka.BrokerProxy INFO : Got closed by interrupt
exception in broker proxy thread.
12:26:08,902 system.kafka.BrokerProxy INFO : Shutting down due to interrupt.
12:26:08,903 samza.container.SamzaContainer INFO : Shutting down producer
multiplexer.
12:26:08,909 samza.container.SamzaContainer INFO : Shutting down task
instance stream tasks.
12:26:08,911 samza.container.SamzaContainer INFO : Shutting down task
instance stores.
12:26:08,920 samza.container.SamzaContainer INFO : Shutting down offset
manager.
12:26:08,921 samza.container.SamzaContainer INFO : Shutting down metrics
reporters.
12:26:08,922 metrics.reporter.MetricsSnapshotReporter INFO : Stopping
producer.
12:26:08,923 metrics.reporter.MetricsSnapshotReporter INFO : Stopping
reporter timer.
12:26:08,925 samza.container.SamzaContainer INFO : Shutting down JVM
metrics.
12:26:08,925 samza.container.SamzaContainer INFO : Shutdown complete.
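
Since the log ends with "Shutdown complete." while YARN still reports the
container as "RUNNING", one possible check is whether a leftover non-daemon
thread keeps the JVM alive. That is only an assumption; the log does not
confirm it. A minimal sketch follows. The class and method names are
hypothetical, and SAMZA_CONTAINER_NAME is the environment variable suggested
in the quoted reply below (it may be unset outside a Samza container):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LingeringThreads {
    // Names of live, non-daemon threads other than the calling thread.
    // Any such thread prevents the JVM from exiting on its own.
    static List<String> nonDaemonThreads() {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Tag the output with the container name, if the variable is set.
        String container = System.getenv("SAMZA_CONTAINER_NAME");
        if (container == null) {
            container = "unknown-container";
        }
        for (String name : nonDaemonThreads()) {
            System.out.println(container + ": still alive: " + name);
        }
    }
}
```

Calling nonDaemonThreads() right after the container logs "Shutdown
complete." (or taking a jstack dump of the container process) should show
whether any such thread is left.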

Any idea on how to further debug this issue?

Cheers
Nicolas


On Mon, Jun 16, 2014 at 7:27 PM, Martin Kleppmann <
[email protected]> wrote:

> Hi Nicolas,
>
> Thanks for trying the new feature. Yes, if you're only running one task
> instance in each container, then shutdown(CURRENT_TASK) should shut down
> the container (i.e. behave the same as shutdown(ALL_TASKS_IN_CONTAINER)).
>
> Do you see any messages like "Shutdown has now been requested by tasks:
> [...]" (at info level) in your container logs? They should indicate the
> partitions of the current container which have requested shutdown. If you
> compare that to the list of partitions assigned to the current container,
> the container should shut down when those sets of partitions are the same.
> If it doesn't do that, it's a bug.
>
> Regarding receiving the container name, try
> System.getenv("SAMZA_CONTAINER_NAME").
>
> Martin
>
> On 15 Jun 2014, at 12:57, Nicolas Bär <[email protected]> wrote:
>
> > Hi All
> >
> > I tried the new shutdown feature from SAMZA-253.
> >
> > It works well when one container is started with multiple threads on a
> > single node. But
> > `taskCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);`
> > does not seem to work when running a Samza stream task with 20 containers
> > on 4 machines using YARN. Each container handles only one partition, as
> > I'm consuming from a Kafka topic with 20 partitions.
> >
> > As far as I understood, this would mean on every call of
> > `taskCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);`
> > the corresponding container would finish. This is not the case. In fact all
> > containers are still running after calling this command in every task.
> >
> > In case it matters: the command is called in the window function of the
> > WindowableTask.
> >
> > Yarn: 2.2.0
> > Samza: 0.7.0 branch (from last Friday:
> > 052de224a3256cc652032de5e804338b4dc92fe0)
> >
> > Any hints on how to further debug this?
> >
> > Second question: Is there any chance to receive the container name /
> > number within the task instance? It would make debugging a lot easier :)
> >
> >
> > Cheers
> > Nicolas
>
>
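
For reference, the shutdown condition described in Martin's quoted reply (the
container stops once the set of partitions that requested shutdown is the
same as the set assigned to it) can be sketched as below. This only
illustrates the rule as worded in the thread and is not Samza's actual
implementation; the class and method names are hypothetical, and partitions
are modelled as plain integers:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ShutdownRule {
    // True when the partitions that requested shutdown are exactly
    // the partitions assigned to this container.
    static boolean containerShouldStop(Set<Integer> assigned,
                                       Set<Integer> requested) {
        return assigned.equals(requested);
    }

    public static void main(String[] args) {
        // One partition per container, as in the 20-container setup:
        // a single CURRENT_TASK request covers the whole container.
        Set<Integer> assigned = new HashSet<>(Arrays.asList(0));
        Set<Integer> requested = new HashSet<>(Arrays.asList(0));
        System.out.println(containerShouldStop(assigned, requested)); // true

        // Two partitions in one container, only one has requested shutdown.
        Set<Integer> twoAssigned = new HashSet<>(Arrays.asList(0, 1));
        System.out.println(containerShouldStop(twoAssigned, requested)); // false
    }
}
```

In the log above, the requested set is Set(Partition [partition=0]) and the
container owns only that partition, so under this rule the container is
expected to stop, which matches the "Shutdown requested." line.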
