Richard,
I read somewhere that the mappers write out the offset to the output dir,
so that further attempts (after a task failure) can start from the right
offset.
I see that the offset is generated. But where is the logic to read this and
adjust the offset for the next read? I wasn't able to find it.
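Just to make clear what I'm looking for, it would be something like the
sketch below: code that reads the offset a previous run wrote and seeds the
next read with it. Everything here (paths, file format, class names) is
made up for illustration; it is not actual kafka.etl code.

    // Purely illustrative -- the kind of glue I expected to find.
    // The offset-file path and plain-text format are assumptions.
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OffsetCarryOver {
        // Read the offset checkpoint a previous attempt left in its
        // output dir, so the next read can start from there.
        public static long readLastOffset(Configuration conf, Path prevOutput)
                throws Exception {
            FileSystem fs = FileSystem.get(conf);
            // Hypothetical file name; not what kafka.etl actually writes.
            Path offsetFile = new Path(prevOutput, "offsets/part-00000");
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(fs.open(offsetFile)));
            try {
                return Long.parseLong(in.readLine().trim());
            } finally {
                in.close();
            }
        }
    }
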
Sam
On Aug 31, 2011, at 12:48 PM, Richard Park wrote:
> It really looks like your mapper tasks may be failing to connect to your
> kafka server.
>
> Here's a brief overview of what that demo job is doing so you can understand
> where the example may have gone wrong.
> DataGenerator:
>
> 1. When DataGenerator is run, it needs the properties 'kafka.etl.topic'
> and 'kafka.server.uri' set in the properties file. When you run
> ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties, you can
> tell that they're properly set by the output 'topic=<blah>' and 'server
> uri=<kafka server url>'.
> 2. The DataGenerator will create a bunch of dummy messages and pump them
> to that kafka server. Afterwards, it will write a file to HDFS at the path
> 'input', which you also set in the properties file. The file that is
> created will be named something like 1.dat.
> 3. 1.dat is a sequence file, so if it isn't compressed, you should be
> able to see its contents in plain text (see the dump sketch right after
> this list). The contents will essentially list the kafka server url, the
> partition number, the topic, and the offset.
> 4. In a real scenario, you'll probably create several of these files,
> one per broker and possibly per partition, but for this example you only
> need one file. Each file will spawn a mapper during the mapred step.
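>
> If you want to double-check what actually went into 1.dat, a generic
> SequenceFile dump like the rough sketch below should work (the key/value
> classes are resolved reflectively here, since I'm not listing the exact
> writable types):
>
>     // Rough sketch: print every record of a SequenceFile such as 1.dat.
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>     import org.apache.hadoop.io.SequenceFile;
>     import org.apache.hadoop.io.Writable;
>     import org.apache.hadoop.util.ReflectionUtils;
>
>     public class DumpDat {
>         public static void main(String[] args) throws Exception {
>             Configuration conf = new Configuration();
>             FileSystem fs = FileSystem.get(conf);
>             SequenceFile.Reader reader =
>                     new SequenceFile.Reader(fs, new Path(args[0]), conf);
>             Writable key = (Writable)
>                     ReflectionUtils.newInstance(reader.getKeyClass(), conf);
>             Writable value = (Writable)
>                     ReflectionUtils.newInstance(reader.getValueClass(), conf);
>             while (reader.next(key, value)) {
>                 // Each record should show the server uri, partition,
>                 // topic and offset described in step 3.
>                 System.out.println(key + "\t" + value);
>             }
>             reader.close();
>         }
>     }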
>
> CopyJars:
>
> 1. This should copy the necessary jars for the kafka hadoop job and push
> them into HDFS for the distributed cache (roughly the sketch after this
> list). If the jars are copied locally instead of to the remote cluster,
> most likely HADOOP_CONF_DIR hasn't been set up correctly. The environment
> should probably be set by the script itself, so someone could change that.
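>
> Roughly, what the script plus job setup amount to is the sketch below
> (the HDFS jar directory here is just an example path):
>
>     // Sketch: register jars already pushed to HDFS on the task
>     // classpath via the distributed cache.
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.filecache.DistributedCache;
>     import org.apache.hadoop.fs.FileStatus;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>
>     public class CacheJars {
>         public static void register(Configuration conf) throws Exception {
>             FileSystem fs = FileSystem.get(conf);
>             // Example location only.
>             for (FileStatus stat : fs.listStatus(new Path("/tmp/kafka/lib"))) {
>                 DistributedCache.addFileToClassPath(stat.getPath(), conf);
>             }
>         }
>     }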
>
> SimpleKafkaETLJob:
>
> 1. This job will then set up the distributed classpath, and its input
> path should be the directory that 1.dat was written to.
> 2. Internally, the mappers will then load 1.dat and use the connection
> data contained in it to connect to kafka. If it's trying to connect to
> anything but your kafka server, then this file was incorrectly written.
> 3. The RecordReader wraps all of this and hides all the connection
> stuff, so your Mapper sees a stream of Kafka messages rather than the
> contents of 1.dat (see the bare-bones mapper sketch after this list).
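>
> A bare-bones mapper sketch (the key/value writable types here are my
> guesses, so check them against the demo mapper before copying):
>
>     // Sketch: count Kafka messages; each map() call gets one message
>     // payload, with no connection handling in sight.
>     import java.io.IOException;
>     import org.apache.hadoop.io.BytesWritable;
>     import org.apache.hadoop.io.LongWritable;
>     import org.apache.hadoop.io.Text;
>     import org.apache.hadoop.mapred.MapReduceBase;
>     import org.apache.hadoop.mapred.Mapper;
>     import org.apache.hadoop.mapred.OutputCollector;
>     import org.apache.hadoop.mapred.Reporter;
>
>     public class CountingMapper extends MapReduceBase
>             implements Mapper<LongWritable, BytesWritable, Text, LongWritable> {
>         public void map(LongWritable key, BytesWritable msg,
>                         OutputCollector<Text, LongWritable> out,
>                         Reporter reporter) throws IOException {
>             // 'msg' holds the raw bytes of a single Kafka message.
>             out.collect(new Text("messages"), new LongWritable(1L));
>         }
>     }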
>
> So please see if you can figure out what is wrong with your example, and
> feel free to beef up the README instructions to take your pitfalls into
> account.
>
> Thanks,
> -Richard
>
>
>
> On Wed, Aug 31, 2011 at 12:02 PM, Ben Ciceron <[email protected]> wrote:
>
>> OK, I can live with setting mapred.job.tracker manually in the code for
>> now (see the snippet below). This way it can now connect to the proper
>> jobtracker.
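>>
>> Concretely, something like this (host/port are placeholders for my
>> cluster):
>>
>>     // Point the client at the right jobtracker explicitly.
>>     import org.apache.hadoop.mapred.JobConf;
>>
>>     public class TrackerConfig {
>>         public static JobConf withTracker() {
>>             JobConf conf = new JobConf();
>>             // Placeholder host/port for my cluster's jobtracker.
>>             conf.set("mapred.job.tracker", "<jobtracker-host>:9001");
>>             return conf;
>>         }
>>     }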
>>
>>> The hadoop map tasks will need to connect to the kafka server port (the
>>> broker uri/port).
>>
>> I run the hadoop consumer on the same host (hostA) where the
>> kafka-server is running.
>>
>> Each host in the hadoop cluster can telnet/nmap to port 9092 on hostA,
>> where the kafka-server is running.
>> Also, hostA can connect to port 5181 on any host in the cluster.
>>
>> But each task fails with a similar connection issue:
>> java.io.IOException: java.net.ConnectException: Connection refused
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:155)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:210)
>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:195)
>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:393)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:326)
>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:396)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1074)
>>     at org.apache.hadoop.mapred.Child.main(Child.java:262)
>> Caused by: java.net.ConnectException: Connection refused
>>     at sun.nio.ch.Net.connect(Native Method)
>>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
>>     at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54)
>>     at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193)
>>     at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:156)
>>     at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:65)
>>     at kafka.etl.KafkaETLContext.getOffsetRange(KafkaETLContext.java:209)
>>     at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:97)
>>     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>     ... 11 more
>>
Sam William
[email protected]