Updated Branches: refs/heads/trunk e60de9322 -> ad68350ff
FLUME-2048. Add an Avro container file deserializer for Spool Directory Source. (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ad68350f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ad68350f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ad68350f Branch: refs/heads/trunk Commit: ad68350ffcc9a6cb47b7b555b9ebf6dacfa7c129 Parents: e60de93 Author: Hari Shreedharan <[email protected]> Authored: Wed Jun 19 13:33:44 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed Jun 19 13:33:44 2013 -0700 ---------------------------------------------------------------------- flume-ng-core/pom.xml | 5 + .../serialization/AvroEventDeserializer.java | 220 +++++++++++++++++++ .../serialization/EventDeserializerType.java | 1 + .../flume/serialization/LengthMeasurable.java | 27 +++ .../flume/serialization/RemoteMarkable.java | 38 ++++ .../ResettableFileInputStream.java | 27 ++- .../TestAvroEventDeserializer.java | 177 +++++++++++++++ .../serialization/TransientPositionTracker.java | 50 +++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 27 +++ pom.xml | 6 + 10 files changed, 577 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index e37de5b..887a65d 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -279,6 +279,11 @@ limitations under the License. </dependency> <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java new file mode 100644 index 0000000..e44978f --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.commons.codec.binary.Hex; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.event.EventBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.List; + +/** + * A deserializer that parses Avro container files, generating one Flume event + * per record in the Avro file, and storing binary avro-encoded records in + * the Flume event body. + */ +public class AvroEventDeserializer implements EventDeserializer { + + private static final Logger logger = LoggerFactory.getLogger + (AvroEventDeserializer.class); + + private final AvroSchemaType schemaType; + private final ResettableInputStream ris; + + private Schema schema; + private byte[] schemaHash; + private String schemaHashString; + private DataFileReader<GenericRecord> fileReader; + private GenericDatumWriter datumWriter; + private GenericRecord record; + private ByteArrayOutputStream out; + private BinaryEncoder encoder; + + @VisibleForTesting + public static enum AvroSchemaType { + HASH, + LITERAL; + } + + public static final String CONFIG_SCHEMA_TYPE_KEY = "schemaType"; + public static final String AVRO_SCHEMA_HEADER_HASH + = "flume.avro.schema.hash"; + public static final String AVRO_SCHEMA_HEADER_LITERAL + = "flume.avro.schema.literal"; + + private AvroEventDeserializer(Context context, ResettableInputStream ris) { + this.ris = ris; + + schemaType = AvroSchemaType.valueOf( + context.getString(CONFIG_SCHEMA_TYPE_KEY, + AvroSchemaType.HASH.toString()).toUpperCase()); + if (schemaType == AvroSchemaType.LITERAL) { + logger.warn(CONFIG_SCHEMA_TYPE_KEY + " set to " + + AvroSchemaType.LITERAL.toString() + ", so storing full Avro " + + "schema in the header of each event, which may be inefficient. " + + "Consider using the hash of the schema " + + "instead of the literal schema."); + } + } + + private void initialize() throws IOException, NoSuchAlgorithmException { + SeekableResettableInputBridge in = new SeekableResettableInputBridge(ris); + long pos = in.tell(); + in.seek(0L); + fileReader = new DataFileReader<GenericRecord>(in, + new GenericDatumReader<GenericRecord>()); + fileReader.sync(pos); + + schema = fileReader.getSchema(); + datumWriter = new GenericDatumWriter(schema); + out = new ByteArrayOutputStream(); + encoder = EncoderFactory.get().binaryEncoder(out, encoder); + + schemaHash = SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema); + schemaHashString = Hex.encodeHexString(schemaHash); + } + + @Override + public Event readEvent() throws IOException { + if (fileReader.hasNext()) { + record = fileReader.next(record); + out.reset(); + datumWriter.write(record, encoder); + encoder.flush(); + // annotate header with 64-bit schema CRC hash in hex + Event event = EventBuilder.withBody(out.toByteArray()); + if (schemaType == AvroSchemaType.HASH) { + event.getHeaders().put(AVRO_SCHEMA_HEADER_HASH, schemaHashString); + } else { + event.getHeaders().put(AVRO_SCHEMA_HEADER_LITERAL, schema.toString()); + } + return event; + } + return null; + } + + @Override + public List<Event> readEvents(int numEvents) throws IOException { + List<Event> events = Lists.newArrayList(); + for (int i = 0; i < numEvents && fileReader.hasNext(); i++) { + Event event = readEvent(); + if (event != null) { + events.add(event); + } + } + return events; + } + + @Override + public void mark() throws IOException { + long pos = fileReader.previousSync() - DataFileConstants.SYNC_SIZE; + if (pos < 0) pos = 0; + ((RemoteMarkable) ris).markPosition(pos); + } + + @Override + public void reset() throws IOException { + long pos = ((RemoteMarkable) ris).getMarkPosition(); + fileReader.sync(pos); + } + + @Override + public void close() throws IOException { + ris.close(); + } + + public static class Builder implements EventDeserializer.Builder { + + @Override + public EventDeserializer build(Context context, ResettableInputStream in) { + if (!(in instanceof RemoteMarkable)) { + throw new IllegalArgumentException("Cannot use this deserializer " + + "without a RemoteMarkable input stream"); + } + AvroEventDeserializer deserializer + = new AvroEventDeserializer(context, in); + try { + deserializer.initialize(); + } catch (Exception e) { + throw new FlumeException("Cannot instantiate deserializer", e); + } + return deserializer; + } + } + + private static class SeekableResettableInputBridge implements SeekableInput { + + ResettableInputStream ris; + public SeekableResettableInputBridge(ResettableInputStream ris) { + this.ris = ris; + } + + @Override + public void seek(long p) throws IOException { + ris.seek(p); + } + + @Override + public long tell() throws IOException { + return ris.tell(); + } + + @Override + public long length() throws IOException { + if (ris instanceof LengthMeasurable) { + return ((LengthMeasurable) ris).length(); + } else { + // FIXME: Avro doesn't seem to complain about this, + // but probably not a great idea... + return Long.MAX_VALUE; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return ris.read(b, off, len); + } + + @Override + public void close() throws IOException { + ris.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java index 02abc80..ce18130 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/EventDeserializerType.java @@ -25,6 +25,7 @@ import org.apache.flume.annotations.InterfaceStability; @InterfaceStability.Unstable public enum EventDeserializerType { LINE(LineDeserializer.Builder.class), + AVRO(AvroEventDeserializer.Builder.class), OTHER(null); private final Class<? extends EventDeserializer.Builder> builderClass; http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java new file mode 100644 index 0000000..bc6647f --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/LengthMeasurable.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import java.io.IOException; + +public interface LengthMeasurable { + + /** returns the total length of the stream or file */ + long length() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java new file mode 100644 index 0000000..774152e --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/RemoteMarkable.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import java.io.IOException; + +/** Allows for calling mark() without a seek() */ +public interface RemoteMarkable { + + /** + * Indicate that the specified position should be returned to in the case of + * {@link Resettable#reset()} being called. + * @throws java.io.IOException + */ + void markPosition(long position) throws IOException; + + /** + * Return the saved mark position without moving the mark pointer. + * @throws IOException + */ + long getMarkPosition() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java index 49521ab..09f490f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java @@ -21,6 +21,8 @@ package org.apache.flume.serialization; import com.google.common.base.Charsets; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; @@ -44,7 +46,10 @@ import java.nio.charset.CoderResult; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ResettableFileInputStream extends ResettableInputStream { +public class ResettableFileInputStream extends ResettableInputStream + implements RemoteMarkable, LengthMeasurable { + + Logger logger = LoggerFactory.getLogger(ResettableFileInputStream.class); public static final int DEFAULT_BUF_SIZE = 16384; @@ -126,6 +131,8 @@ public class ResettableFileInputStream extends ResettableInputStream { @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + logger.trace("read(buf, {}, {})", off, len); + if (position >= fileSize) { return -1; } @@ -194,17 +201,35 @@ public class ResettableFileInputStream extends ResettableInputStream { } @Override + public void markPosition(long position) throws IOException { + tracker.storePosition(position); + } + + @Override + public long getMarkPosition() throws IOException { + return tracker.getPosition(); + } + + @Override public void reset() throws IOException { seek(tracker.getPosition()); } @Override + public long length() throws IOException { + return file.length(); + } + + @Override public long tell() throws IOException { + logger.trace("Tell position: {}", syncPosition); + return syncPosition; } @Override public synchronized void seek(long newPos) throws IOException { + logger.trace("Seek to position: {}", newPos); // check to see if we can seek within our existing buffer long relativeChange = newPos - position; http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java new file mode 100644 index 0000000..6f9ddc2 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.commons.codec.binary.Hex; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.Collections; + +public class TestAvroEventDeserializer { + + private static final Logger logger = + LoggerFactory.getLogger(TestAvroEventDeserializer.class); + + private static final Schema schema; + static { + schema = Schema.createRecord("MyRecord", "", "org.apache.flume", false); + Schema.Field field = new Schema.Field("foo", + Schema.create(Schema.Type.STRING), "", null); + schema.setFields(Collections.singletonList(field)); + } + + @Test + public void resetTest() throws IOException { + File tempFile = newTestFile(true); + + String target = tempFile.getAbsolutePath(); + logger.info("Target: {}", target); + TransientPositionTracker tracker = new TransientPositionTracker(target); + + AvroEventDeserializer.Builder desBuilder = + new AvroEventDeserializer.Builder(); + EventDeserializer deserializer = desBuilder.build(new Context(), + new ResettableFileInputStream(tempFile, tracker)); + + BinaryDecoder decoder = null; + DatumReader<GenericRecord> reader = + new GenericDatumReader<GenericRecord>(schema); + + decoder = DecoderFactory.get().binaryDecoder( + deserializer.readEvent().getBody(), decoder); + assertEquals("bar", reader.read(null, decoder).get("foo").toString()); + + deserializer.reset(); + + decoder = DecoderFactory.get().binaryDecoder( + deserializer.readEvent().getBody(), decoder); + assertEquals("bar", reader.read(null, decoder).get("foo").toString()); + + deserializer.mark(); + + decoder = DecoderFactory.get().binaryDecoder( + deserializer.readEvent().getBody(), decoder); + assertEquals("baz", reader.read(null, decoder).get("foo").toString()); + + deserializer.reset(); + + decoder = DecoderFactory.get().binaryDecoder( + deserializer.readEvent().getBody(), decoder); + assertEquals("baz", reader.read(null, decoder).get("foo").toString()); + + assertNull(deserializer.readEvent()); + } + + @Test + public void testSchemaHash() throws IOException, NoSuchAlgorithmException { + File tempFile = newTestFile(true); + + String target = tempFile.getAbsolutePath(); + logger.info("Target: {}", target); + TransientPositionTracker tracker = new TransientPositionTracker(target); + + Context context = new Context(); + context.put(AvroEventDeserializer.CONFIG_SCHEMA_TYPE_KEY, + AvroEventDeserializer.AvroSchemaType.HASH.toString()); + + ResettableInputStream in = + new ResettableFileInputStream(tempFile, tracker); + EventDeserializer des = + new AvroEventDeserializer.Builder().build(context, in); + + Event event = des.readEvent(); + String eventSchemaHash = + event.getHeaders().get(AvroEventDeserializer.AVRO_SCHEMA_HEADER_HASH); + String expectedSchemaHash = Hex.encodeHexString( + SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema)); + + Assert.assertEquals(expectedSchemaHash, eventSchemaHash); + } + + @Test + public void testSchemaLiteral() throws IOException { + File tempFile = newTestFile(true); + + String target = tempFile.getAbsolutePath(); + logger.info("Target: {}", target); + TransientPositionTracker tracker = new TransientPositionTracker(target); + + Context context = new Context(); + context.put(AvroEventDeserializer.CONFIG_SCHEMA_TYPE_KEY, + AvroEventDeserializer.AvroSchemaType.LITERAL.toString()); + + ResettableInputStream in = + new ResettableFileInputStream(tempFile, tracker); + EventDeserializer des = + new AvroEventDeserializer.Builder().build(context, in); + + Event event = des.readEvent(); + String eventSchema = + event.getHeaders().get(AvroEventDeserializer.AVRO_SCHEMA_HEADER_LITERAL); + + Assert.assertEquals(schema.toString(), eventSchema); + } + + private File newTestFile(boolean deleteOnExit) throws IOException { + File tempFile = File.createTempFile("testDirectFile", "tmp"); + if (deleteOnExit) { + tempFile.deleteOnExit(); + } + + DataFileWriter<GenericRecord> writer = + new DataFileWriter<GenericRecord>( + new GenericDatumWriter<GenericRecord>(schema)); + writer.create(schema, tempFile); + GenericRecordBuilder recordBuilder; + recordBuilder = new GenericRecordBuilder(schema); + recordBuilder.set("foo", "bar"); + GenericRecord record = recordBuilder.build(); + writer.append(record); + writer.sync(); + recordBuilder = new GenericRecordBuilder(schema); + recordBuilder.set("foo", "baz"); + record = recordBuilder.build(); + writer.append(record); + writer.sync(); + writer.flush(); + writer.close(); + + return tempFile; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java new file mode 100644 index 0000000..ba50d54 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TransientPositionTracker.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import java.io.IOException; + +public class TransientPositionTracker implements PositionTracker { + + private final String target; + private long position = 0; + + public TransientPositionTracker(String target) { + this.target = target; + } + + @Override + public void storePosition(long position) throws IOException { + this.position = position; + } + + @Override + public long getPosition() { + return position; + } + + @Override + public String getTarget() { + return target; + } + + @Override + public void close() throws IOException { + // no-op + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 74863d4..06f8efa 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -980,6 +980,33 @@ deserializer.maxLineLength 2048 Maximum number of characters to deserializer.outputCharset UTF-8 Charset to use for encoding events put into the channel. ============================== ============== ========================================================== +AVRO +^^^^ + +This deserializer is able to read an Avro container file, and it generates +one event per Avro record in the file. +Each event is annotated with a header that indicates the schema used. +The body of the event is the binary Avro record data, not +including the schema or the rest of the container file elements. + +Note that if the spool directory source must retry putting one of these events +onto a channel (for example, because the channel is full), then it will reset +and retry from the most recent Avro container file sync point. To reduce +potential event duplication in such a failure scenario, write sync markers more +frequently in your Avro input files. + +============================== ============== ====================================================================== +Property Name Default Description +============================== ============== ====================================================================== +deserializer.schemaType HASH How the schema is represented. By default, or when the value ``HASH`` + is specified, the Avro schema is hashed and + the hash is stored in every event in the event header + "flume.avro.schema.hash". If ``LITERAL`` is specified, the JSON-encoded + schema itself is stored in every event in the event header + "flume.avro.schema.literal". Using ``LITERAL`` mode is relatively + inefficient compared to ``HASH`` mode. +============================== ============== ====================================================================== + NetCat Source ~~~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/ad68350f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 251f345..350d6a4 100644 --- a/pom.xml +++ b/pom.xml @@ -780,6 +780,12 @@ limitations under the License. </dependency> <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.8</version> + </dependency> + + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.1</version>
