Hey sgg,

If you're using YARN, which it sounds like you are, the System.out.println output should be piped to the stdout file of the Samza container (not the ApplicationMaster's). To find the container, go to the YARN web UI and click the ApplicationMaster link for your Samza job. This will lead you to the ApplicationMaster's web UI (confusing, I know), which has a link to your Samza job's containers. If you're playing with hello-samza, the container ID will be something like container_12345678_1234536_2. That last "2" is the container number for your Samza job: 1 is the AM, and 2 is the Samza container that's running your StreamTasks.
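If you end up scanning a directory full of container logs, a small helper along these lines can tell the AM apart from the task containers. This is only a rough sketch: ContainerIds, containerNumber, and isApplicationMaster are hypothetical names (not part of Samza or YARN), and it assumes nothing more than what's described above, namely that the ID starts with "container_", ends with the container number, and that number 1 is the AM.

```java
// Hypothetical helper for illustration only; not part of Samza or YARN.
final class ContainerIds {
    private ContainerIds() {}

    // Returns the number after the last underscore,
    // e.g. 2 for "container_12345678_1234536_2".
    static int containerNumber(String containerId) {
        int idx = containerId.lastIndexOf('_');
        if (!containerId.startsWith("container_") || idx == containerId.length() - 1) {
            throw new IllegalArgumentException("Not a container ID: " + containerId);
        }
        // Integer.parseInt accepts leading zeros, so "000002" parses as 2.
        return Integer.parseInt(containerId.substring(idx + 1));
    }

    // Assumption taken from the explanation above: container number 1 is the AM.
    static boolean isApplicationMaster(String containerId) {
        return containerNumber(containerId) == 1;
    }
}
```

With that, ContainerIds.isApplicationMaster("container_12345678_1234536_2") is false, so that container's stdout is the one to check for your println output.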
Cheers,
Chris

On 12/9/13 7:37 PM, "sgg" <[email protected]> wrote:

>Hi Chris:
>DOH! that was the problem. Made the change to implement both interfaces
>and things ran fine! Thanks for the pointer!
>
>BTW, where does the output from the System.out.println statements appear?
>i.e. where is stdout being piped to? I had expected to see it in the logs
>facility in the Yarn web console, but these print statements don't appear
>in the yarn stdout log. Where should I be looking?
>
>sgg
>On Dec 9, 2013, at 10:14 PM, Chris Riccomini <[email protected]>
>wrote:
>
>> Hey sgg,
>>
>> Ah, I didn't notice, but your code does not implement StreamTask, just
>> InitableTask.
>>
>> These interfaces are like mix-ins. You MUST implement StreamTask. Try:
>>
>>   public class SimpleSamzaTask implements StreamTask, InitableTask {
>>
>>     ...
>>   }
>>
>> Sorry about that.
>>
>> Cheers,
>> Chris
>>
>> On 12/9/13 5:45 PM, "sgg" <[email protected]> wrote:
>>
>>> Ok thanks Chris. But I still need to figure out why the init() method
>>> is not getting called, in fact, the entire Samza job fails. It works if
>>> the task implements StreamTask (obviously not invoking init()), but at
>>> least the task runs. When I change it to implement InitableTask() as
>>> shown, the job fails. It seems to not be able to instantiate the task
>>> object, I was hoping there would be an example that runs that would
>>> allow me to see what a correct start up sequence looks like.
>>>
>>> sgg
>>> On Dec 9, 2013, at 12:27 PM, Chris Riccomini <[email protected]>
>>> wrote:
>>>
>>>> Hi there,
>>>>
>>>> Your task looks good. Your init() method receives two parameters:
>>>> Config, and TaskContext. The config object has *all* config properties
>>>> defined in your job's config. If you were to write:
>>>>
>>>>   @Override
>>>>   public void init(Config config, TaskContext context) throws Exception {
>>>>     System.out.println(config.get("task.foo.bar"));
>>>>   }
>>>>
>>>> And you had task.foo.bar=someVal, you'd then expect to see your task
>>>> print "someVal" once for each partition that the Samza container is
>>>> responsible for. For example, if you had a single Samza container
>>>> (yarn.container.count=1, or using LocalJobFactory), and you had defined
>>>> a single input stream that had 4 partitions, your logs would show
>>>> "someVal" printed four times (one for each partition that the Samza
>>>> container is responsible for processing).
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 12/7/13 4:18 AM, "sgg" <[email protected]> wrote:
>>>>
>>>>> Does anyone have a working example of a Samza job using an
>>>>> InitableTask?
>>>>>
>>>>> It wasn't clear from the documentation what ways to specify the input
>>>>> parameters to the init() method. Is it a matter of adding lines in
>>>>> the config file that look like:
>>>>> task.foo.bar = someVal
>>>>>
>>>>> ?
>>>>>
>>>>> Also, when I tried to run a simple example with an InitableTask, I
>>>>> get an error. It seems there is some problem when Samza attempts to
>>>>> instantiate the class
>>>>>
>>>>> Here is the sample task I am trying to run:
>>>>> public class SimpleSamzaTask implements InitableTask {
>>>>>   private static final SystemStream OUTPUT_STREAM =
>>>>>       new SystemStream("kafka", "simpleout");
>>>>>
>>>>>   public SimpleSamzaTask() {
>>>>>     super();
>>>>>   }
>>>>>
>>>>>   public void process(IncomingMessageEnvelope envelope,
>>>>>       MessageCollector collector, TaskCoordinator coordinator) {
>>>>>     System.out.println("hello world");
>>>>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>>>         "hello: " + envelope.getMessage()));
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public void init(Config arg0, TaskContext arg1) throws Exception {
>>>>>     // TODO Auto-generated method stub
>>>>>     System.out.println("in init");
>>>>>   }
>>>>> }
>>>>>
>>>>> Thoughts?
