Ok, I have a working prototype already. If somebody is interested (attached), I might add it as a PR later (with tests etc.).
tested locally & with s3

On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> as far as I know there is no one working on this. I'm only aware of
> someone working on an ORC (from Hive) Writer.
>
> This would be a welcome addition! I think you are already on the right
> track, the only thing required will probably be an AvroFileWriter and you
> already started looking at SequenceFileWriter, which should be similar.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> wrote:
>
>> Hi All,
>> Is there such implementation somewhere? (before I start to implement it
>> myself, it seems not too difficult based on SequenceFileWriter example)
>>
>> anyway any ideas/pointers will be highly appreciated
>>
>> thanks in advance
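For orientation (not part of the attachment): the prototype below implements Flink's Writer interface from flink-connector-filesystem. The rough shape of that contract, reconstructed from the methods the attached class overrides, looks like this; treat the exact signatures as an assumption rather than the authoritative API:

// Sketch of the org.apache.flink.streaming.connectors.fs.Writer contract,
// reconstructed from the methods overridden in the attached prototype.
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.fs.FSDataOutputStream;

public interface Writer<T> extends Serializable {
	// called by the RollingSink with the stream of a freshly created part file
	void open(FSDataOutputStream outStream) throws IOException;

	// flush buffered records to the part file
	void flush() throws IOException;

	// release per-file resources
	void close() throws IOException;

	// write a single element to the currently open part file
	void write(T element) throws IOException;

	// return a fresh, unopened copy of this writer
	Writer<T> duplicate();
}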
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.fs.avro;

import java.io.IOException;
import java.util.Map;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericData;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueRecordWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.JobConf;

/**
 * Implementation of an AvroKeyValue writer that can be used in a Sink.
 *
 * <p>You'll need the avro-mapred dependency (pay attention to the classifier, it works only for hadoop2):
 * <pre>{@code
 * <dependency>
 *   <groupId>org.apache.avro</groupId>
 *   <artifactId>avro-mapred</artifactId>
 *   <version>1.7.6</version>
 *   <classifier>hadoop2</classifier>
 * </dependency>
 * }</pre>
 *
 * And then:
 * <pre>{@code
 * RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> sink =
 *     new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("/tmp/path");
 * sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm"));
 * Map<String, String> properties = new HashMap<>();
 * Schema longSchema = Schema.create(Type.LONG);
 * String keySchema = longSchema.toString();
 * properties.put("avro.schema.output.key", keySchema);
 * String valueSchema = longSchema.toString();
 * properties.put("avro.schema.output.value", valueSchema);
 * properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
 * properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
 * sink.setWriter(new AvroSinkWriter<AvroKey<Long>, AvroValue<Long>>(properties));
 * sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
 * }</pre>
 *
 * To test with s3, create core-site.xml (I haven't found another way to test locally):
 * <pre>{@code
 * <configuration>
 *   <property>
 *     <name>fs.s3.impl</name>
 *     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
 *   </property>
 *   <property>
 *     <name>fs.s3a.access.key</name>
 *     <value>xxx</value>
 *   </property>
 *   <property>
 *     <name>fs.s3a.secret.key</name>
 *     <value>yyy</value>
 *   </property>
 *   <property>
 *     <!-- probably with an hdfs installation it won't be needed -->
 *     <name>fs.s3a.buffer.dir</name>
 *     <value>/tmp</value>
 *   </property>
 * </configuration>
 * }</pre>
 * and add the following dependency (not sure what the best option is here):
 * <pre>{@code
 * <dependency>
 *   <groupId>org.apache.hadoop</groupId>
 *   <artifactId>hadoop-aws</artifactId>
 *   <version>2.7.0</version>
 *   <scope>provided</scope>
 *   <exclusions>
 *     <exclusion>
 *       <artifactId>guava</artifactId>
 *       <groupId>com.google.guava</groupId>
 *     </exclusion>
 *   </exclusions>
 * </dependency>
 * }</pre>
 */
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
	private static final long serialVersionUID = 1L;

	private transient FSDataOutputStream outputStream;

	private transient AvroKeyValueRecordWriter<K, V> writer;

	private Class<K> keyClass;

	private Class<V> valueClass;

	private final Map<String, String> properties;

	/**
	 * C'tor for the writer.
	 * <p>
	 * You can provide different properties that will be used to configure the avro key-value
	 * writer as a simple properties map (see the example above).
	 *
	 * @param properties properties for the underlying AvroKeyValueRecordWriter
	 */
	public AvroSinkWriter(Map<String, String> properties) {
		this.properties = properties;
	}

	private AvroSinkWriter(Class<K> keyClass, Class<V> valueClass, Map<String, String> properties) {
		this.properties = properties;
		this.keyClass = keyClass;
		this.valueClass = valueClass;
	}

	// this is almost a copy-paste from AvroOutputFormatBase.getCompressionCodec(..)
	private CodecFactory getCompressionCodec(JobConf conf) {
		if (conf.getBoolean(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, false)) {
			// Default to deflate compression.
			int deflateLevel = conf.getInt(
					org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, CodecFactory.DEFAULT_DEFLATE_LEVEL);
			int xzLevel = conf.getInt(
					org.apache.avro.mapred.AvroOutputFormat.XZ_LEVEL_KEY, CodecFactory.DEFAULT_XZ_LEVEL);

			String outputCodec = conf.get(AvroJob.CONF_OUTPUT_CODEC);

			if (outputCodec == null) {
				String compressionCodec = conf.get("mapred.output.compression.codec");
				String avroCodecName = HadoopCodecFactory.getAvroCodecName(compressionCodec);
				if (avroCodecName != null) {
					conf.set(AvroJob.CONF_OUTPUT_CODEC, avroCodecName);
					return HadoopCodecFactory.fromHadoopString(compressionCodec);
				} else {
					return CodecFactory.deflateCodec(deflateLevel);
				}
			} else if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
				return CodecFactory.deflateCodec(deflateLevel);
			} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
				return CodecFactory.xzCodec(xzLevel);
			} else {
				return CodecFactory.fromString(outputCodec);
			}
		}
		// No compression.
		return CodecFactory.nullCodec();
	}

	@Override
	public void open(FSDataOutputStream outStream) throws IOException {
		if (outputStream != null) {
			throw new IllegalStateException("AvroSinkWriter has already been opened.");
		}
		if (keyClass == null) {
			throw new IllegalStateException("Key Class has not been initialized.");
		}
		if (valueClass == null) {
			throw new IllegalStateException("Value Class has not been initialized.");
		}

		this.outputStream = outStream;

		JobConf config = new JobConf();
		for (Map.Entry<String, String> e : properties.entrySet()) {
			config.set(e.getKey(), e.getValue());
		}

		// this code is based on AvroKeyValueOutputFormat.getRecordWriter(..)
		CodecFactory compressionCodec = getCompressionCodec(config);

		AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(config);

		AvroDatumConverter<K, ?> keyConverter = converterFactory.create(keyClass);
		AvroDatumConverter<V, ?> valueConverter = converterFactory.create(valueClass);

		GenericData dataModel = AvroSerialization.createDataModel(config);

		writer = new AvroKeyValueRecordWriter<K, V>(keyConverter, valueConverter, dataModel, compressionCodec, outputStream);
	}

	@Override
	public void flush() throws IOException {
		if (writer != null) {
			writer.sync();
		}
	}

	@Override
	public void close() throws IOException {
		if (writer != null) {
			writer.close(null);
		}
		writer = null;
		outputStream = null;
	}

	@Override
	public void write(Tuple2<K, V> element) throws IOException {
		if (outputStream == null) {
			throw new IllegalStateException("AvroSinkWriter has not been opened.");
		}
		writer.write(element.f0, element.f1);
	}

	@Override
	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
		if (!type.isTupleType()) {
			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
		}

		TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;

		if (tupleType.getArity() != 2) {
			throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
		}

		TypeInformation<K> keyType = tupleType.getTypeAt(0);
		TypeInformation<V> valueType = tupleType.getTypeAt(1);

		this.keyClass = keyType.getTypeClass();
		this.valueClass = valueType.getTypeClass();
	}

	@Override
	public Writer<Tuple2<K, V>> duplicate() {
		return new AvroSinkWriter<K, V>(keyClass, valueClass, properties);
	}
}
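For completeness, a minimal sketch (not part of the attachment) of how the writer could be wired into a streaming job. The sink setup is taken from the class javadoc above; the AvroSinkExample class name, the toy generateSequence source and the output path are made up for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.avro.AvroSinkWriter;

public class AvroSinkExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// toy stream of longs; key and value carry the same number
		DataStream<Tuple2<AvroKey<Long>, AvroValue<Long>>> input = env
				.generateSequence(0, 1000000)
				.map(new MapFunction<Long, Tuple2<AvroKey<Long>, AvroValue<Long>>>() {
					@Override
					public Tuple2<AvroKey<Long>, AvroValue<Long>> map(Long v) {
						return new Tuple2<AvroKey<Long>, AvroValue<Long>>(
								new AvroKey<Long>(v), new AvroValue<Long>(v));
					}
				});

		// same writer configuration as in the class javadoc, minus compression
		Map<String, String> properties = new HashMap<>();
		String longSchema = Schema.create(Schema.Type.LONG).toString();
		properties.put("avro.schema.output.key", longSchema);
		properties.put("avro.schema.output.value", longSchema);

		RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> sink =
				new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("/tmp/avro-out");
		sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm"));
		sink.setWriter(new AvroSinkWriter<AvroKey<Long>, AvroValue<Long>>(properties));
		sink.setBatchSize(1024 * 1024 * 64); // roll part files at ~64 MB

		input.addSink(sink);
		env.execute("avro rolling sink example");
	}
}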