I'm using samza-hdfs to write Kafka streams to HDFS, but I can't make it work.
Here is my Samza job's properties file:

```
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=kafka2hdfs

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=net.tangrui.kafka2hdfs.Kafka2HDFSStreamTask
task.inputs=kafka.events

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
systems.hdfs.producer.hdfs.base.output.dir=/events

# Job Coordinator
job.coordinator.system=kafka
# Normally, this would be 3, but we have only one broker.
job.coordinator.replication.factor=1
```

Here is my simple task:

```java
public class Kafka2HDFSStreamTask implements StreamTask {
    private final SystemStream OUTPUT_STREAM = new SystemStream("hdfs", "default");

    @Override
    public void process(IncomingMessageEnvelope incomingMessageEnvelope,
                        MessageCollector messageCollector,
                        TaskCoordinator taskCoordinator) throws Exception {
        String message = (String) incomingMessageEnvelope.getMessage();
        OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(OUTPUT_STREAM, message);
        messageCollector.send(envelope);
    }
}
```

When I run this job, a sequence file is created in HDFS, but it contains only header info and no content. I cannot figure out what is wrong. Also, what should I pass as the "stream" parameter when constructing the SystemStream instance?
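For context on the "header only, no content" symptom: file writers generally buffer records in memory, so data may only become visible on disk after the writer flushes or closes the file. This is an assumption about what is happening in my job, not a confirmed diagnosis, but it can be illustrated with a pure-JDK analogy (plain `BufferedWriter`, not the Samza or Hadoop API):

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class BufferFlushDemo {
    // Writes one small record through a BufferedWriter and reports the
    // on-disk file size before and after flush(). The record fits in the
    // writer's default 8 KB buffer, so nothing reaches the file until flush.
    public static long[] sizesBeforeAndAfterFlush() throws IOException {
        File f = File.createTempFile("flush-demo", ".txt");
        f.deleteOnExit();
        BufferedWriter out = new BufferedWriter(new FileWriter(f));
        out.write("some event payload\n");
        long before = f.length(); // still buffered in memory: 0 bytes on disk
        out.flush();
        long after = f.length();  // flushed: the record is now visible
        out.close();
        return new long[] { before, after };
    }

    public static void main(String[] args) throws IOException {
        long[] sizes = sizesBeforeAndAfterFlush();
        System.out.println("before flush: " + sizes[0] + " bytes, after flush: " + sizes[1] + " bytes");
    }
}
```

If the HDFS writer behaves similarly, the sequence file would show only its header until the writer is flushed or rolled.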