OK,
I already have a working prototype, if somebody is interested (attached).

I might submit it as a PR later (with tests, etc.).

Tested locally and with S3.







On 21 April 2016 at 12:01, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> as far as I know there is no one working on this. I'm only aware of
> someone working on an ORC (from Hive) Writer.
>
> This would be a welcome addition! I think you are already on the right
> track, the only thing required will probably be an AvroFileWriter and you
> already started looking at SequenceFileWriter, which should be similar.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 09:45 Igor Berman <igor.ber...@gmail.com> wrote:
>
>> Hi All,
>> Is there such an implementation somewhere? (Before I start to implement it
>> myself; it seems not too difficult based on the SequenceFileWriter example.)
>>
>> anyway any ideas/pointers will be highly appreciated
>>
>> thanks in advance
>>
>>
package org.apache.flink.streaming.connectors.fs.avro;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Map;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericData;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueRecordWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.JobConf;

/**
 * Implementation of an Avro key-value {@link Writer} that can be used in a sink.
 * <p>
 * You will need the following dependency (pay attention to the classifier: it works only for hadoop2).
 <pre>
 {@code
 
 First, add the avro-mapred dependency:
	<dependency>
		<groupId>org.apache.avro</groupId>
		<artifactId>avro-mapred</artifactId>
		<version>1.7.6</version>
		<classifier>hadoop2</classifier>
	</dependency>
}
</pre>		
 And then:
  <pre>
 {@code
 RollingSink<Tuple2<AvroKey<Long> , AvroValue<Long>>> sink = new RollingSink<Tuple2<AvroKey<Long> , AvroValue<Long>>>("/tmp/path");
 sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm"));
 Map<String,String> properties = new HashMap<>();
 Schema longSchema = Schema.create(Type.LONG);
 String keySchema = longSchema.toString();
 properties.put("avro.schema.output.key", keySchema);
 String valueSchema = longSchema.toString();
 properties.put("avro.schema.output.value", valueSchema);
 properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
 properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());

 sink.setWriter(new AvroSinkWriter<AvroKey<Long> , AvroValue<Long>>(properties));
 sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
 }
 </pre>
 
 to test with s3:
<pre>
{@code
	Create a core-site.xml (I have not found another way to test locally):
<configuration>
	<property>
	  <name>fs.s3.impl</name>
	  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
	</property>
	<property>
	  <name>fs.s3a.access.key</name>
	  <value>xxx</value>
	</property>
	
	<property>
	  <name>fs.s3a.secret.key</name>
	  <value>yyy</value>
	</property>
	
	<property>
		<!-- probably with hdfs installation it won't be needed -->
		<name>fs.s3a.buffer.dir</name>
		<value>/tmp</value>
	</property>
</configuration>


and add the following dependency (not sure what the best option is here):
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-aws</artifactId>
			<version>2.7.0</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<artifactId>guava</artifactId>
					<groupId>com.google.guava</groupId>
				</exclusion>
			</exclusions>
		</dependency>

 }
 </pre>
 */
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
	private static final long serialVersionUID = 1L;

	/** Stream passed to {@link #open(FSDataOutputStream)}; non-null only while this writer is open. */
	private transient FSDataOutputStream outputStream;

	/** Avro key-value writer created in {@link #open(FSDataOutputStream)}; non-null only while open. */
	private transient AvroKeyValueRecordWriter<K, V> writer;

	/** Class of the tuple's first field; set by {@link #setInputType} or copied by {@link #duplicate()}. */
	private Class<K> keyClass;

	/** Class of the tuple's second field; set by {@link #setInputType} or copied by {@link #duplicate()}. */
	private Class<V> valueClass;

	/** Configuration entries copied into a fresh {@link JobConf} every time the writer is opened. */
	private final Map<String, String> properties;

	/**
	 * Creates a writer configured by the given properties.
	 * <p>
	 * Every entry of the map is copied into the {@link JobConf} that configures the Avro key-value
	 * writer (schemas, compression, ...) when the writer is opened.
	 *
	 * @param properties configuration entries for the Avro writer; must not be {@code null}
	 * @throws NullPointerException if {@code properties} is {@code null}
	 */
	public AvroSinkWriter(Map<String, String> properties) {
		// Fail here with a clear message instead of with a bare NPE later in open().
		if (properties == null) {
			throw new NullPointerException("properties must not be null.");
		}
		this.properties = properties;
	}

	/** Used by {@link #duplicate()} to carry over the already resolved key and value classes. */
	private AvroSinkWriter(Class<K> keyClass, Class<V> valueClass, Map<String, String> properties) {
		this.properties = properties;
		this.keyClass = keyClass;
		this.valueClass = valueClass;
	}

	/**
	 * Selects the Avro codec described by the given configuration.
	 * <p>
	 * This is almost a copy of {@code AvroOutputFormatBase.getCompressionCodec(..)}.
	 * Note that it may set {@link AvroJob#CONF_OUTPUT_CODEC} on {@code conf} as a side effect.
	 *
	 * @param conf configuration to read the compression settings from
	 * @return the codec to use; the null codec when compression is not enabled
	 */
	private CodecFactory getCompressionCodec(JobConf conf) {
		if (conf.getBoolean(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, false)) {
			// Default to deflate compression.
			int deflateLevel = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
					CodecFactory.DEFAULT_DEFLATE_LEVEL);
			int xzLevel = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.XZ_LEVEL_KEY,
					CodecFactory.DEFAULT_XZ_LEVEL);

			String outputCodec = conf.get(AvroJob.CONF_OUTPUT_CODEC);

			if (outputCodec == null) {
				// No explicit Avro codec: try to translate the configured Hadoop codec class.
				String compressionCodec = conf.get("mapred.output.compression.codec");
				String avroCodecName = HadoopCodecFactory.getAvroCodecName(compressionCodec);
				if (avroCodecName != null) {
					conf.set(AvroJob.CONF_OUTPUT_CODEC, avroCodecName);
					return HadoopCodecFactory.fromHadoopString(compressionCodec);
				} else {
					return CodecFactory.deflateCodec(deflateLevel);
				}
			} else if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
				return CodecFactory.deflateCodec(deflateLevel);
			} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
				return CodecFactory.xzCodec(xzLevel);
			} else {
				return CodecFactory.fromString(outputCodec);
			}
		}
		// No compression.
		return CodecFactory.nullCodec();
	}

	/**
	 * Opens this writer on the given stream.
	 * <p>
	 * The setup is based on {@code AvroKeyValueOutputFormat.getRecordWriter(..)}.
	 *
	 * @param outStream stream the Avro key-value writer is created on
	 * @throws IllegalStateException if the writer is already open, or the key/value class is unknown
	 * @throws IOException if the Avro writer cannot be created
	 */
	@Override
	public void open(FSDataOutputStream outStream) throws IOException {
		if (outputStream != null) {
			throw new IllegalStateException("AvroSinkWriter has already been opened.");
		}
		if (keyClass == null) {
			throw new IllegalStateException("Key Class has not been initialized.");
		}
		if (valueClass == null) {
			throw new IllegalStateException("Value Class has not been initialized.");
		}

		JobConf config = new JobConf();
		for (Map.Entry<String, String> e : properties.entrySet()) {
			config.set(e.getKey(), e.getValue());
		}

		CodecFactory compressionCodec = getCompressionCodec(config);

		AvroDatumConverterFactory converterFactory = new AvroDatumConverterFactory(config);

		AvroDatumConverter<K, ?> keyConverter = converterFactory.create(keyClass);
		AvroDatumConverter<V, ?> valueConverter = converterFactory.create(valueClass);

		GenericData dataModel = AvroSerialization.createDataModel(config);

		// Publish the open state only after everything succeeded: if any step above or the
		// constructor below throws, both fields stay null and the writer can be opened again.
		writer = new AvroKeyValueRecordWriter<K, V>(keyConverter, valueConverter, dataModel, compressionCodec, outStream);
		outputStream = outStream;
	}

	/**
	 * Calls {@code sync()} on the Avro writer if this writer is open; does nothing otherwise.
	 *
	 * @throws IOException if the underlying writer fails
	 */
	@Override
	public void flush() throws IOException {
		// NOTE(review): whether sync() also pushes buffered bytes down to the underlying stream
		// depends on AvroKeyValueRecordWriter -- confirm against the Avro version in use.
		if (writer != null) {
			writer.sync();
		}
	}

	/**
	 * Closes the Avro writer (if open) and resets this writer so that it can be opened again.
	 *
	 * @throws IOException if closing the underlying writer fails
	 */
	@Override
	public void close() throws IOException {
		try {
			if (writer != null) {
				// No task attempt context exists outside of MapReduce, hence null.
				writer.close(null);
			}
		} finally {
			// Reset even when close() fails; otherwise open() would reject this instance forever.
			writer = null;
			outputStream = null;
		}
	}

	/**
	 * Writes the tuple's first field as key and its second field as value.
	 *
	 * @param element the key/value pair to write
	 * @throws IllegalStateException if the writer has not been opened
	 * @throws IOException if the underlying writer fails
	 */
	@Override
	public void write(Tuple2<K, V> element) throws IOException {
		if (writer == null) {
			throw new IllegalStateException("AvroSinkWriter has not been opened.");
		}
		writer.write(element.f0, element.f1);
	}

	/**
	 * Derives the key and value classes from the input type, which must be a tuple of arity 2.
	 *
	 * @param type type information of the elements written by this writer
	 * @param executionConfig not used
	 * @throws IllegalArgumentException if {@code type} is not a tuple type of arity 2
	 */
	@Override
	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
		if (!type.isTupleType()) {
			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
		}

		TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;

		if (tupleType.getArity() != 2) {
			throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
		}

		TypeInformation<K> keyType = tupleType.getTypeAt(0);
		TypeInformation<V> valueType = tupleType.getTypeAt(1);

		this.keyClass = keyType.getTypeClass();
		this.valueClass = valueType.getTypeClass();
	}

	/**
	 * Creates a new, unopened writer with the same key class, value class and properties.
	 *
	 * @return a fresh writer sharing this writer's configuration
	 */
	@Override
	public Writer<Tuple2<K, V>> duplicate() {
		return new AvroSinkWriter<K, V>(keyClass, valueClass, properties);
	}
}

Reply via email to