I'm using samza-hdfs to write Kafka streams to HDFS, but I can't get it to
work.

Here is my samza job's properties file:

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=kafka2hdfs

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
task.inputs=kafka.events

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092

systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
systems.hdfs.producer.hdfs.base.output.dir=/events

# Job Coordinator
job.coordinator.system=kafka
# Normally, this would be 3, but we have only one broker.
job.coordinator.replication.factor=1


Here is my simple task:

public class Kafka2HDFSStreamTask implements StreamTask {
  private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");

  @Override
  public void process(IncomingMessageEnvelope incomingMessageEnvelope,
                      MessageCollector messageCollector,
                      TaskCoordinator taskCoordinator) throws Exception {
    String message = (String) incomingMessageEnvelope.getMessage();
    OutgoingMessageEnvelope envelope =
        new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
    messageCollector.send(envelope);
  }
}

When I run this job, a sequence file is created in HDFS, but it contains
only some header info and no content. I can't figure out what's wrong.
Also, what should I pass as the "stream" parameter when constructing the
SystemStream instance?
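
In case it's relevant: my understanding is that the HDFS writer only
flushes its output on a task commit, and I haven't overridden the commit
interval, so I believe it's at the default. A sketch of the setting I
mean (assuming the default value is correct; I haven't verified it):

```properties
# Commit interval in milliseconds (I believe this defaults to 60000,
# i.e. one minute); the HDFS writer should flush on each commit.
task.commit.ms=60000
```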
