Hi,


I’m using Flink 1.5.6 and Hadoop 2.7.1.



*My requirement is to read hdfs sequence file (SequenceFileInputFormat),
then write it back to hdfs (SequenceFileAsBinaryOutputFormat with
compression).*



Below code won’t work until I copy the flink-hadoop-compatibility jar to
FLINK_HOME/lib. I find a similar discussion
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/hadoopcompatibility-not-in-dist-td12552.html,
do we have any update regarding this, or this is still the only way to get
the hadoop compatibility work?



If this is still the only way, do I need to copy that jar to every node of
the cluster?



Or, for my SUPER simple requirement above, is there any other way to go?
For example, without using  flink-hadoop-compatibility?



import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.FlatMapOperator;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.typeutils.TupleTypeInfo;

import org.apache.flink.hadoopcompatibility.HadoopInputs;

import org.apache.flink.util.Collector;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.SequenceFile.CompressionType;

import org.apache.hadoop.mapreduce.Job;

import
org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;



import com.twitter.chill.protobuf.ProtobufSerializer;



public class Foobar {



        @SuppressWarnings("serial")

        public static void main(String[] args) throws Exception {

                 ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();


env.getConfig().registerTypeWithKryoSerializer(ProtobufObject.class,
ProtobufSerializer.class);



                 String path = "hdfs://...";

                 DataSource<Tuple2<NullWritable, BytesWritable>> input =
env.createInput(HadoopInputs.readHadoopFile(

                                  new
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<NullWritable,
BytesWritable>(),

                                  NullWritable.class, BytesWritable.class,
path),

                                  new
TupleTypeInfo<>(TypeInformation.of(NullWritable.class),
TypeInformation.of(BytesWritable.class)));



                 FlatMapOperator<Tuple2<NullWritable, BytesWritable>,
Tuple2<BytesWritable, BytesWritable>> x = input.flatMap(

                                  new FlatMapFunction<Tuple2<NullWritable,
BytesWritable>, Tuple2<BytesWritable, BytesWritable>>() {



                                          @Override

                                          public void
flatMap(Tuple2<NullWritable, BytesWritable> value,


Collector<Tuple2<BytesWritable, BytesWritable>> out) throws Exception {

                                                   ProtobufObject info =
ProtobufObject.parseFrom(value.f1.copyBytes());

                                                   String key =
info.getKey();

                                                   out.collect(new
Tuple2<BytesWritable, BytesWritable>(new BytesWritable(key.getBytes()),

                                                                   new
BytesWritable(info.toByteArray())));

                                          }

                                  });



                 Job job = Job.getInstance();

                 HadoopOutputFormat<BytesWritable, BytesWritable> hadoopOF
= new HadoopOutputFormat<BytesWritable, BytesWritable>(

                                  new SequenceFileAsBinaryOutputFormat(),
job);




hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress",
"true");


hadoopOF.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type",
CompressionType.BLOCK.toString());

                 TextOutputFormat.setOutputPath(job, new
Path("hdfs://..."));



                 x.output(hadoopOF);

                 env.execute("foo");

        }

}

Reply via email to