Hi,

I'm was attempting to follow the hdfs-connector quick start guide 
(http://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/hdfs_connector.html#quickstart),
 but I'm unable to consume messages using Kafka Connect (hdfs-connector). I did 
confirm that I am able to consume the messages via console.


Here is the last final logs I receive from the app.


[2016-10-28 10:56:47,288] INFO Hadoop configuration directory /etc/hadoop/conf 
(io.confluent.connect.hdfs.DataWriter:94)
[2016-10-28 10:56:47,608] WARN Unable to load native-hadoop library for your 
platform... using builtin-java classes where applicable 
(org.apache.hadoop.util.NativeCodeLoader:62)
[2016-10-28 10:56:48,408] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} 
finished initialization and start 
(org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout 58820 
ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)
[2016-10-28 10:56:56,022] INFO Reflections took 9680 ms to scan 253 urls, 
producing 12411 keys and 81532 values  (org.reflections.Reflections:229)

At this point it hangs. I attempted to trace the code back to the source and 
found that the WorkerSinkTask is stuck here at pollConsumer()


log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
assert messageBatch.isEmpty() || msgs.isEmpty();
log.trace("{} polling returned {} messages", id, msgs.count());

Anyone have any ideas?


/etc/kafka-connect-hdfs/quickstart-hdfs.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=helloworld
hdfs.url=hdfs://localhost:8020
flush.size=3
rotate.interval.ms=500



/etc/schema-registry/connect-avro-standalone.properties
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it 
into Connect data.
# Every Connect user will need to configure these based on the format they want 
their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://0.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://0.0.0.0:8081

# The internal converter used for offsets and config data is configurable and 
must be specified,
# but most users will always want to use the built-in default. Offset and 
config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets




- Henry Kim

Reply via email to