Hey sgg,

Ah, I didn't notice, but your code does not implement StreamTask, just
InitableTask.

These interfaces are like mix-ins. You MUST implement StreamTask. Try:

  public class SimpleSamzaTask implements StreamTask, InitableTask {

    ...
  }
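
To make the mix-in idea concrete, here is a minimal sketch that compiles without the Samza jars. The two interfaces below are hypothetical stand-ins with simplified signatures (a plain Map for the config, a List for the collector), not Samza's real API, and MixinSketch.run is only my assumption of what a container roughly does: call init() if the task opts in, then feed it messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Samza's interfaces, with simplified signatures,
// so this sketch compiles on its own. The real interfaces take Samza types.
interface StreamTask {
    void process(String message, List<String> collector);
}

interface InitableTask {
    void init(Map<String, String> config) throws Exception;
}

// Implements both: StreamTask makes it a task, InitableTask adds the init() hook.
class SimpleSamzaTask implements StreamTask, InitableTask {
    private String fooBar = "unset";

    @Override
    public void init(Map<String, String> config) {
        // Read a property the same way the thread's task.foo.bar example does.
        fooBar = config.getOrDefault("task.foo.bar", "unset");
    }

    @Override
    public void process(String message, List<String> collector) {
        collector.add("hello: " + message + " (" + fooBar + ")");
    }
}

class MixinSketch {
    // Assumed container behavior: init() is optional, process() is required.
    static List<String> run(StreamTask task, Map<String, String> config,
                            List<String> input) throws Exception {
        if (task instanceof InitableTask) {
            ((InitableTask) task).init(config);
        }
        List<String> out = new ArrayList<>();
        for (String message : input) {
            task.process(message, out);
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        List<String> out = run(new SimpleSamzaTask(),
                Map.of("task.foo.bar", "someVal"), List.of("a", "b"));
        out.forEach(System.out::println);
    }
}
```

Note that run() accepts a StreamTask, so a class that only implements InitableTask could not even be passed in. That is the same reason your job fails to start.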

Sorry about that.

Cheers,
Chris

On 12/9/13 5:45 PM, "sgg" <[email protected]> wrote:

>Ok thanks Chris.  But I still need to figure out why the init() method is
>not getting called; in fact, the entire Samza job fails.  It works if the
>task implements StreamTask (obviously without invoking init()), so at least
>the task runs.  When I change it to implement InitableTask as shown,
>the job fails.  It seems to not be able to instantiate the task object.  I
>was hoping there would be an example that runs that would allow me to see
>what a correct startup sequence looks like.
>
>sgg
>On Dec 9, 2013, at 12:27 PM, Chris Riccomini <[email protected]>
>wrote:
>
>> Hi there,
>> 
>> Your task looks good. Your init() method receives two parameters: Config
>> and TaskContext. The config object has *all* config properties defined in
>> your job's config. If you were to write:
>> 
>> @Override
>> public void init(Config config, TaskContext context) throws Exception {
>>   System.out.println(config.get("task.foo.bar"));
>> }
>> 
>> 
>> And you had task.foo.bar=someVal, you'd then expect to see your task print
>> "someVal" once for each partition that the Samza container is responsible
>> for. For example, if you had a single Samza container
>> (yarn.container.count=1, or using LocalJobFactory), and you had defined a
>> single input stream that had 4 partitions, your logs would show "someVal"
>> printed four times (one for each partition that the Samza container is
>> responsible for processing).
>> 
>> Cheers,
>> Chris
>> 
>> On 12/7/13 4:18 AM, "sgg" <[email protected]> wrote:
>> 
>>> Does anyone have a working example of a Samza job using an InitableTask?
>>> 
>>> It wasn't clear from the documentation how to specify the input
>>> parameters to the init() method.  Is it a matter of adding lines in the
>>> config file that look like:
>>> task.foo.bar = someVal
>>> 
>>> ?
>>> 
>>> Also, when I tried to run a simple example with an InitableTask, I get an
>>> error.  It seems there is some problem when Samza attempts to instantiate
>>> the class.
>>> 
>>> 
>>> Here is the sample task I am trying to run:
>>> public class SimpleSamzaTask implements InitableTask {
>>> private static final SystemStream OUTPUT_STREAM =
>>>       new SystemStream("kafka", "simpleout");
>>> 
>>> public SimpleSamzaTask(){
>>>       super();
>>> }
>>> 
>>> public void process(IncomingMessageEnvelope envelope,
>>>       MessageCollector collector, TaskCoordinator coordinator) {
>>>       System.out.println("hello world");
>>>       collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>             "hello: " + envelope.getMessage()));
>>> }
>>> 
>>> @Override
>>> public void init(Config arg0, TaskContext arg1) throws Exception {
>>>       // TODO Auto-generated method stub
>>>       System.out.println("in init");
>>> }
>>> }
>>> 
>>> Thoughts?
>> 
>
