Hi Chris,

Heh, nice pick up on my typo.

Thanks for the explanation on the importance of defining a key serializer, also.
And yes, sure enough after specifying those configs (and fixing my typo), 
everything worked as intended and the contents of my test file were put onto a 
stream which my StreamTask managed to receive.
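
For anyone who finds this thread later, here is how I now understand the failure, as a rough plain-Java sketch. It does not use any Samza classes: the class and method names are made up for illustration, and the UTF-8 encoding is my assumption about what a string serde would do.

```java
import java.nio.charset.StandardCharsets;

class KeySerdeSketch {
    // Hypothetical stand-in for a system with no key serde configured:
    // it assumes the key is already a byte array and simply casts it.
    static byte[] withoutSerde(Object key) {
        return (byte[]) key; // throws ClassCastException if key is a String
    }

    // Hypothetical stand-in for a string serde turning the key into bytes
    // before it is sent (UTF-8 is an assumption).
    static byte[] withStringSerde(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        try {
            withoutSerde("patient-42");
        } catch (ClassCastException e) {
            System.out.println("no serde, String key: cast to byte[] failed");
        }
        System.out.println("string serde: " + withStringSerde("patient-42").length + " bytes");
    }
}
```

So a String key only works once something is configured to turn it into bytes, which is what the key serde setting does.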

I will open a JIRA for the error logging.

Thanks a lot for the ongoing support,
Jonathan

On 3 Feb 2014, at 9:26, Chris Riccomini <[email protected]> wrote:

> Hey Jonathan,
> 
> Also, I noticed you have "serder" instead of "serde" for your msg serde
> config
> (https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties).
> In both cases, this should be:
> 
> systems.kafka.samza.key.serde=string
> 
> systems.kafka.samza.msg.serde=json
> 
> 
> No 'r'. Didn't notice that before.
> 
> Cheers,
> Chris
> 
> On 2/3/14 9:21 AM, "Chris Riccomini" <[email protected]> wrote:
> 
>> Hey Jonathan,
>> 
>> Ah ah. This error is caused because you're sending a non-byte-array key in
>> your StreamTask, and you don't have a key serializer defined. Samza, by
>> default, allows you to send messages to a system without a serializer
>> defined. If you do this, then it assumes you're giving it bytes.
>> 
>> Here are the configs you need:
>> 
>> 
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> 
>> systems.kafka.samza.key.serder=string
>> 
>> 
>> This is causing no messages to be sent, which explains why you're unable
>> to see any output from your job.
>> 
>> We could definitely improve the error messaging here. Could you open a
>> JIRA for this?
>> 
>> Cheers,
>> Chris
>> 
>> On 1/31/14 4:24 PM, "Jonathan Poltak Samosir" <[email protected]>
>> wrote:
>> 
>>> Hi Chris,
>>> 
>>> Oh dear, that's rather embarrassing, although it makes perfect sense now.
>>> 
>>> Here are the first ~900 lines of the second container log (that warning
>>> continues to repeat):
>>> https://gist.github.com/poltak/8745919/raw/7990b0199fafde9bf6da00aa4b87b2e14fb0e717/gistfile1.txt
>>> 
>>> There definitely seems to be an issue there. Unfortunately, I cannot work
>>> out where exactly the problem lies since the String cast error message is
>>> truncated. Do you know of any easy way to get the rest of this message?
>>> 
>>> Thank you for your patience as well. It is very much appreciated!
>>> 
>>> Jonathan
>>> 
>>> 
>>> On 31 Jan 2014, at 11:20, Chris Riccomini <[email protected]>
>>> wrote:
>>> 
>>>> Hey Jonathan,
>>>> 
>>>> Could you send the logs for the actual container? The AM isn't actually
>>>> running your code--it just manages and tells YARN to start a SECOND
>>>> container to run your code. :)
>>>> 
>>>> For example, in the AM logs, you'll see:
>>>> 
>>>> 2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 for container container_1391135766129_0001_01_000002 on node s1
>>>> (http://s1:8042/node/containerlogs/container_1391135766129_0001_01_000002).
>>>> 
>>>> This is the log path to the container that's running your code.
>>>> 
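If you want to script the lookup rather than read the AM log by eye, a small sketch along these lines might do. It only assumes the "Claimed task ID ..." line format quoted above; the class and method names are hypothetical, and other Samza versions may log this differently.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ContainerLogSketch {
    // Matches the "Claimed task ID ..." line format quoted in this message.
    private static final Pattern CLAIMED = Pattern.compile(
        "Claimed task ID (\\d+) for container (\\S+) on node (\\S+)");

    // Returns the container ID from an AM log line, or null if the line does not match.
    static String containerId(String amLogLine) {
        Matcher m = CLAIMED.matcher(amLogLine);
        return m.find() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        String line = "2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 "
            + "for container container_1391135766129_0001_01_000002 on node s1";
        System.out.println(containerId(line));
    }
}
```

The returned ID is the last path segment of the containerlogs URL shown in the same log line.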
>>>> Alternatively, you can find the second container by clicking on the
>>>> ApplicationMaster link in the YARN RM web UI, and then clicking on the
>>>> link in the "running containers" section that ends with _000002.
>>>> 
>>>> If you could grab that log and send it here, I can take a look and help
>>>> you figure out what's going on.
>>>> 
>>>> Cheers,
>>>> Chris
>>>> 
>>>> On 1/30/14 7:23 PM, "Jonathan Poltak Samosir"
>>>> <[email protected]>
>>>> wrote:
>>>> 
>>>>> Hi Chris,
>>>>> 
>>>>> Thanks for your thoughts.
>>>>> 
>>>>> 1. I will definitely put file reading on another thread; I just wanted to
>>>>> get it working for now with a very simple test file before I move on and
>>>>> re-implement my code to be more robust. Thanks for the pointers on
>>>>> checking for space before calling BlockingEnvelopeMap.put(), also. I will
>>>>> definitely come back to that afterwards.
>>>>> 
>>>>> 2. Again, will definitely revisit this point later on as well as the file
>>>>> offset tip.
>>>>> 
>>>>> 3. Here are the logs:
>>>>> application-master.log:
>>>>> https://gist.github.com/poltak/8726030
>>>>> 
>>>>> gc.log (probably not needed, but it's there):
>>>>> https://gist.github.com/poltak/8726004
>>>>> 
>>>>> stdout:
>>>>> https://gist.github.com/poltak/8726036
>>>>> 
>>>>> stderr was clean.
>>>>> 
>>>>> Not sure if you will find anything useful out of those, but I'm sure your
>>>>> understanding of it will greatly outweigh mine.
>>>>> 
>>>>> Thanks,
>>>>> Jonathan
>>>>> 
>>>>> 
>>>>> On 30 Jan 2014, at 15:47, Chris Riccomini <[email protected]>
>>>>> wrote:
>>>>> 
>>>>>> Hey Jonathan,
>>>>>> 
>>>>>> Another follow-on thought. You're currently using null for the offset, but
>>>>>> you could very easily use filepos, and then fseek to the filepos when a
>>>>>> SystemStreamPartition is instantiated. This would give you the ability to
>>>>>> "pick up where you left off" rather than re-reading the whole file every
>>>>>> time your SamzaContainer starts.
>>>>>> 
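A rough plain-Java sketch of the filepos idea, using java.io.RandomAccessFile in place of fseek. It does not touch the Samza API; the class, the Line holder, and the temp file are hypothetical stand-ins, and how the position would be stored as a Samza offset is left out.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class FileOffsetSketch {
    // One line plus the file position just after it, usable as the offset.
    static final class Line {
        final String text;
        final long nextPos;
        Line(String text, long nextPos) { this.text = text; this.nextPos = nextPos; }
    }

    // Reads all lines starting at byte position startPos (0 for a fresh start).
    // Note: RandomAccessFile.readLine is unbuffered and does not decode UTF-8.
    static List<Line> readFrom(String path, long startPos) throws IOException {
        List<Line> out = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(startPos); // the "fseek to the filepos" step
            String text;
            while ((text = file.readLine()) != null) {
                out.add(new Line(text, file.getFilePointer()));
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("medical-data", ".csv");
        Files.write(tmp, "a,1\nb,2\nc,3\n".getBytes(StandardCharsets.US_ASCII));
        List<Line> all = readFrom(tmp.toString(), 0L);
        long checkpoint = all.get(0).nextPos; // pretend we stopped after the first line
        for (Line line : readFrom(tmp.toString(), checkpoint)) {
            System.out.println(line.nextPos + ": " + line.text);
        }
        Files.delete(tmp);
    }
}
```

Resuming from the saved position skips the lines already emitted instead of re-reading the file from the top.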
>>>>>> Cheers,
>>>>>> Chris
>>>>>> 
>>>>>> On 1/30/14 3:43 PM, "Chris Riccomini" <[email protected]>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hey Jonathan,
>>>>>>> 
>>>>>>> I took a look at your code. Nothing looks horribly wrong at first glance.
>>>>>>> A couple of thoughts:
>>>>>>> 
>>>>>>> 1. You're reading the full file and loading it into the
>>>>>>> BlockingEnvelopeMap in your start() method. This is OK if the file is
>>>>>>> small, and will ALWAYS be small. If the file is NOT small, you need to
>>>>>>> move the file reading to another thread, and only
>>>>>>> BlockingEnvelopeMap.put() when there is space. The start/stop methods
>>>>>>> would then start and stop your reader thread. The "when there is space"
>>>>>>> behavior can be implemented in one of two ways: 1) check
>>>>>>> BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>>>>>>> block if it's above some threshold, or 2) override
>>>>>>> BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will
>>>>>>> automatically force your reader thread to block if the queue is full.
>>>>>>> 2. An alternative input path style would be to just define the input path
>>>>>>> as the stream name. For example,
>>>>>>> task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>>>>>>> MedicalDataConsumer.register method call systemStreamPartition.getStream
>>>>>>> to get the file path, and instantiate the file reader there. The advantage
>>>>>>> to this approach is it means you only need one MedicalDataConsumer to read
>>>>>>> from N number of files, rather than having one MedicalDataConsumer (and
>>>>>>> system) per file.
>>>>>>> 3. Can you send the output log when running your job?
>>>>>>> 
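A rough plain-Java sketch of the bounded-queue variant from point 1. It uses java.util.concurrent only, not BlockingEnvelopeMap itself; the class name, the capacity of 2, and the in-memory list of lines are hypothetical stand-ins for the real consumer and file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedReaderSketch {
    // Hypothetical capacity: put() blocks the reader once this many lines are waiting.
    static final int CAPACITY = 2;

    // Feeds lines through a bounded queue from a reader thread and returns
    // them in the order the consuming side took them.
    static List<String> pump(List<String> lines) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(CAPACITY);
        Thread reader = new Thread(() -> {
            try {
                for (String line : lines) {
                    queue.put(line); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // a stop() could interrupt the reader
            }
        });
        reader.start(); // roughly what start() would do
        List<String> received = new ArrayList<>();
        while (received.size() < lines.size()) {
            received.add(queue.take()); // taking a line frees space for the reader
        }
        reader.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pump(Arrays.asList("a,1", "b,2", "c,3", "d,4")));
    }
}
```

The reader never holds more than CAPACITY unread lines in memory, which is the point of bounding the queue for files that are not small.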
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>> 
>>>>>>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir"
>>>>>>> <[email protected]>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Thanks for the help, Chris!
>>>>>>>> 
>>>>>>>> The Javadocs you updated certainly did help my understanding of the
>>>>>>>> SystemConsumer interface, although I still have not been able to get my
>>>>>>>> System to work.
>>>>>>>> 
>>>>>>>> I have now resorted to trying to extend the BlockingEnvelopeMap
>>>>>>>> implementation of SystemConsumer, as it seemed a lot simpler for what I
>>>>>>>> want to do, plus the hello-samza example uses it so I can use it as a
>>>>>>>> reference example.
>>>>>>>> 
>>>>>>>> Even when I simply put a call to 'put()' at the beginning of the 'start()'
>>>>>>>> method with a debug message, nothing is received by my simple StreamTask
>>>>>>>> (which simply forwards what is received from my System's stream onto a
>>>>>>>> Kafka stream for now). As this is the case, I think there is a flaw in my
>>>>>>>> understanding of something much more fundamental here relating to Samza
>>>>>>>> System... I am sure there is something very simple that I am missing with
>>>>>>>> my implementation, as I have based it directly off the WikipediaSystem, of
>>>>>>>> which I feel I have a pretty thorough understanding now.
>>>>>>>> 
>>>>>>>> If anyone has time to look at my implementations for SystemConsumer,
>>>>>>>> SystemFactory and the previously mentioned StreamTask's config file
>>>>>>>> (they're all very simple and "to-the-point", since I'm just trying to
>>>>>>>> get things working), it would be very much appreciated.
>>>>>>>> 
>>>>>>>> SystemFactory implementation:
>>>>>>>> 
>>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>>>>>>> 
>>>>>>>> SystemConsumer implementation:
>>>>>>>> 
>>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>>>>>>> 
>>>>>>>> StreamTask config (which specifies input from my System):
>>>>>>>> 
>>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>>>>>>> 
>>>>>>>> And yes, once I get it working and reading a file into a Samza Stream, I
>>>>>>>> will be happy to submit a patch.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Jonathan
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 30 Jan 2014, at 10:26, Chris Riccomini <[email protected]>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hey Jonathan,
>>>>>>>>> 
>>>>>>>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>>>>>>> 
>>>>>>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>>>>>>> 
>>>>>>>>> Let me know if anything doesn't make sense.
>>>>>>>>> 
>>>>>>>>> Also, it'd be awesome if you could contribute your file reader
>>>>>>>>> SystemConsumer. This seems like it'd be really useful for a lot of things.
>>>>>>>>> Could you open a JIRA and submit a patch when you're ready?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Chris
>>>>>>>>> 
>>>>>>>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>>> Hello,
>>>>>>>>>> 
>>>>>>>>>> Currently trying to write a simple Samza System that reads from a file
>>>>>>>>>> and puts the contents onto a Samza-compatible stream. Been basing it off
>>>>>>>>>> the hello-samza WikipediaSystem example so far. The SystemFactory
>>>>>>>>>> implementation seemed to be pretty straightforward (get path to file from
>>>>>>>>>> config and return a SystemConsumer that reads the file at that path),
>>>>>>>>>> although I am not 100% sure of what the purpose of all the SystemConsumer
>>>>>>>>>> interface methods is.
>>>>>>>>>> 
>>>>>>>>>> start() and stop() seem fairly self-explanatory, getting called when the
>>>>>>>>>> System is started and stopped, respectively (please let me know if I am
>>>>>>>>>> wrong about any of my understandings). register() seems to be similar to
>>>>>>>>>> start() in that it will be called near the beginning of the System,
>>>>>>>>>> although giving access to the SystemStreamPartition and a given
>>>>>>>>>> partition offset, correct? The poll() method seems to be where most of
>>>>>>>>>> the action takes place, and going by its name, it is called often or at
>>>>>>>>>> specified intervals? If so, how does this polling work and how is the
>>>>>>>>>> interval specified?
>>>>>>>>>> Also, the List of IncomingMessageEnvelopes that gets returned from poll():
>>>>>>>>>> these are then forwarded on to their specified SystemStreamPartitions
>>>>>>>>>> (first arg in the IncomingMessageEnvelope constructor)?
>>>>>>>>>> 
>>>>>>>>>> Anyway, thanks for your time and let me know how far off I am with my
>>>>>>>>>> understanding of this (as I can't get anything working with my current
>>>>>>>>>> system implementation). Will be happy to contribute back a Javadoc patch
>>>>>>>>>> reflecting my amended understanding of this interface afterwards.
>>>>>>>>>> 
>>>>>>>>>> Jonathan
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
