Updated Branches: refs/heads/flume-1.4 f00492f21 -> 8fa29e3ee
FLUME-1865. Rename the Sequence File formatters to Serializer to be consistent with the rest of Flume.

(Hari Shreedharan via Mike Percy)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8fa29e3e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8fa29e3e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8fa29e3e

Branch: refs/heads/flume-1.4
Commit: 8fa29e3ee0969041b2b3f668d80fbe305729968d
Parents: f00492f
Author: Mike Percy <[email protected]>
Authored: Wed Jan 23 19:43:14 2013 -0800
Committer: Mike Percy <[email protected]>
Committed: Wed Jan 23 19:44:31 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/HDFSSequenceFile.java | 22 ++--
 .../apache/flume/sink/hdfs/HDFSTextFormatter.java | 79 -------------
 .../apache/flume/sink/hdfs/HDFSTextSerializer.java | 79 +++++++++++++
 .../flume/sink/hdfs/HDFSWritableFormatter.java | 77 ------------
 .../flume/sink/hdfs/HDFSWritableSerializer.java | 77 ++++++++++++
 .../apache/flume/sink/hdfs/SeqFileFormatter.java | 68 -----------
 .../flume/sink/hdfs/SeqFileFormatterFactory.java | 89 --------------
 .../flume/sink/hdfs/SeqFileFormatterType.java | 37 ------
 .../flume/sink/hdfs/SequenceFileSerializer.java | 68 +++++++++++
 .../sink/hdfs/SequenceFileSerializerFactory.java | 90 +++++++++++++++
 .../sink/hdfs/SequenceFileSerializerType.java | 38 ++++++
 .../apache/flume/sink/hdfs/MyCustomFormatter.java | 58 ---------
 .../apache/flume/sink/hdfs/MyCustomSerializer.java | 58 +++++++++
 .../apache/flume/sink/hdfs/TestBucketWriter.java | 6 +-
 .../sink/hdfs/TestHDFSEventSinkOnMiniCluster.java | 6 +-
 .../sink/hdfs/TestSeqFileFormatterFactory.java | 58 ---------
 .../hdfs/TestSequenceFileSerializerFactory.java | 59 ++++++++++
 17 files changed, 487 insertions(+), 482 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java index e127f6a..3bd25f4 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java @@ -39,7 +39,7 @@ public class HDFSSequenceFile implements HDFSWriter { private SequenceFile.Writer writer; private String writeFormat; private Context serializerContext; - private SeqFileFormatter formatter; + private SequenceFileSerializer serializer; private boolean useRawLocalFileSystem; public HDFSSequenceFile() { @@ -48,14 +48,15 @@ public class HDFSSequenceFile implements HDFSWriter { @Override public void configure(Context context) { - // use binary writable format by default - writeFormat = context.getString("hdfs.writeFormat", SeqFileFormatterType.Writable.name()); + // use binary writable serialize by default + writeFormat = context.getString("hdfs.writeFormat", + SequenceFileSerializerType.Writable.name()); useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem", false); serializerContext = new Context( - context.getSubProperties(SeqFileFormatterFactory.CTX_PREFIX)); - formatter = SeqFileFormatterFactory - .getFormatter(writeFormat, serializerContext); + context.getSubProperties(SequenceFileSerializerFactory.CTX_PREFIX)); + serializer = SequenceFileSerializerFactory + .getSerializer(writeFormat, serializerContext); logger.info("writeFormat = " + writeFormat + ", UseRawLocalFileSystem = " + useRawLocalFileSystem); } @@ -82,17 +83,18 @@ public class HDFSSequenceFile implements HDFSWriter { if 
(conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile (dstPath)) { FSDataOutputStream outStream = hdfs.append(dstPath); - writer = SequenceFile.createWriter(conf, outStream, formatter.getKeyClass(), - formatter.getValueClass(), compType, codeC); + writer = SequenceFile.createWriter(conf, outStream, serializer + .getKeyClass(), + serializer.getValueClass(), compType, codeC); } else { writer = SequenceFile.createWriter(hdfs, conf, dstPath, - formatter.getKeyClass(), formatter.getValueClass(), compType, codeC); + serializer.getKeyClass(), serializer.getValueClass(), compType, codeC); } } @Override public void append(Event e) throws IOException { - for (SeqFileFormatter.Record record : formatter.format(e)) { + for (SequenceFileSerializer.Record record : serializer.serialize(e)) { writer.append(record.getKey(), record.getValue()); } } http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java deleted file mode 100644 index 4b39f5d..0000000 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.sink.hdfs; - -import java.util.Collections; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.LongWritable; - -public class HDFSTextFormatter implements SeqFileFormatter { - - private Text makeText(Event e) { - Text textObject = new Text(); - textObject.set(e.getBody(), 0, e.getBody().length); - return textObject; - } - - @Override - public Class<LongWritable> getKeyClass() { - return LongWritable.class; - } - - @Override - public Class<Text> getValueClass() { - return Text.class; - } - - @Override - public Iterable<Record> format(Event e) { - Object key = getKey(e); - Object value = getValue(e); - return Collections.singletonList(new Record(key, value)); - } - - private Object getKey(Event e) { - // Write the data to HDFS - String timestamp = e.getHeaders().get("timestamp"); - long eventStamp; - - if (timestamp == null) { - eventStamp = System.currentTimeMillis(); - } else { - eventStamp = Long.valueOf(timestamp); - } - return new LongWritable(eventStamp); - } - - private Object getValue(Event e) { - return makeText(e); - } - - public static class Builder implements SeqFileFormatter.Builder { - - @Override - public SeqFileFormatter build(Context context) { - return new HDFSTextFormatter(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java new file mode 100644 index 0000000..32fd206 --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flume.sink.hdfs; + +import java.util.Collections; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.LongWritable; + +public class HDFSTextSerializer implements SequenceFileSerializer { + + private Text makeText(Event e) { + Text textObject = new Text(); + textObject.set(e.getBody(), 0, e.getBody().length); + return textObject; + } + + @Override + public Class<LongWritable> getKeyClass() { + return LongWritable.class; + } + + @Override + public Class<Text> getValueClass() { + return Text.class; + } + + @Override + public Iterable<Record> serialize(Event e) { + Object key = getKey(e); + Object value = getValue(e); + return Collections.singletonList(new Record(key, value)); + } + + private Object getKey(Event e) { + // Write the data to HDFS + String timestamp = e.getHeaders().get("timestamp"); + long eventStamp; + + if (timestamp == null) { + eventStamp = System.currentTimeMillis(); + } else { + eventStamp = Long.valueOf(timestamp); + } + return new LongWritable(eventStamp); + } + + private Object getValue(Event e) { + return makeText(e); + } + + public static class Builder implements SequenceFileSerializer.Builder { + + @Override + public SequenceFileSerializer build(Context context) { + return new HDFSTextSerializer(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java deleted file mode 100644 index cece506..0000000 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flume.sink.hdfs; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; - -import java.util.Collections; - -public class HDFSWritableFormatter implements SeqFileFormatter { - - private BytesWritable makeByteWritable(Event e) { - BytesWritable bytesObject = new BytesWritable(); - bytesObject.set(e.getBody(), 0, e.getBody().length); - return bytesObject; - } - - @Override - public Class<LongWritable> getKeyClass() { - return LongWritable.class; - } - - @Override - public Class<BytesWritable> getValueClass() { - return BytesWritable.class; - } - - @Override - public Iterable<Record> format(Event e) { - Object key = getKey(e); - Object value = getValue(e); - return Collections.singletonList(new Record(key, value)); - } - - private Object getKey(Event e) { - String timestamp = e.getHeaders().get("timestamp"); - long eventStamp; - - if (timestamp == null) { - eventStamp = System.currentTimeMillis(); - } else { - eventStamp = Long.valueOf(timestamp); - } - return new LongWritable(eventStamp); - } - - private Object getValue(Event e) { - return 
makeByteWritable(e); - } - - public static class Builder implements SeqFileFormatter.Builder { - - @Override - public SeqFileFormatter build(Context context) { - return new HDFSWritableFormatter(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java new file mode 100644 index 0000000..b25a6ea --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.sink.hdfs; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; + +import java.util.Collections; + +public class HDFSWritableSerializer implements SequenceFileSerializer { + + private BytesWritable makeByteWritable(Event e) { + BytesWritable bytesObject = new BytesWritable(); + bytesObject.set(e.getBody(), 0, e.getBody().length); + return bytesObject; + } + + @Override + public Class<LongWritable> getKeyClass() { + return LongWritable.class; + } + + @Override + public Class<BytesWritable> getValueClass() { + return BytesWritable.class; + } + + @Override + public Iterable<Record> serialize(Event e) { + Object key = getKey(e); + Object value = getValue(e); + return Collections.singletonList(new Record(key, value)); + } + + private Object getKey(Event e) { + String timestamp = e.getHeaders().get("timestamp"); + long eventStamp; + + if (timestamp == null) { + eventStamp = System.currentTimeMillis(); + } else { + eventStamp = Long.valueOf(timestamp); + } + return new LongWritable(eventStamp); + } + + private Object getValue(Event e) { + return makeByteWritable(e); + } + + public static class Builder implements SequenceFileSerializer.Builder { + + @Override + public SequenceFileSerializer build(Context context) { + return new HDFSWritableSerializer(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java deleted file mode 100644 index c25931c..0000000 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java +++ 
/dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flume.sink.hdfs; - -import org.apache.flume.Context; -import org.apache.flume.Event; - -public interface SeqFileFormatter { - - Class<?> getKeyClass(); - - Class<?> getValueClass(); - - /** - * Format the given event into zero, one or more SequenceFile records - * - * @param e - * event - * @return a list of records corresponding to the given event - */ - Iterable<Record> format(Event e); - - /** - * Knows how to construct this output formatter.<br/> - * <b>Note: Implementations MUST provide a public a no-arg constructor.</b> - */ - public interface Builder { - public SeqFileFormatter build(Context context); - } - - /** - * A key-value pair making up a record in an HDFS SequenceFile - */ - public static class Record { - private final Object key; - private final Object value; - - public Record(Object key, Object value) { - this.key = key; - this.value = value; - } - - public Object getKey() { - return key; - } - - public Object getValue() { - return value; - } - } - -} 
http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java deleted file mode 100644 index 20409ba..0000000 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.flume.sink.hdfs; - -import com.google.common.base.Preconditions; -import org.apache.flume.Context; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SeqFileFormatterFactory { - - private static final Logger logger = - LoggerFactory.getLogger(SeqFileFormatterFactory.class); - - /** - * {@link Context} prefix - */ - static final String CTX_PREFIX = "writeFormat."; - - @SuppressWarnings("unchecked") - static SeqFileFormatter getFormatter(String formatType, Context context) { - - Preconditions.checkNotNull(formatType, - "format type must not be null"); - - // try to find builder class in enum of known formatters - SeqFileFormatterType type; - try { - type = SeqFileFormatterType.valueOf(formatType); - } catch (IllegalArgumentException e) { - logger.debug("Not in enum, loading builder class: {}", formatType); - type = SeqFileFormatterType.Other; - } - Class<? extends SeqFileFormatter.Builder> builderClass = - type.getBuilderClass(); - - // handle the case where they have specified their own builder in the config - if (builderClass == null) { - try { - Class c = Class.forName(formatType); - if (c != null && SeqFileFormatter.Builder.class.isAssignableFrom(c)) { - builderClass = (Class<? 
extends SeqFileFormatter.Builder>) c; - } else { - logger.error("Unable to instantiate Builder from {}", formatType); - return null; - } - } catch (ClassNotFoundException ex) { - logger.error("Class not found: " + formatType, ex); - return null; - } catch (ClassCastException ex) { - logger.error("Class does not extend " + - SeqFileFormatter.Builder.class.getCanonicalName() + ": " + - formatType, ex); - return null; - } - } - - // build the builder - SeqFileFormatter.Builder builder; - try { - builder = builderClass.newInstance(); - } catch (InstantiationException ex) { - logger.error("Cannot instantiate builder: " + formatType, ex); - return null; - } catch (IllegalAccessException ex) { - logger.error("Cannot instantiate builder: " + formatType, ex); - return null; - } - - return builder.build(context); - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java deleted file mode 100644 index ff3eb84..0000000 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hdfs; - -public enum SeqFileFormatterType { - Writable(HDFSWritableFormatter.Builder.class), - Text(HDFSTextFormatter.Builder.class), - Other(null); - - private final Class<? extends SeqFileFormatter.Builder> builderClass; - - SeqFileFormatterType(Class<? extends SeqFileFormatter.Builder> builderClass) { - this.builderClass = builderClass; - } - - public Class<? extends SeqFileFormatter.Builder> getBuilderClass() { - return builderClass; - } - -} - http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java new file mode 100644 index 0000000..ec2b760 --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.hdfs; + +import org.apache.flume.Context; +import org.apache.flume.Event; + +public interface SequenceFileSerializer { + + Class<?> getKeyClass(); + + Class<?> getValueClass(); + + /** + * Format the given event into zero, one or more SequenceFile records + * + * @param e + * event + * @return a list of records corresponding to the given event + */ + Iterable<Record> serialize(Event e); + + /** + * Knows how to construct this output formatter.<br/> + * <b>Note: Implementations MUST provide a public a no-arg constructor.</b> + */ + public interface Builder { + public SequenceFileSerializer build(Context context); + } + + /** + * A key-value pair making up a record in an HDFS SequenceFile + */ + public static class Record { + private final Object key; + private final Object value; + + public Record(Object key, Object value) { + this.key = key; + this.value = value; + } + + public Object getKey() { + return key; + } + + public Object getValue() { + return value; + } + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java new file mode 100644 index 0000000..5678836 --- /dev/null +++ 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.hdfs; + +import com.google.common.base.Preconditions; +import org.apache.flume.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SequenceFileSerializerFactory { + + private static final Logger logger = + LoggerFactory.getLogger(SequenceFileSerializerFactory.class); + + /** + * {@link Context} prefix + */ + static final String CTX_PREFIX = "writeFormat."; + + @SuppressWarnings("unchecked") + static SequenceFileSerializer getSerializer(String formatType, + Context context) { + + Preconditions.checkNotNull(formatType, + "serialize type must not be null"); + + // try to find builder class in enum of known formatters + SequenceFileSerializerType type; + try { + type = SequenceFileSerializerType.valueOf(formatType); + } catch (IllegalArgumentException e) { + logger.debug("Not in enum, loading builder class: {}", formatType); + type = SequenceFileSerializerType.Other; + } + Class<? 
extends SequenceFileSerializer.Builder> builderClass = + type.getBuilderClass(); + + // handle the case where they have specified their own builder in the config + if (builderClass == null) { + try { + Class c = Class.forName(formatType); + if (c != null && SequenceFileSerializer.Builder.class.isAssignableFrom(c)) { + builderClass = (Class<? extends SequenceFileSerializer.Builder>) c; + } else { + logger.error("Unable to instantiate Builder from {}", formatType); + return null; + } + } catch (ClassNotFoundException ex) { + logger.error("Class not found: " + formatType, ex); + return null; + } catch (ClassCastException ex) { + logger.error("Class does not extend " + + SequenceFileSerializer.Builder.class.getCanonicalName() + ": " + + formatType, ex); + return null; + } + } + + // build the builder + SequenceFileSerializer.Builder builder; + try { + builder = builderClass.newInstance(); + } catch (InstantiationException ex) { + logger.error("Cannot instantiate builder: " + formatType, ex); + return null; + } catch (IllegalAccessException ex) { + logger.error("Cannot instantiate builder: " + formatType, ex); + return null; + } + + return builder.build(context); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java new file mode 100644 index 0000000..4351488 --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.hdfs; + +public enum SequenceFileSerializerType { + Writable(HDFSWritableSerializer.Builder.class), + Text(HDFSTextSerializer.Builder.class), + Other(null); + + private final Class<? extends SequenceFileSerializer.Builder> builderClass; + + SequenceFileSerializerType( + Class<? extends SequenceFileSerializer.Builder> builderClass) { + this.builderClass = builderClass; + } + + public Class<? extends SequenceFileSerializer.Builder> getBuilderClass() { + return builderClass; + } + +} + http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java deleted file mode 100644 index ab1e463..0000000 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.sink.hdfs; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; - -import java.util.Arrays; - -public class MyCustomFormatter implements SeqFileFormatter { - - @Override - public Class<LongWritable> getKeyClass() { - return LongWritable.class; - } - - @Override - public Class<BytesWritable> getValueClass() { - return BytesWritable.class; - } - - @Override - public Iterable<Record> format(Event e) { - return Arrays.asList( - new Record(new LongWritable(1234L), new BytesWritable(new byte[10])), - new Record(new LongWritable(4567L), new BytesWritable(new byte[20])) - ); - } - - public static class Builder implements SeqFileFormatter.Builder { - - @Override - public SeqFileFormatter build(Context context) { - return new MyCustomFormatter(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java 
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java new file mode 100644 index 0000000..72164fd --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flume.sink.hdfs; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; + +import java.util.Arrays; + +public class MyCustomSerializer implements SequenceFileSerializer { + + @Override + public Class<LongWritable> getKeyClass() { + return LongWritable.class; + } + + @Override + public Class<BytesWritable> getValueClass() { + return BytesWritable.class; + } + + @Override + public Iterable<Record> serialize(Event e) { + return Arrays.asList( + new Record(new LongWritable(1234L), new BytesWritable(new byte[10])), + new Record(new LongWritable(4567L), new BytesWritable(new byte[20])) + ); + } + + public static class Builder implements SequenceFileSerializer.Builder { + + @Override + public SequenceFileSerializer build(Context context) { + return new MyCustomSerializer(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index 829d7e8..ebe277c 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -195,7 +195,7 @@ public class TestBucketWriter { open = true; } }; - HDFSTextFormatter formatter = new HDFSTextFormatter(); + HDFSTextSerializer serializer = new HDFSTextSerializer(); File tmpFile = File.createTempFile("flume", "test"); tmpFile.deleteOnExit(); String path = tmpFile.getParent(); @@ -280,7 +280,7 @@ public class TestBucketWriter { final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC"; MockHDFSWriter 
hdfsWriter = new MockHDFSWriter(); - HDFSTextFormatter formatter = new HDFSTextFormatter(); + HDFSTextSerializer formatter = new HDFSTextSerializer(); BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, null, @@ -299,7 +299,7 @@ public class TestBucketWriter { final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH"; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - HDFSTextFormatter formatter = new HDFSTextFormatter(); + HDFSTextSerializer serializer = new HDFSTextSerializer(); BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, null, http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java index bcd19e9..2e71069 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -153,7 +153,7 @@ public class TestHDFSEventSinkOnMiniCluster { } /** - * Writes two events in GZIP-compressed format. + * Writes two events in GZIP-compressed serialize. */ @Test public void simpleHDFSGZipCompressedTest() throws EventDeliveryException, IOException { http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java deleted file mode 100644 index 9d17785..0000000 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.flume.sink.hdfs; - -import org.apache.flume.Context; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - -public class TestSeqFileFormatterFactory { - - @Test - public void getTextFormatter() { - SeqFileFormatter formatter = - SeqFileFormatterFactory.getFormatter("Text", new Context()); - - assertTrue(formatter != null); - assertTrue(formatter.getClass().getName(), - formatter instanceof HDFSTextFormatter); - } - - @Test - public void getWritableFormatter() { - SeqFileFormatter formatter = - SeqFileFormatterFactory.getFormatter("Writable", new Context()); - - assertTrue(formatter != null); - assertTrue(formatter.getClass().getName(), - formatter instanceof HDFSWritableFormatter); - } - - @Test - public void getCustomFormatter() { - SeqFileFormatter formatter = SeqFileFormatterFactory.getFormatter( - "org.apache.flume.sink.hdfs.MyCustomFormatter$Builder", new Context()); - - assertTrue(formatter != null); - assertTrue(formatter.getClass().getName(), - formatter instanceof MyCustomFormatter); - } - -} http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java new file mode 100644 index 0000000..6381edc --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.hdfs; + +import org.apache.flume.Context; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class TestSequenceFileSerializerFactory { + + @Test + public void getTextFormatter() { + SequenceFileSerializer formatter = + SequenceFileSerializerFactory.getSerializer("Text", new Context()); + + assertTrue(formatter != null); + assertTrue(formatter.getClass().getName(), + formatter instanceof HDFSTextSerializer); + } + + @Test + public void getWritableFormatter() { + SequenceFileSerializer formatter = + SequenceFileSerializerFactory.getSerializer("Writable", new Context()); + + assertTrue(formatter != null); + assertTrue(formatter.getClass().getName(), + formatter instanceof HDFSWritableSerializer); + } + + @Test + public void getCustomFormatter() { + SequenceFileSerializer formatter = SequenceFileSerializerFactory + .getSerializer( + "org.apache.flume.sink.hdfs.MyCustomSerializer$Builder", new Context()); + + assertTrue(formatter != null); + assertTrue(formatter.getClass().getName(), + formatter instanceof MyCustomSerializer); + } + +}
