Hi Chris,

Thanks for your thoughts. 

1. I will definitely move file reading onto another thread. I just wanted to
get it working with a very simple test file first, before I move on and
re-implement my code to be more robust. Thanks also for the pointers on
checking for space before calling BlockingEnvelopeMap.put(); I will come back
to that afterwards.

2. Again, I will definitely revisit this point later on, as well as the file
offset tip.

3. Here are the logs:
application-master.log:
https://gist.github.com/poltak/8726030

gc.log (probably not needed, but it's there):
https://gist.github.com/poltak/8726004

stdout:
https://gist.github.com/poltak/8726036

stderr was clean.

Not sure if you will find anything useful in those, but I'm sure your
understanding of them will greatly outweigh mine.

Thanks,
Jonathan


On 30 Jan 2014, at 15:47, Chris Riccomini <[email protected]> wrote:

> Hey Jonathan,
> 
> Another follow-on thought. You're currently using null for the offset, but
> you could very easily use filepos, and then fseek to the filepos when a
> SystemStreamPartition is instantiated. This would give you the ability to
> "pick up where you left off" rather than re-reading the whole file every
> time your SamzaContainer starts.
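A minimal sketch of the "pick up where you left off" idea, using only the JDK. It assumes the offset is the byte position just after the last line handed downstream, stored as a String (on the assumption that the offset passed to register() and carried by IncomingMessageEnvelope is a String). OffsetFileReader, Line and readFrom are hypothetical names; java.io.RandomAccessFile.seek() plays the role of fseek().

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: resume reading a file from a saved byte offset
// instead of re-reading it from the start every time the container restarts.
class OffsetFileReader {

    /** One line of text plus the byte offset where the following line starts. */
    static final class Line {
        final String text;
        final long nextOffset;

        Line(String text, long nextOffset) {
            this.text = text;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Reads all remaining lines, starting at startOffset.
     * A null offset (nothing checkpointed yet) means "start at byte 0".
     */
    static List<Line> readFrom(Path file, String startOffset) throws IOException {
        long pos = (startOffset == null) ? 0L : Long.parseLong(startOffset);
        List<Line> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(pos); // the Java counterpart of fseek()
            String raw;
            while ((raw = raf.readLine()) != null) {
                // readLine() maps each byte to one char (Latin-1); re-decode as UTF-8.
                String text = new String(raw.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
                lines.add(new Line(text, raf.getFilePointer()));
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("medical", ".csv");
        Files.write(tmp, "a,1\nb,2\nc,3\n".getBytes(StandardCharsets.UTF_8));

        // First run: no offset yet, so everything is read.
        List<Line> firstRun = readFrom(tmp, null);
        // Pretend we checkpointed after the first line, then the container restarted.
        String checkpoint = Long.toString(firstRun.get(0).nextOffset);
        for (Line l : readFrom(tmp, checkpoint)) {
            System.out.println(l.text + " -> next offset " + l.nextOffset);
        }
        // prints "b,2 -> next offset 8" then "c,3 -> next offset 12"
        Files.delete(tmp);
    }
}
```

Storing the position *after* each line means a restart never re-emits a line that was already handed on; the trade-off is that a line read but not yet processed before a crash depends on when the checkpoint was written.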
> 
> Cheers,
> Chris
> 
> On 1/30/14 3:43 PM, "Chris Riccomini" <[email protected]> wrote:
> 
>> Hey Jonathan,
>> 
>> I took a look at your code. Nothing looks horribly wrong at first glance.
>> A couple of thoughts:
>> 
>> 1. You're reading the full file and loading it into the
>> BlockingEnvelopeMap in your start() method. This is OK if the file is
>> small, and will ALWAYS be small. If the file is NOT small, you need to
>> move the file reading to another thread, and only call
>> BlockingEnvelopeMap.put() when there is space. The start/stop methods
>> would then start and stop your reader thread. The "when there is space"
>> behavior can be implemented in one of two ways: 1) check
>> BlockingEnvelopeMap.getNumMessagesInQueue before reading more lines, and
>> block if it's above some threshold, or 2) override
>> BlockingEnvelopeMap.newBlockingQueue to use a bounded queue, which will
>> automatically force your reader thread to block if the queue is full.
>> 2. An alternative input path style would be to just define the input path
>> as the stream name. For example,
>> task.inputs=medicaldata./N/u/poltak/test.csv. You can then have the
>> MedicalDataConsumer.register method call systemStreamPartition.getStream
>> to get the file path, and instantiate the file reader there. The advantage
>> of this approach is that you only need one MedicalDataConsumer to read
>> from N files, rather than having one MedicalDataConsumer (and one
>> system) per file.
>> 3. Can you send the output log when running your job?
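Option 2 from point 1 can be sketched with plain JDK classes. A background thread owns the file, start() and stop() launch and interrupt it (mirroring how the consumer's start/stop would behave), and ArrayBlockingQueue.put() blocks whenever the queue is full, so the file is never read faster than it is drained. BoundedLineReader and its methods are hypothetical; in a real consumer the queue would come from an override of BlockingEnvelopeMap.newBlockingQueue (check the Samza Javadocs for its exact signature), and the elements would be IncomingMessageEnvelopes rather than Strings.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a reader thread feeding a bounded queue.
// put() blocks while the queue is at capacity, which throttles the reader.
class BoundedLineReader {
    private final BlockingQueue<String> queue;
    private final BufferedReader source;
    private Thread readerThread;

    BoundedLineReader(BufferedReader source, int capacity) {
        this.source = source;
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Mirrors SystemConsumer.start(): launch the reader instead of reading inline. */
    void start() {
        readerThread = new Thread(() -> {
            try {
                String line;
                while (!Thread.currentThread().isInterrupted() && (line = source.readLine()) != null) {
                    queue.put(line); // blocks when the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called while blocked
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, "file-reader");
        readerThread.setDaemon(true);
        readerThread.start();
    }

    /** Mirrors SystemConsumer.stop(): interrupt the reader and wait for it to exit. */
    void stop() throws InterruptedException {
        readerThread.interrupt();
        readerThread.join();
    }

    /** The draining side: take one line, waiting up to timeoutMs; null on timeout. */
    String take(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    int queued() {
        return queue.size();
    }

    public static void main(String[] args) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) sb.append("row-").append(i).append('\n');
        BoundedLineReader r = new BoundedLineReader(new BufferedReader(new StringReader(sb.toString())), 10);
        r.start();
        Thread.sleep(200);
        System.out.println("queued before draining: " + r.queued()); // never more than 10
        List<String> got = new ArrayList<>();
        String s;
        while ((s = r.take(500)) != null) got.add(s);
        System.out.println("drained " + got.size() + " lines");
        r.stop();
    }
}
```

Compared with option 1 (polling getNumMessagesInQueue against a threshold), the bounded queue needs no tuning loop: backpressure falls out of put() blocking.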
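For point 2, the key detail is that a task.inputs entry has the form system.stream, and as far as I know Samza splits it at the *first* '.', so a path like /N/u/poltak/test.csv (which contains its own dot) survives intact as the stream name. The sketch below mimics that split and keeps one entry per registered stream. MultiFileRegistry, splitSystemStream and the second file path are hypothetical; a plain String stands in for SystemStreamPartition.getStream().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of "one consumer, N files": the stream name is the file path.
class MultiFileRegistry {
    // file path -> starting byte offset; a real consumer would open a reader here
    private final Map<String, Long> readers = new LinkedHashMap<>();

    /** Split "system.stream" at the FIRST '.', so dots inside the path are kept. */
    static String[] splitSystemStream(String input) {
        int idx = input.indexOf('.');
        if (idx < 0) throw new IllegalArgumentException("expected system.stream, got " + input);
        return new String[] { input.substring(0, idx), input.substring(idx + 1) };
    }

    /** Stand-in for register(ssp, offset): one reader per registered stream (file). */
    void register(String streamName, String offset) {
        readers.put(streamName, offset == null ? 0L : Long.parseLong(offset));
    }

    Map<String, Long> registered() {
        return readers;
    }

    public static void main(String[] args) {
        String taskInputs = "medicaldata./N/u/poltak/test.csv,medicaldata./N/u/poltak/other.csv";
        MultiFileRegistry consumer = new MultiFileRegistry();
        for (String input : taskInputs.split(",")) {
            String[] parts = splitSystemStream(input);
            consumer.register(parts[1], null); // parts[0] is the system name, "medicaldata"
        }
        System.out.println(consumer.registered().keySet());
        // prints [/N/u/poltak/test.csv, /N/u/poltak/other.csv]
    }
}
```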
>> 
>> 
>> Cheers,
>> Chris
>> 
>> On 1/30/14 2:56 PM, "Jonathan Poltak Samosir" <[email protected]>
>> wrote:
>> 
>>> Thanks for the help, Chris!
>>> 
>>> The Javadocs you updated certainly did help my understanding of the
>>> SystemConsumer interface, although I still have not been able to get my
>>> System to work.
>>> 
>>> I have now resorted to trying to extend the BlockingEnvelopeMap
>>> implementation of SystemConsumer, as it seemed a lot simpler for what I
>>> want to do plus the hello-samza example uses it so I can use it as a
>>> reference example.
>>> 
>>> Even simply putting a call to 'put()' at the beginning of the 'start()'
>>> method with a debug message, nothing is received by my simple StreamTask
>>> (which simply forwards what is received from my System's stream onto a
>>> Kafka stream for now). As this is the case, I think there is a flaw in my
>>> understanding of something much more fundamental here relating to Samza
>>> System... I am sure there is something very simple that I am missing with
>>> my implementation, as I have based it directly on the WikipediaSystem,
>>> which I feel I now understand pretty thoroughly.
>>> 
>>> If anyone has time to look at my implementations for SystemConsumer,
>>> SystemFactory and the previously mentioned StreamTask's config file
>>> (they're all very simple and "to-the-point", since I'm just trying to
>>> get things working), it would be very much appreciated.
>>> 
>>> SystemFactory implementation:
>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataSystemFactory.java
>>> 
>>> SystemConsumer implementation:
>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-wikipedia/src/main/java/samza/examples/wikipedia/system/MedicalDataConsumer.java
>>> 
>>> StreamTask config (which specifies input from my System):
>>> https://github.com/poltak/hello-samza/blob/no-freenode/samza-job-package/src/main/config/medical-data-feed.properties
>>> 
>>> And yes, once I get it working and reading a file into a Samza Stream, I
>>> will be happy to submit a patch.
>>> 
>>> Thanks,
>>> Jonathan
>>> 
>>> 
>>> On 30 Jan 2014, at 10:26, Chris Riccomini <[email protected]>
>>> wrote:
>>> 
>>>> Hey Jonathan,
>>>> 
>>>> I've attempted to answer your questions by updating the Javadocs. :)
>>>> 
>>>> 
>>>> 
>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
>>>> 
>>>> Let me know if anything doesn't make sense.
>>>> 
>>>> Also, it'd be awesome if you could contribute your file reader
>>>> SystemConsumer. This seems like it'd be really useful for a lot of
>>>> things.
>>>> Could you open a JIRA and submit a patch when you're ready?
>>>> 
>>>> Cheers,
>>>> Chris
>>>> 
>>>> On 1/29/14 11:14 PM, "Jonathan Poltak Samosir"
>>>> <[email protected]> wrote:
>>>> 
>>>>> Hello,
>>>>> 
>>>>> I'm currently trying to write a simple Samza System that reads from a
>>>>> file and puts the contents onto a Samza-compatible stream. I've been
>>>>> basing it on the hello-samza WikipediaSystem example so far. The
>>>>> SystemFactory implementation seemed to be pretty straightforward (get
>>>>> the path to the file from config and return a SystemConsumer that reads
>>>>> the file at that path), although I am not 100% sure what the purpose of
>>>>> each SystemConsumer interface method is.
>>>>> 
>>>>> start() and stop() seem fairly self-explanatory, being called when the
>>>>> System is started and stopped, respectively (please let me know if I
>>>>> have misunderstood any of this). register() seems to be similar to
>>>>> start() in that it will be called near the beginning of the System,
>>>>> although it also gives access to the SystemStreamPartition and a given
>>>>> partition offset, correct? The poll() method seems to be where most of
>>>>> the action takes place. Going by its name, is it called continuously or
>>>>> at specified intervals? If so, how does this polling work and how is the
>>>>> interval specified?
>>>>> Also, the List of IncomingMessageEnvelopes that gets returned from
>>>>> poll(): are these then forwarded on to their specified
>>>>> SystemStreamPartitions (the first arg in the IncomingMessageEnvelope
>>>>> constructor)?
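The updated SystemConsumer Javadocs linked earlier in the thread are the authority on these semantics. As a rough mental model only, here is the put()/poll() relationship that BlockingEnvelopeMap is built around: a producer enqueues messages per partition, and each poll() call returns whatever is ready (up to a per-partition limit), waiting at most a timeout when nothing is ready. MiniEnvelopeMap, its method signatures, and the use of String partitions and messages are hypothetical simplifications, not Samza's API, and the sketch does not model how often the container itself calls poll().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the put()/poll() contract of an envelope map.
class MiniEnvelopeMap {
    private final Map<String, LinkedBlockingQueue<String>> queues = new ConcurrentHashMap<>();

    /** Producer side: what a reader would call for each message it reads. */
    void put(String partition, String message) throws InterruptedException {
        queues.computeIfAbsent(partition, p -> new LinkedBlockingQueue<>()).put(message);
    }

    /**
     * Consumer side: return up to maxPerPartition messages from each requested
     * partition. If none are ready, keep checking until timeoutMs has elapsed.
     */
    List<String> poll(List<String> partitions, int maxPerPartition, long timeoutMs) throws InterruptedException {
        List<String> out = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            for (String p : partitions) {
                LinkedBlockingQueue<String> q = queues.get(p);
                if (q != null) q.drainTo(out, maxPerPartition); // FIFO order per partition
            }
            if (!out.isEmpty() || System.nanoTime() >= deadline) return out;
            Thread.sleep(1); // crude wait; a real implementation would block on the queue
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEnvelopeMap map = new MiniEnvelopeMap();
        for (int i = 0; i < 5; i++) map.put("partition-0", "line-" + i);
        List<String> ps = Collections.singletonList("partition-0");
        System.out.println(map.poll(ps, 3, 100)); // [line-0, line-1, line-2]
        System.out.println(map.poll(ps, 3, 100)); // [line-3, line-4]
        System.out.println(map.poll(ps, 3, 100)); // [] after roughly 100 ms
    }
}
```

In this model each poll() call is a "give me what you have, waiting briefly if nothing is ready" request rather than a scheduled tick, which is also why messages put() in start() should show up on a later poll().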
>>>>> 
>>>>> Anyway, thanks for your time and let me know how far off I am with my
>>>>> understanding of this (as I can't get anything working with my current
>>>>> system implementation). I will be happy to contribute back a Javadoc
>>>>> patch reflecting my amended understanding of this interface afterwards.
>>>>> 
>>>>> Jonathan
>>>> 
>>> 
>> 
> 
