Are you writing new data into the topic that the HDFS sink is trying to
read data from? This line

[2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)

indicates that the task will block for up to 58820 ms (roughly 60s)
waiting for data to arrive for the sink connector to write. If nothing
arrives, this can look like a hang, because the task waits out that
timeout before taking any further step (and that step will be minimal if
there is still no data).
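
As a rough illustration of that blocking behavior (a sketch only, not the
WorkerSinkTask code): the class below uses a java.util.concurrent
BlockingQueue as a hypothetical stand-in for the topic, and a 200 ms
timeout in place of 58820 ms so it finishes quickly. The names
PollTimeoutSketch, topic and pollOnce are made up for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutSketch {
    // Hypothetical stand-in for a topic; this is not a Kafka API.
    static final BlockingQueue<String> topic = new LinkedBlockingQueue<>();

    // Blocks until a record is available or timeoutMs elapses.
    // Returns null if the timeout expires with nothing to read.
    static String pollOnce(long timeoutMs) throws InterruptedException {
        return topic.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Case 1: nothing has been produced, so the call waits out the timeout.
        long start = System.nanoTime();
        String first = pollOnce(200);
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("empty topic: record=" + first
                + ", waited about " + waitedMs + " ms");

        // Case 2: a record is already there, so the call returns right away.
        topic.add("hello");
        start = System.nanoTime();
        String second = pollOnce(200);
        waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("non-empty topic: record=" + second
                + ", waited about " + waitedMs + " ms");
    }
}
```

With nothing being produced, the first call sits idle for the whole
timeout, which is what the trace line above would look like from the
outside.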

What else do you have going on in this system while the HDFS connector is
running?

-Ewen

On Fri, Oct 28, 2016 at 8:14 AM, Henry Kim <henry....@resonate.com> wrote:

> Hi,
>
>
> I was attempting to follow the hdfs-connector quick start guide
> (http://docs.confluent.io/3.0.0/connect/connect-hdfs/docs/hdfs_connector.html#quickstart),
> but I'm unable to consume messages using Kafka Connect (hdfs-connector).
> I did confirm that I am able to consume the messages via the console.
>
>
> Here are the last logs I receive from the app.
>
>
> [2016-10-28 10:56:47,288] INFO Hadoop configuration directory
> /etc/hadoop/conf (io.confluent.connect.hdfs.DataWriter:94)
> [2016-10-28 10:56:47,608] WARN Unable to load native-hadoop library for
> your platform... using builtin-java classes where applicable
> (org.apache.hadoop.util.NativeCodeLoader:62)
> [2016-10-28 10:56:48,408] INFO Sink task WorkerSinkTask{id=hdfs-sink-0}
> finished initialization and start
> (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
> [2016-10-28 10:56:48,408] TRACE hdfs-sink-0 polling consumer with timeout
> 58820 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:221)
> [2016-10-28 10:56:56,022] INFO Reflections took 9680 ms to scan 253 urls,
> producing 12411 keys and 81532 values  (org.reflections.Reflections:229)
>
> At this point it hangs. I traced the code back to the source and found
> that the WorkerSinkTask is stuck here, at pollConsumer():
>
>
> log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
> ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
> assert messageBatch.isEmpty() || msgs.isEmpty();
> log.trace("{} polling returned {} messages", id, msgs.count());
>
> Anyone have any ideas?
>
>
> /etc/kafka-connect-hdfs/quickstart-hdfs.properties
> name=hdfs-sink
> connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
> tasks.max=1
> topics=helloworld
> hdfs.url=hdfs://localhost:8020
> flush.size=3
> rotate.interval.ms=500
>
>
>
> /etc/schema-registry/connect-avro-standalone.properties
> bootstrap.servers=localhost:9092
>
> # The converters specify the format of data in Kafka and how to translate
> # it into Connect data.
> # Every Connect user will need to configure these based on the format they
> # want their data in
> # when loaded from or stored into Kafka
> key.converter=io.confluent.connect.avro.AvroConverter
> key.converter.schema.registry.url=http://0.0.0.0:8081
> value.converter=io.confluent.connect.avro.AvroConverter
> value.converter.schema.registry.url=http://0.0.0.0:8081
>
> # The internal converter used for offsets and config data is configurable
> # and must be specified,
> # but most users will always want to use the built-in default. Offset and
> # config data is never
> # visible outside of Connect in this format.
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
>
> # Local storage file for offset data
> offset.storage.file.filename=/tmp/connect.offsets
>
>
>
>
> - Henry Kim
>



-- 
Thanks,
Ewen
