Hi Chris,

Heh, nice pickup on my typo.

Thanks for the explanation on the importance of defining a key serializer, also. And yes, sure enough, after specifying those configs (and fixing my typo), everything worked as intended and the contents of my test file were put onto a stream which my StreamTask managed to receive. I will open a JIRA for the error logging.

Thanks a lot for the ongoing support,
Jonathan

On 3 Feb 2014, at 9:26, Chris Riccomini <[email protected]> wrote:

> Hey Jonathan,
>
> Also, I noticed you have "serder" instead of "serde" for your msg serde config (https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties). In both cases, this should be:
>
> systems.kafka.samza.key.serde=string
>
> systems.kafka.samza.msg.serde=json
>
> No 'r'. Didn't notice that before.
>
> Cheers,
> Chris
>
> On 2/3/14 9:21 AM, "Chris Riccomini" <[email protected]> wrote:
>
>> Hey Jonathan,
>>
>> Aha. This error is caused because you're sending a non-byte-array key in your StreamTask, and you don't have a key serializer defined. Samza, by default, allows you to send messages to a system without a serializer defined. If you do this, then it assumes you're giving it bytes.
>>
>> Here are the configs you need:
>>
>> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>
>> systems.kafka.samza.key.serder=string
>>
>> This is causing no messages to be sent, which explains why you're unable to see any output from your job.
>>
>> We could definitely improve the error messaging here. Could you open a JIRA for this?
>>
>> Cheers,
>> Chris
>>
>> On 1/31/14 4:24 PM, "Jonathan Poltak Samosir" <[email protected]> wrote:
>>
>>> Hi Chris,
>>>
>>> Oh dear, that's rather embarrassing, although it makes perfect sense now.
>>>
>>> Here are the first ~900 lines of the second container log (that warning continues to repeat):
>>> https://gist.github.com/poltak/8745919/raw/7990b0199fafde9bf6da00aa4b87b2e14fb0e717/gistfile1.txt
>>>
>>> There definitely seems to be an issue there. Unfortunately, I cannot work out where exactly the problem lies since the String cast error message is truncated. Do you know of any easy way to get the rest of this message?
>>>
>>> Thank you for your patience as well. It is very much appreciated!
>>>
>>> Jonathan
>>>
>>>
>>> On 31 Jan 2014, at 11:20, Chris Riccomini <[email protected]> wrote:
>>>
>>>> Hey Jonathan,
>>>>
>>>> Could you send the logs for the actual container? The AM isn't actually running your code--it just manages and tells YARN to start a SECOND container to run your code. :)
>>>>
>>>> For example, in the AM logs, you'll see:
>>>>
>>>> 2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 for container container_1391135766129_0001_01_000002 on node s1 (http://s1:8042/node/containerlogs/container_1391135766129_0001_01_000002).
>>>>
>>>> This is the log path to the container that's running your code.
>>>>
>>>> Alternatively, you can find the second container by clicking on the ApplicationMaster link in the YARN RM web UI, and then clicking on the link in the "running containers" section that ends with _000002.
>>>>
>>>> If you could grab that log and send it here, I can take a look and help you figure out what's going on.
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 1/30/14 7:23 PM, "Jonathan Poltak Samosir" <[email protected]> wrote:
>>>>
>>>>> Hi Chris,
>>>>>
>>>>> Thanks for your thoughts.
>>>>>
>>>>> 1. I will definitely put file reading on another thread; I just wanted to get it working for now with a very simple test file before I move on and re-implement my code to be more robust. Thanks for the pointers on checking for space before calling BlockingEnvelopeMap.put(), also. I will definitely come back to that afterwards.
>>>>>
>>>>> 2. Again, will definitely revisit this point later on, as well as the file offset tip.
>>>>>
>>>>> 3. Here are the logs:
>>>>> application-master.log:
>>>>> https://gist.github.com/poltak/8726030
>>>>>
>>>>> gc.log (probably not needed, but it's there):
>>>>> https://gist.github.com/poltak/8726004
>>>>>
>>>>> stdout:
>>>>> https://gist.github.com/poltak/8726036
>>>>>
>>>>> stderr was clean.
>>>>>
>>>>> Not sure if you will find anything useful in those, but I'm sure your understanding of them will greatly outweigh mine.
>>>>>
>>>>> Thanks,
>>>>> Jonathan
>>>>>
>>>>>
>>>>> On 30 Jan 2014, at 15:47, Chris Riccomini <[email protected]> wrote:
>>>>>
>>>>>> Hey Jonathan,
>>>>>>
>>>>>> Another follow-on thought. You're currently using null for the offset, but you could very easily use filepos, and then fseek to the filepos when a SystemStreamPartition is instantiated. This would give you the ability to "pick up where you left off" rather than re-reading the whole file every time your SamzaContainer starts.
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On 1/30/14 3:43 PM, "Chris Riccomini" <[email protected]> wrote:
>>>>>>
>>>>>>> Hey Jonathan,
>>>>>>>
>>>>>>> I took a look at your code. Nothing looks horribly wrong at first glance. A couple of thoughts:
>>>>>>>
>>>>>>> 1. You're reading the full file and loading it into the BlockingEnvelopeMap in your start() method. This is OK if the file is small, and will ALWAYS be small. If the file is NOT small, you need to move the file reading to another thread, and only BlockingEnvelopeMap.put() when there is space. The start/stop methods would then start and stop your reader thread. The "when there is space" behavior can be implemented in one of two ways: 1) check BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and block if it's above some threshold, or 2) override BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will automatically force your reader thread to block if the queue is full.
>>>>>>> 2. An alternative input path style would be to just define the input path as the stream name. For example, task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the MedicalDataConsumer.register method call systemStreamPartition.getStream to get the file path, and instantiate the file reader there. The advantage of this approach is that you only need one MedicalDataConsumer to read from N files, rather than having one MedicalDataConsumer (and system) per file.
>>>>>>> 3. Can you send the output log when running your job?
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir" <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks for the help, Chris!
>>>>>>>>
>>>>>>>> The Javadocs you updated certainly did help my understanding of the SystemConsumer interface, although I still have not been able to get my System to work.
>>>>>>>>
>>>>>>>> I have now resorted to trying to extend the BlockingEnvelopeMap implementation of SystemConsumer, as it seemed a lot simpler for what I want to do, plus the hello-samza example uses it so I can use it as a reference example.
>>>>>>>>
>>>>>>>> Even simply putting a call to 'put()' at the beginning of the 'start()' method with a debug message, nothing is received by my simple StreamTask (which simply forwards what is received from my System's stream onto a Kafka stream for now). As this is the case, I think there is a flaw in my understanding of something much more fundamental here relating to Samza System... I am sure there is something very simple that I am missing with my implementation, as I have based it directly off the WikipediaSystem, which I feel I now have a pretty thorough understanding of.
>>>>>>>>
>>>>>>>> If anyone has time to look at my implementations for SystemConsumer, SystemFactory and the previously mentioned StreamTask's config file (they're all very simple and "to-the-point", since I'm just trying to get things working), it would be very much appreciated.
>>>>>>>>
>>>>>>>> SystemFactory implementation:
>>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>>>>>>>
>>>>>>>> SystemConsumer implementation:
>>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>>>>>>>
>>>>>>>> StreamTask config (which specifies input from my System):
>>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>>>>>>>
>>>>>>>> And yes, once I get it working and reading a file into a Samza Stream, I will be happy to submit a patch.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jonathan
>>>>>>>>
>>>>>>>>
>>>>>>>> On 30 Jan 2014, at 10:26, Chris Riccomini <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hey Jonathan,
>>>>>>>>>
>>>>>>>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>>>>>>>
>>>>>>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>>>>>>>
>>>>>>>>> Let me know if anything doesn't make sense.
>>>>>>>>>
>>>>>>>>> Also, it'd be awesome if you could contribute your file reader SystemConsumer. This seems like it'd be really useful for a lot of things. Could you open a JIRA and submit a patch when you're ready?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir" <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> Currently trying to write a simple Samza System that reads from a file and puts the contents onto a Samza-compatible stream. Been basing it off the hello-samza WikipediaSystem example so far. The SystemFactory implementation seemed to be pretty straightforward (get path to file from config and return a SystemConsumer that reads the file at that path), although I am not 100% sure what the purpose of each of the SystemConsumer interface methods is.
>>>>>>>>>>
>>>>>>>>>> start() and stop() seem fairly self-explanatory, getting called when the System is started and stopped, respectively (please let me know if I am wrong about any of my understandings). register() seems to be similar to start() in that it will be called near the beginning of the System, although giving access to the SystemStreamPartition and a given partition offset, correct? The poll() method seems to be where most of the action takes place, and going by its name, it is called often or at specified intervals? If so, how does this polling work and how is the interval specified?
>>>>>>>>>> Also, the List of IncomingMessageEnvelopes that get returned from poll(): these are then forwarded on to their specified SystemStreamPartitions (first arg in the IncomingMessageEnvelope constructor)?
>>>>>>>>>>
>>>>>>>>>> Anyway, thanks for your time and let me know how far off I am with my understanding of this (as I can't get anything working with my current system implementation). Will be happy to contribute back a Javadoc patch reflecting my amended understanding of this interface afterwards.
>>>>>>>>>>
>>>>>>>>>> Jonathan
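The sketch below shows why a missing key serializer produced a String cast error. It assumes only what Chris describes in his 9:21 message: when no serde is configured, the key is treated as if it were already a byte array. `KeySerialization`, `SimpleSerde` and `Utf8StringSerde` are hypothetical names for illustration, not Samza classes. In the real job, the fix is the `serializers.registry.string.class` and `systems.kafka.samza.key.serde=string` configuration discussed in the thread.

```java
import java.nio.charset.StandardCharsets;

// Demo entry point (hypothetical; not Samza code).
class KeySerialization {
    // Mimics the default Chris describes: with no key serde configured,
    // the key is assumed to already be a byte[] and is simply cast.
    @SuppressWarnings("unchecked")
    static byte[] serializeKey(Object key, SimpleSerde<?> keySerde) {
        if (keySerde == null) {
            return (byte[]) key; // a String key fails here with ClassCastException
        }
        return ((SimpleSerde<Object>) keySerde).toBytes(key);
    }

    public static void main(String[] args) {
        // With a string serde configured, the String key is encoded as UTF-8.
        System.out.println(serializeKey("patient-42", new Utf8StringSerde()).length);

        // Without one, the same key cannot be treated as raw bytes.
        try {
            serializeKey("patient-42", null);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}

// Hypothetical stand-in for a pluggable serializer (NOT Samza's Serde interface).
interface SimpleSerde<T> {
    byte[] toBytes(T obj);
}

// Plays the role the "string" serde (StringSerdeFactory) fills in the real config.
class Utf8StringSerde implements SimpleSerde<String> {
    @Override
    public byte[] toBytes(String obj) {
        return obj.getBytes(StandardCharsets.UTF_8);
    }
}
```

A `byte[]` key with no serde passes through untouched, which matches the "it assumes you're giving it bytes" default.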
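Chris's first suggestion in his 3:43 PM message is to read the file on a separate thread that `start()`/`stop()` control, and to let a bounded queue block the reader when it is full. It can be sketched with only `java.util.concurrent`. This is a stand-in under stated assumptions:

- An `ArrayBlockingQueue` plays the part of the bounded queue that an overridden `BlockingEnvelopeMap.newBlockingQueue` would return.
- Plain `String` lines stand in for `IncomingMessageEnvelope`s.
- `BoundedFileReader`, `next()` and `queued()` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical file reader that feeds a bounded queue from a background thread.
class BoundedFileReader {
    private final BufferedReader reader;
    private final BlockingQueue<String> queue; // bounded: put() blocks when full
    private Thread readerThread;

    BoundedFileReader(BufferedReader reader, int capacity) {
        this.reader = reader;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Analogue of start(): launch the reader thread instead of reading the whole file inline.
    void start() {
        readerThread = new Thread(() -> {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    queue.put(line); // waits here while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, "file-reader");
        readerThread.setDaemon(true);
        readerThread.start();
    }

    // Analogue of stop(): interrupt the reader (unblocking put()) and wait for it to exit.
    void stop() throws InterruptedException {
        readerThread.interrupt();
        readerThread.join();
    }

    // Consumer side: wait up to timeoutMs for the next line; null on timeout.
    String next(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    int queued() {
        return queue.size();
    }

    public static void main(String[] args) throws Exception {
        BoundedFileReader r = new BoundedFileReader(
                new BufferedReader(new StringReader("a\nb\nc\nd\ne\n")), 2);
        r.start();
        String line;
        while ((line = r.next(500)) != null) {
            System.out.println(line);
        }
        r.stop();
    }
}
```

Because `put()` blocks on a full queue, at most `capacity` lines are buffered regardless of file size. The trade-off is that the reader thread sits idle whenever the consumer falls behind.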
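The other option Chris lists is checking the backlog (`BlockingEnvelopeMap.getNumMessagesInQueue` in the thread) before reading more lines. It can be sketched the same way, with these assumptions:

- An unbounded `LinkedBlockingQueue` and its `size()` stand in for that message count.
- The reader backs off with a short sleep rather than a smarter wait.
- `ThresholdReader`, `readAll()` and `maxBacklog()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical reader that uses a size check (not a bounded queue) for back-pressure.
class ThresholdReader {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // unbounded
    private final int threshold;
    private volatile int maxBacklog = 0; // largest queue size observed after an add

    ThresholdReader(int threshold) {
        this.threshold = threshold;
    }

    // Reader loop: before enqueuing another line, wait while the backlog
    // is at or above the threshold.
    void readAll(List<String> lines) throws InterruptedException {
        for (String line : lines) {
            while (queue.size() >= threshold) {
                Thread.sleep(5); // back off until the consumer drains some messages
            }
            queue.add(line);
            maxBacklog = Math.max(maxBacklog, queue.size());
        }
    }

    String poll() {
        return queue.poll(); // non-blocking; null when empty
    }

    int maxBacklog() {
        return maxBacklog;
    }

    public static void main(String[] args) throws Exception {
        ThresholdReader r = new ThresholdReader(3);
        List<String> lines = Arrays.asList("l1", "l2", "l3", "l4", "l5", "l6", "l7", "l8");
        Thread producer = new Thread(() -> {
            try {
                r.readAll(lines);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> received = new ArrayList<>();
        while (received.size() < lines.size()) {
            String s = r.poll();
            if (s != null) {
                received.add(s);
            } else {
                Thread.sleep(1); // nothing queued yet
            }
        }
        producer.join();
        System.out.println(received + ", max backlog " + r.maxBacklog());
    }
}
```

With a single reader thread, the backlog never exceeds the threshold. The bounded-queue option gets the same effect without the sleep loop.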
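Chris's follow-on about offsets is to use the file position instead of `null`, and to seek to it when the partition is registered. The sketch below shows one way to do that. `RandomAccessFile.seek` plays the role of `fseek`. It assumes the offset handed back on restart is the one attached to the last processed line, so each line carries the byte position just past it. `OffsetFileReader`, `Record` and `readFrom` are hypothetical names, and the temporary CSV is made-up sample data.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader that uses byte positions in the file as message offsets.
class OffsetFileReader {
    // One line plus the byte position just past it, i.e. where reading
    // should resume once this line has been processed.
    static final class Record {
        final String line;
        final long nextOffset;

        Record(String line, long nextOffset) {
            this.line = line;
            this.nextOffset = nextOffset;
        }
    }

    // Reads all lines from 'offset' onwards; a null offset means "start of file".
    static List<Record> readFrom(Path file, String offset) throws IOException {
        List<Record> records = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(offset == null ? 0L : Long.parseLong(offset)); // the "fseek"
            String line;
            // RandomAccessFile.readLine maps each byte to one char (Latin-1 style):
            // fine for ASCII CSV input, not for arbitrary UTF-8.
            while ((line = raf.readLine()) != null) {
                records.add(new Record(line, raf.getFilePointer()));
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("medical", ".csv");
        Files.write(file, "a,1\nb,2\nc,3\n".getBytes(StandardCharsets.US_ASCII));

        // First run: read from the beginning and remember the offset after line 1.
        List<Record> firstRun = readFrom(file, null);
        String checkpoint = Long.toString(firstRun.get(0).nextOffset);

        // "Restart": resume from the checkpoint instead of re-reading the whole file.
        for (Record r : readFrom(file, checkpoint)) {
            System.out.println(r.line);
        }
        Files.delete(file);
    }
}
```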
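Chris's second point is to use the file path itself as the stream name (`task.inputs=medicaldata./N/u/poltak/test.csv`). That example only works if the system name ends at the first `.`, since the path contains dots of its own. The sketch assumes exactly that split, then keeps one file per registered stream in a single consumer. `FileStreamInputs`, `splitSystemStream` and `register` are hypothetical names. They are not Samza's config parsing or its `SystemConsumer.register` signature, and the second path in the demo is made up.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helpers for the "stream name = file path" input style.
class FileStreamInputs {
    // Splits a task.inputs entry "<system>.<stream>" at the FIRST dot, so a
    // stream name that itself contains dots (e.g. "/N/u/poltak/test.csv") survives.
    static String[] splitSystemStream(String input) {
        int dot = input.indexOf('.');
        if (dot <= 0 || dot == input.length() - 1) {
            throw new IllegalArgumentException("expected <system>.<stream>: " + input);
        }
        return new String[] {input.substring(0, dot), input.substring(dot + 1)};
    }

    // One consumer, many files: each registered stream name maps to its file path.
    private final Map<String, Path> filesByStream = new LinkedHashMap<>();

    void register(String streamName) {
        filesByStream.put(streamName, Paths.get(streamName));
    }

    Map<String, Path> registered() {
        return filesByStream;
    }

    public static void main(String[] args) {
        FileStreamInputs consumer = new FileStreamInputs();
        // The second input below is a hypothetical extra file.
        for (String input : new String[] {
                "medicaldata./N/u/poltak/test.csv",
                "medicaldata./tmp/second.csv"}) {
            String[] parts = splitSystemStream(input);
            System.out.println("system=" + parts[0] + " stream=" + parts[1]);
            consumer.register(parts[1]);
        }
        System.out.println(consumer.registered().size() + " files, one consumer");
    }
}
```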
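Jonathan's original question was how `poll()` is driven and how its interval is set. It was answered through the updated Javadocs rather than in the thread itself. The sketch below is only a generic, hedged illustration, not Samza's implementation or its `poll` signature. It shows how a consumer backed by a blocking queue can bound each call with a timeout: wait for the first message, then drain whatever else is already queued. `PollSketch` and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a poll(timeout) over a blocking queue.
class PollSketch {
    // Waits up to timeoutMs for the first message, then drains whatever else is
    // already queued, up to maxMessages in total. Returns an empty list on timeout.
    static List<String> poll(BlockingQueue<String> queue, int maxMessages, long timeoutMs)
            throws InterruptedException {
        if (maxMessages < 1) {
            throw new IllegalArgumentException("maxMessages must be >= 1");
        }
        List<String> batch = new ArrayList<>();
        String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first == null) {
            return batch; // timed out: the caller simply polls again
        }
        batch.add(first);
        queue.drainTo(batch, maxMessages - 1); // non-blocking
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.add("line " + i);
        }
        // The caller loops on poll(); the timeout bounds how long each call can block.
        List<String> batch;
        while (!(batch = poll(queue, 2, 100)).isEmpty()) {
            System.out.println(batch);
        }
    }
}
```

In this pattern the caller loops on `poll()`. The timeout caps how long each call can block, so no separate polling interval is needed.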
