Hey Jonathan,

Also, I noticed you have "serder" instead of "serde" for your msg serde config (https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties). In both cases, this should be:
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=json

No 'r'. Didn't notice that before.

Cheers,
Chris

On 2/3/14 9:21 AM, "Chris Riccomini" <[email protected]> wrote:

>Hey Jonathan,
>
>Ah ah. This error is caused because you're sending a non-byte-array key in
>your StreamTask, and you don't have a key serializer defined. Samza, by
>default, allows you to send messages to a system without a serializer
>defined. If you do this, then it assumes you're giving it bytes.
>
>Here are the configs you need:
>
>serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>systems.kafka.samza.key.serder=string
>
>This is causing no messages to be sent, which explains why you're unable
>to see any output from your job.
>
>We could definitely improve the error messaging here. Could you open a
>JIRA for this?
>
>Cheers,
>Chris
>
>On 1/31/14 4:24 PM, "Jonathan Poltak Samosir" <[email protected]>
>wrote:
>
>>Hi Chris,
>>
>>Oh dear, that's rather embarrassing, although it makes perfect sense now.
>>
>>Here is the first ~900 lines of the second container log (that warning
>>continues to repeat):
>>https://gist.github.com/poltak/8745919/raw/7990b0199fafde9bf6da00aa4b87b2e14fb0e717/gistfile1.txt
>>
>>There definitely seems to be an issue there. Unfortunately, I cannot work
>>out where exactly the problem lies since the String cast error message is
>>truncated. Do you know of any easy way to get the rest of this message?
>>
>>Thank you for your patience as well. It is very much appreciated!
>>
>>Jonathan
>>
>>
>>On 31 Jan 2014, at 11:20, Chris Riccomini <[email protected]>
>>wrote:
>>
>>> Hey Jonathan,
>>>
>>> Could you send the logs for the actual container? The AM isn't actually
>>> running your code--it just manages and tells YARN to start a SECOND
>>> container to run your code.
>>> :)
>>>
>>> For example, in the AM logs, you'll see:
>>>
>>> 2014-01-30 18:36:46 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 for
>>> container container_1391135766129_0001_01_000002 on node s1
>>> (http://s1:8042/node/containerlogs/container_1391135766129_0001_01_000002).
>>>
>>> This is the log path to the container that's running your code.
>>>
>>> Alternatively, you can find the second container by clicking on the
>>> ApplicationMaster link in the YARN RM web UI, and then clicking on the
>>> link in the "running containers" section that ends with _000002.
>>>
>>> If you could grab that log and send it here, I can take a look and help
>>> you figure out what's going on.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 1/30/14 7:23 PM, "Jonathan Poltak Samosir" <[email protected]>
>>> wrote:
>>>
>>>> Hi Chris,
>>>>
>>>> Thanks for your thoughts.
>>>>
>>>> 1. I will definitely put file reading on another thread, I just wanted to
>>>> get it working for now with a very simple test file before I move on and
>>>> re-implement my code to be more robust. Thanks for the pointers on
>>>> checking for space before calling BlockingEnvelopeMap.put(), also. I will
>>>> definitely come back to that afterwards.
>>>>
>>>> 2. Again, will definitely revisit this point later on as well as the file
>>>> offset tip.
>>>>
>>>> 3. Here are the logs:
>>>> application-master.log:
>>>> https://gist.github.com/poltak/8726030
>>>>
>>>> gc.log (probably not needed, but it's there):
>>>> https://gist.github.com/poltak/8726004
>>>>
>>>> stdout:
>>>> https://gist.github.com/poltak/8726036
>>>>
>>>> stderr was clean.
>>>>
>>>> Not sure if you will find anything useful out of those, but I'm sure your
>>>> understanding of it will greatly outweigh mine.
>>>>
>>>> Thanks,
>>>> Jonathan
>>>>
>>>>
>>>> On 30 Jan 2014, at 15:47, Chris Riccomini <[email protected]>
>>>> wrote:
>>>>
>>>>> Hey Jonathan,
>>>>>
>>>>> Another follow-on thought.
>>>>> You're currently using null for the offset, but
>>>>> you could very easily use filepos, and then fseek to the filepos when a
>>>>> SystemStreamPartition is instantiated. This would give you the ability to
>>>>> "pick up where you left off" rather than re-reading the whole file every
>>>>> time your SamzaContainer starts.
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 1/30/14 3:43 PM, "Chris Riccomini" <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hey Jonathan,
>>>>>>
>>>>>> I took a look at your code. Nothing looks horribly wrong at first glance.
>>>>>> A couple of thoughts:
>>>>>>
>>>>>> 1. You're reading the full file and loading it into the
>>>>>> BlockingEnvelopeMap in your start() method. This is OK if the file is
>>>>>> small, and will ALWAYS be small. If the file is NOT small, you need to
>>>>>> move the file reading to another thread, and only
>>>>>> BlockingEnvelopeMap.put() when there is space. The start/stop methods
>>>>>> would then start and stop your reader thread. The "when there is space"
>>>>>> behavior can be implemented in one of two ways: 1) check
>>>>>> BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>>>>>> block if it's above some threshold or 2) override
>>>>>> BlockingEnvelopeMap.newBlockingQueue
>>>>>> to use a bounded queue, which will automatically force your reader thread
>>>>>> to block if the queue is full.
>>>>>> 2. An alternative input path style would be to just define the input path
>>>>>> as the stream name. For example,
>>>>>> task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>>>>>> MedicalDataConsumer.register method call
>>>>>> systemStreamPartition.getStream
>>>>>> to get the file path, and instantiate the file reader there.
>>>>>> The advantage
>>>>>> to this approach is it means you only need one MedicalDataConsumer to read
>>>>>> from N number of files, rather than having one MedicalDataConsumer (and
>>>>>> system) per file.
>>>>>> 3. Can you send the output log when running your job?
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Chris
>>>>>>
>>>>>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir" <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the help, Chris!
>>>>>>>
>>>>>>> The Javadocs you updated certainly did help my understanding of the
>>>>>>> SystemConsumer interface, although I still have not been able to get my
>>>>>>> System to work.
>>>>>>>
>>>>>>> I have now resorted to trying to extend the BlockingEnvelopeMap
>>>>>>> implementation of SystemConsumer, as it seemed a lot simpler for what I
>>>>>>> want to do plus the hello-samza example uses it so I can use it as a
>>>>>>> reference example.
>>>>>>>
>>>>>>> Even simply putting a call to 'put()' at the beginning of the 'start()'
>>>>>>> method with a debug message, nothing is received by my simple StreamTask
>>>>>>> (which simply forwards what is received from my System's stream onto a
>>>>>>> Kafka stream for now). As this is the case, I think there is a flaw in my
>>>>>>> understanding of something much more fundamental here relating to Samza
>>>>>>> System... I am sure there is something very simple that I am missing with
>>>>>>> my implementation, as I have based it directly off the WikipediaSystem, of
>>>>>>> which I feel I have a pretty thorough understanding now.
>>>>>>>
>>>>>>> If anyone has time to look at my implementations for SystemConsumer,
>>>>>>> SystemFactory and the previously mentioned StreamTask's config file
>>>>>>> (they're all very simple and "to-the-point", since I'm just trying to
>>>>>>> get things working), it would be very much appreciated.
>>>>>>>
>>>>>>> SystemFactory implementation:
>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>>>>>>
>>>>>>> SystemConsumer implementation:
>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>>>>>>
>>>>>>> StreamTask config (which specifies input from my System):
>>>>>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>>>>>>
>>>>>>> And yes, once I get it working and reading a file into a Samza Stream, I
>>>>>>> will be happy to submit a patch.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jonathan
>>>>>>>
>>>>>>>
>>>>>>> On 30 Jan 2014, at 10:26, Chris Riccomini <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Jonathan,
>>>>>>>>
>>>>>>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>>>>>>
>>>>>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>>>>>>
>>>>>>>> Let me know if anything doesn't make sense.
>>>>>>>>
>>>>>>>> Also, it'd be awesome if you could contribute your file reader
>>>>>>>> SystemConsumer. This seems like it'd be really useful for a lot of things.
>>>>>>>> Could you open a JIRA and submit a patch when you're ready?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>>>>>>> <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Currently trying to write a simple Samza System that reads from a file
>>>>>>>>> and puts the contents onto a Samza-compatible stream. Been basing it off
>>>>>>>>> the hello-samza WikipediaSystem example so far. The SystemFactory
>>>>>>>>> implementation seemed to be pretty straightforward (get path to file from
>>>>>>>>> config and return a SystemConsumer that reads the file at that path),
>>>>>>>>> although I am not 100% sure of what the purposes of all the SystemConsumer
>>>>>>>>> interface methods are.
>>>>>>>>>
>>>>>>>>> start() and stop() seem fairly self-explanatory, getting called when the
>>>>>>>>> System is started and stopped, respectively (please let me know
>>>>>>>>> if I am wrong about any of my understandings). register() seems to be
>>>>>>>>> similar to start() in that it will be called near the beginning of the
>>>>>>>>> System, although giving access to the SystemStreamPartition and a given
>>>>>>>>> partition offset, correct? The poll() method seems to be where most of
>>>>>>>>> the action takes place, and going by its name, it is called often or at
>>>>>>>>> specified intervals? If so, how does this polling work and how is the
>>>>>>>>> interval specified?
>>>>>>>>> Also, the List of IncomingMessageEnvelopes that get returned from poll():
>>>>>>>>> these are then forwarded on to their specified SystemStreamPartitions
>>>>>>>>> (first arg in the IncomingMessageEnvelope constructor)?
>>>>>>>>>
>>>>>>>>> Anyway, thanks for your time and let me know how far off I am with my
>>>>>>>>> understanding of this (as I can't get anything working with my current
>>>>>>>>> system implementation). Will be happy to contribute back a Javadoc patch
>>>>>>>>> reflecting my amended understanding of this interface afterwards.
>>>>>>>>>
>>>>>>>>> Jonathan
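
For reference, here is a minimal sketch of the bounded-queue idea from Chris's 1/30 reply (the second of the two "when there is space" options: a reader thread that blocks while the queue is full). It deliberately uses only java.util.concurrent rather than Samza's BlockingEnvelopeMap, so the class name BoundedReaderSketch, the drain method, the sample lines and the capacity values are all illustrative assumptions, not Samza APIs and not Jonathan's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-alone sketch; not a Samza SystemConsumer.
public class BoundedReaderSketch {

    // Pushes every line through a bounded queue from a separate reader thread
    // and returns the lines in the order the consumer side received them.
    static List<String> drain(List<String> lines, int capacity) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);

        Thread reader = new Thread(() -> {
            try {
                for (String line : lines) {
                    queue.put(line); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop reading if asked to shut down
            }
        }, "file-reader");

        reader.start(); // roughly what start() would do
        List<String> received = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            received.add(queue.take()); // stands in for the container polling
        }
        reader.join(); // roughly what stop() would wait for
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed sample data standing in for the lines of an input file.
        List<String> lines = Arrays.asList("a,1", "b,2", "c,3", "d,4", "e,5");
        for (String line : drain(lines, 2)) {
            System.out.println(line);
        }
    }
}
```

With a capacity of 2, the reader thread can never get more than two lines ahead of the consumer, which is the back-pressure behaviour described in the thread for files too large to load in start().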
