Neha,
I followed the procedure described in
https://github.com/kafka-dev/kafka/tree/kafka-v0.6/contrib/hadoop-consumer, but I
couldn't get the distributed cache working. So, for the time being, I went
ahead and copied the jars to the task nodes to get it to run.
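One way to narrow down the ClassNotFoundException is to check, from inside the task, whether the class resolves at all. The stack trace shows the lookup going through sun.misc.Launcher$AppClassLoader, which searches java.class.path, so printing that property next to the check shows what the task JVM actually sees. A minimal sketch; ClasspathCheck and isVisible are hypothetical names, not part of the contrib code:

```java
// Hypothetical diagnostic helper (not part of the Kafka contrib code).
final class ClasspathCheck {
    private ClasspathCheck() {}

    /** Returns true if className can be loaded, false on ClassNotFoundException. */
    static boolean isVisible(String className) {
        // Use the loader that loaded this helper. If the helper ships in the
        // same jar as KafkaETLContext, this is the loader that failed above.
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        try {
            // initialize = false: resolve the class without running static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "kafka.javaapi.consumer.SimpleConsumer";
        System.out.println(name + " visible: " + isVisible(name));
        // The application class loader searches the entries listed here.
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
    }
}
```

Logging the same two lines from the mapper or record reader (rather than from a client-side main) is what would show whether the jars from the DistributedCache actually reached the task's classpath.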
I'm not sure whether this part of the examples has been completely tested,
or whether I have the wrong version of the file.
For example, take getNext() in KafkaETLContext.java:
public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException
{
    if (!hasMore()) return false;
    boolean gotNext = get(key, value);
    Iterator<ByteBufferMessageSet> iter = _response.iterator();
    while (!gotNext && _response != null && iter.hasNext()) {
        ByteBufferMessageSet msgSet = iter.next();
        if (hasError(msgSet)) return false;
        _messageIt = (Iterator<Message>) msgSet.iterator();
        gotNext = get(key, value);
    }
    return gotNext;
}
The null check on _response comes after
Iterator<ByteBufferMessageSet> iter = _response.iterator(), and the first time
this function is called, _response is null, so that line throws a
NullPointerException before the check is ever reached.
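For comparison, here is a sketch of the same method with the null check moved ahead of the dereference. The Kafka types are replaced by hypothetical stand-ins (List<String> for a message set, String for a message) so that it compiles on its own, and hasMore()/hasError() are left out. With these stand-in types, creating a fresh iterator on every call would restart at the first set and return duplicates, so the sketch keeps the set iterator in a field; whether the real class needs the same change depends on what _response.iterator() returns, which this message doesn't show.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for KafkaETLContext.getNext: Kafka's
// ByteBufferMessageSet and Message are replaced by List<String> and String.
final class GuardedReader {
    private final Iterable<List<String>> _response; // may be null (nothing fetched)
    private Iterator<List<String>> _setIt;          // position among message sets
    private Iterator<String> _messageIt;            // position within the current set

    GuardedReader(Iterable<List<String>> response) {
        _response = response;
    }

    /** Stores the next message in out[0]; returns false when none is left. */
    boolean getNext(String[] out) {
        boolean gotNext = get(out);
        // Null check first: only dereference _response once it is known to be non-null.
        if (gotNext || _response == null) {
            return gotNext;
        }
        if (_setIt == null) {
            _setIt = _response.iterator(); // created once, so later calls resume here
        }
        while (!gotNext && _setIt.hasNext()) {
            _messageIt = _setIt.next().iterator();
            gotNext = get(out);
        }
        return gotNext;
    }

    // Pulls one message from the current set, if any remain.
    private boolean get(String[] out) {
        if (_messageIt != null && _messageIt.hasNext()) {
            out[0] = _messageIt.next();
            return true;
        }
        return false;
    }
}
```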
Another question: if I generate, say, 2000 events and set kafka.fetch.limit
to 3000 (or -1), I'd expect the job to stop after reading the 2000 events, but
it seems to keep looping from the beginning. Am I missing something here?
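To make the expectation concrete, here is a hypothetical sketch of the stopping rule described above. It is not how the contrib's KafkaETL classes work: it reads from a starting offset and stops at whichever comes first, the end of the available events or the fetch limit, treating a negative limit (such as -1) as no limit. BoundedRead and eventsRead are illustrative names only.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the expected stopping behaviour; not Kafka code.
final class BoundedRead {
    private BoundedRead() {}

    /**
     * Returns how many events one pass reads starting at startOffset.
     * Stops at the end of the available events or after `limit` events,
     * whichever comes first; a negative limit means "no limit".
     */
    static int eventsRead(List<String> events, int startOffset, long limit) {
        int read = 0;
        for (int offset = startOffset; offset < events.size(); offset++) {
            if (limit >= 0 && read >= limit) {
                break; // fetch limit reached
            }
            read++; // a real job would emit events.get(offset) here
        }
        return read; // end of data: the pass ends instead of wrapping to offset 0
    }

    public static void main(String[] args) {
        List<String> events = Collections.nCopies(2000, "event");
        System.out.println(eventsRead(events, 0, 3000)); // 2000
        System.out.println(eventsRead(events, 0, -1));   // 2000
    }
}
```

With 2000 events, both a limit of 3000 and a limit of -1 end the pass after 2000 reads; a job that instead starts over at offset 0 is behaving differently from this rule.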
Sam
On Aug 9, 2011, at 4:29 PM, Neha Narkhede wrote:
> Sam,
>
> I tried this on a downloaded copy of Kafka
> v0.6 <http://sna-projects.com/kafka/downloads/kafka-0.6.zip>:
>
> nnarkhed-md:kafka nnarkhed$ jar tvf kafka-0.6.jar | grep "kafka.javaapi.consumer.SimpleConsumer"
>   3501 Tue May 24 10:23:24 PDT 2011 kafka/javaapi/consumer/SimpleConsumer.class
>
> I suspect that the kafka-0.6.jar is not registered correctly with the
> DistributedCache.
>
> Thanks,
> Neha
>
> On Tue, Aug 9, 2011 at 3:32 PM, Sam William wrote:
>
>> I'm trying to run this sample Hadoop consumer with version 0.6. I can see
>> that the jar files (including kafka-0.6.jar) are copied to the
>> DistributedCache correctly, but I get the following exception:
>>
>> Error: java.lang.ClassNotFoundException: kafka.javaapi.consumer.SimpleConsumer
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
>>     at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:93)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)
>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
>>     at org.apache.hadoop.mapred.Child.main(Child.java:170)
>>
>> Is there something I'm missing here?
>>
>>
>> Sam William
Sam William