Updated Branches: refs/heads/flume-1.5 4bb265834 -> 0994eb27a
FLUME-2294. Add a sink for Kite Datasets. (Ryan Blue via Hari Shreedharan)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0994eb27
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0994eb27
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0994eb27

Branch: refs/heads/flume-1.5
Commit: 0994eb27a91eb7ea8996acf5e8938b3ee79fbc30
Parents: 4bb2658
Author: Hari Shreedharan <hshreedha...@apache.org>
Authored: Sat Jan 18 21:16:12 2014 -0800
Committer: Hari Shreedharan <hshreedha...@apache.org>
Committed: Sat Jan 18 21:17:20 2014 -0800

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                           |  18 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  33 ++
 flume-ng-sinks/flume-dataset-sink/pom.xml       | 139 ++++++
 .../org/apache/flume/sink/kite/DatasetSink.java | 367 ++++++++++++++
 .../flume/sink/kite/DatasetSinkConstants.java   |  56 +++
 .../apache/flume/sink/kite/TestDatasetSink.java | 475 +++++++++++++++++++
 flume-ng-sinks/pom.xml                          |  29 ++
 pom.xml                                         |  19 +-
 8 files changed, 1134 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 2d0ee47..8b814b7 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -63,6 +63,24 @@
     </plugins>
   </build>
 
+  <profiles>
+    <profile>
+      <id>hadoop-2</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.flume.flume-ng-sinks</groupId>
+          <artifactId>flume-dataset-sink</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.flume</groupId>

http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a2790d9..d120a74 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2018,6 +2018,39 @@ Example for agent named a1:
   a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
   a1.sinks.k1.channel = c1
 
+Kite Dataset Sink (experimental)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. warning::
+  This sink is experimental and may change between minor versions of Flume.
+  Use at your own risk.
+
+Experimental sink that writes events to a `Kite Dataset <http://kitesdk.org/docs/current/kite-data/guide.html>`_.
+This sink will deserialize the body of each incoming event and store the
+resulting record in a Kite Dataset. It determines the target Dataset by opening
+a repository URI, ``kite.repo.uri``, and loading a Dataset by name,
+``kite.dataset.name``.
+
+The only supported serialization is Avro, and the record schema must be passed
+in the event headers, using either ``flume.avro.schema.literal`` with the JSON
+schema representation or ``flume.avro.schema.url`` with a URL where the schema
+may be found (``hdfs:/...`` URIs are supported). This is compatible with the
+Log4jAppender Flume client and the spooling directory source's Avro
+deserializer using ``deserializer.schemaType = LITERAL``.
+
+Note: The ``flume.avro.schema.hash`` header is **not supported**.
+
+=====================  =======  ===========================================================
+Property Name          Default  Description
+=====================  =======  ===========================================================
+**channel**            --
+**type**               --       Must be org.apache.flume.sink.kite.DatasetSink
+**kite.repo.uri**      --       URI of the repository to open
+**kite.dataset.name**  --       Name of the Dataset where records will be written
+kite.batchSize         100      Number of records to process in each batch
+kite.rollInterval      30       Maximum wait time (seconds) before data files are released
+=====================  =======  ===========================================================
+
 Custom Sink
 ~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/flume-ng-sinks/flume-dataset-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/pom.xml b/flume-ng-sinks/flume-dataset-sink/pom.xml
new file mode 100644
index 0000000..57fd0e4
--- /dev/null
+++ b/flume-ng-sinks/flume-dataset-sink/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flume-ng-sinks</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-dataset-sink</artifactId> + <name>Flume NG Kite Dataset Sink</name> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + </build> + + <repositories> + <repository> + <id>cdh.repo</id> + <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> + <name>Cloudera Repositories</name> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + + <repository> + <id>cdh.snapshots.repo</id> + <url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url> + <name>Cloudera Snapshots Repository</name> + <snapshots> + <enabled>true</enabled> + </snapshots> + <releases> + <enabled>false</enabled> + </releases> + </repository> + </repositories> + + <dependencies> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-configuration</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <dependency> + <!-- build will fail if this is not hadoop-common 2.* + because kite uses hflush. 
+ --> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop2.version}</version> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop2.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java new file mode 100644 index 0000000..9a00fb1 --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.kite; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.flume.sink.AbstractSink; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetRepositories; +import org.kitesdk.data.DatasetWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Experimental sink that writes events to a Kite Dataset. This sink will + * deserialize the body of each incoming event and store the resulting record + * in a Kite Dataset. It determines target Dataset by opening a repository URI, + * {@code kite.repo.uri}, and loading a Dataset by name, + * {@code kite.dataset.name}. 
+ */ +public class DatasetSink extends AbstractSink implements Configurable { + + private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class); + + static Configuration conf = new Configuration(); + + /** + * Lock used to protect access to the current writer + */ + private final ReentrantLock writerLock = new ReentrantLock(true); + + private String repositoryURI = null; + private String datasetName = null; + private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE; + private Dataset<Object> targetDataset = null; + private DatasetWriter<Object> writer = null; + private SinkCounter counter = null; + + // for rolling files at a given interval + private ScheduledExecutorService rollTimer; + private int rollInterval = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL; + + // for working with avro serialized records + private Object datum = null; + private BinaryDecoder decoder = null; + private LoadingCache<Schema, ReflectDatumReader<Object>> readers = + CacheBuilder.newBuilder() + .build(new CacheLoader<Schema, ReflectDatumReader<Object>>() { + @Override + public ReflectDatumReader<Object> load(Schema schema) { + // must use the target dataset's schema for reading to ensure the + // records are able to be stored using it + return new ReflectDatumReader<Object>( + schema, targetDataset.getDescriptor().getSchema()); + } + }); + private static LoadingCache<String, Schema> schemasFromLiteral = CacheBuilder + .newBuilder() + .build(new CacheLoader<String, Schema>() { + @Override + public Schema load(String literal) { + Preconditions.checkNotNull(literal, + "Schema literal cannot be null without a Schema URL"); + return new Schema.Parser().parse(literal); + } + }); + private static LoadingCache<String, Schema> schemasFromURL = CacheBuilder + .newBuilder() + .build(new CacheLoader<String, Schema>() { + @Override + public Schema load(String url) throws IOException { + Schema.Parser parser = new Schema.Parser(); + InputStream is = null; + try { + FileSystem fs = FileSystem.get(URI.create(url), conf); + if (url.toLowerCase().startsWith("hdfs:/")) { + is = fs.open(new Path(url)); + } else { + is = new URL(url).openStream(); + } + return parser.parse(is); + } finally { + if (is != null) { + is.close(); + } + } + } + }); + + protected List<String> allowedFormats() { + return Lists.newArrayList("avro"); + } + + @Override + public void configure(Context context) { + this.repositoryURI = context.getString( + DatasetSinkConstants.CONFIG_KITE_REPO_URI); + Preconditions.checkNotNull(repositoryURI, "Repository URI is missing"); + this.datasetName = context.getString( + DatasetSinkConstants.CONFIG_KITE_DATASET_NAME); + Preconditions.checkNotNull(datasetName, "Dataset name is missing"); + this.targetDataset = DatasetRepositories.open(repositoryURI) + .load(datasetName); + + String formatName = targetDataset.getDescriptor().getFormat().getName(); + Preconditions.checkArgument(allowedFormats().contains(formatName), + "Unsupported format: " + formatName); + + // other configuration + this.batchSize = context.getLong( + DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE, + DatasetSinkConstants.DEFAULT_BATCH_SIZE); + this.rollInterval = context.getInteger( + DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL, + DatasetSinkConstants.DEFAULT_ROLL_INTERVAL); + + this.counter = new SinkCounter(getName()); + } + + @Override + public synchronized void start() { + this.writer = openWriter(targetDataset); + if (rollInterval > 0) { + this.rollTimer = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() 
+ .setNameFormat(getName() + "-timed-roll-thread") + .build()); + rollTimer.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + roll(); + } + }, rollInterval, rollInterval, TimeUnit.SECONDS); + } + counter.start(); + // signal that this sink is ready to process + LOG.info("Started DatasetSink " + getName()); + super.start(); + } + + void roll() { + // if the writer is null, nothing to do + if (writer == null) { + return; + } + + // no need to open/close while the lock is held, just replace the reference + DatasetWriter toClose = null; + DatasetWriter newWriter = openWriter(targetDataset); + + writerLock.lock(); + try { + toClose = writer; + this.writer = newWriter; + } finally { + writerLock.unlock(); + } + + LOG.info("Rolled writer for dataset: " + datasetName); + toClose.close(); + } + + @Override + public synchronized void stop() { + counter.stop(); + if (rollTimer != null) { + rollTimer.shutdown(); + try { + while (!rollTimer.isTerminated()) { + rollTimer.awaitTermination( + DatasetSinkConstants.DEFAULT_TERMINATION_INTERVAL, + TimeUnit.MILLISECONDS); + } + } catch (InterruptedException ex) { + LOG.warn("Interrupted while waiting for shutdown: " + rollTimer); + Thread.interrupted(); + } + } + + if (writer != null) { + // any write problems invalidate the writer, which is immediately closed + writer.close(); + this.writer = null; + } + + // signal that this sink has stopped + LOG.info("Stopped dataset sink: " + getName()); + super.stop(); + } + + @Override + public Status process() throws EventDeliveryException { + if (writer == null) { + throw new EventDeliveryException( + "Cannot recover after previous failure"); + } + + Channel channel = getChannel(); + Transaction transaction = null; + try { + long processedEvents = 0; + + // coarse locking to avoid waiting within the loop + writerLock.lock(); + transaction = channel.getTransaction(); + transaction.begin(); + try { + for (; processedEvents < batchSize; processedEvents += 1) { + Event event = channel.take(); + if (event == null) { + // no events available in the channel + break; + } + + this.datum = deserialize(event, datum); + + // writeEncoded would be an optimization in some cases, but HBase + // will not support it and partitioned Datasets need to get partition + // info from the entity Object. We may be able to avoid the + // serialization round-trip otherwise. + writer.write(datum); + } + // TODO: Add option to sync, depends on CDK-203 + writer.flush(); + } finally { + writerLock.unlock(); + } + + // commit after data has been written and flushed + transaction.commit(); + + if (processedEvents == 0) { + counter.incrementBatchEmptyCount(); + return Status.BACKOFF; + } else if (processedEvents < batchSize) { + counter.incrementBatchUnderflowCount(); + } else { + counter.incrementBatchCompleteCount(); + } + + counter.addToEventDrainSuccessCount(processedEvents); + + return Status.READY; + + } catch (Throwable th) { + // catch-all for any unhandled Throwable so that the transaction is + // correctly rolled back. 
+ if (transaction != null) { + try { + transaction.rollback(); + } catch (Exception ex) { + LOG.error("Transaction rollback failed", ex); + throw Throwables.propagate(ex); + } + } + + // remove the writer's reference and close it + DatasetWriter toClose = null; + writerLock.lock(); + try { + toClose = writer; + this.writer = null; + } finally { + writerLock.unlock(); + } + toClose.close(); + + // handle the exception + Throwables.propagateIfInstanceOf(th, Error.class); + Throwables.propagateIfInstanceOf(th, EventDeliveryException.class); + throw new EventDeliveryException(th); + + } finally { + if (transaction != null) { + transaction.close(); + } + } + } + + /** + * Not thread-safe. + * + * @param event + * @param reuse + * @return + */ + private Object deserialize(Event event, Object reuse) + throws EventDeliveryException { + decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder); + // no checked exception is thrown in the CacheLoader + ReflectDatumReader<Object> reader = readers.getUnchecked(schema(event)); + try { + return reader.read(reuse, decoder); + } catch (IOException ex) { + throw new EventDeliveryException("Cannot deserialize event", ex); + } + } + + private static Schema schema(Event event) throws EventDeliveryException { + Map<String, String> headers = event.getHeaders(); + String schemaURL = headers.get( + DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER); + try { + if (headers.get(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER) != null) { + return schemasFromURL.get(schemaURL); + } else { + return schemasFromLiteral.get( + headers.get(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER)); + } + } catch (ExecutionException ex) { + throw new EventDeliveryException("Cannot get schema", ex.getCause()); + } + } + + private static DatasetWriter<Object> openWriter(Dataset<Object> target) { + DatasetWriter<Object> writer = target.newWriter(); + writer.open(); + return writer; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java new file mode 100644 index 0000000..5087352 --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.kite; + +public class DatasetSinkConstants { + /** + * URI of the Kite DatasetRepository. 
+ */ + public static final String CONFIG_KITE_REPO_URI = "kite.repo.uri"; + + /** + * Name of the Kite Dataset to write into. + */ + public static final String CONFIG_KITE_DATASET_NAME = "kite.dataset.name"; + + /** + * Number of records to process from the incoming channel per call to process. + */ + public static final String CONFIG_KITE_BATCH_SIZE = "kite.batchSize"; + public static long DEFAULT_BATCH_SIZE = 100; + + /** + * Maximum time to wait before finishing files. + */ + public static final String CONFIG_KITE_ROLL_INTERVAL = "kite.rollInterval"; + public static int DEFAULT_ROLL_INTERVAL = 30; // seconds + + /** + * Interval to wait for thread termination + */ + public static final int DEFAULT_TERMINATION_INTERVAL = 10000; // milliseconds + + /** + * Headers with avro schema information is expected. + */ + public static final String AVRO_SCHEMA_LITERAL_HEADER = + "flume.avro.schema.literal"; + public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url"; + +} http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java new file mode 100644 index 0000000..5708f0c --- /dev/null +++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java @@ -0,0 +1,475 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flume.sink.kite; + +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.commons.io.FileUtils; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.SimpleEvent; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetDescriptor; +import org.kitesdk.data.DatasetReader; +import org.kitesdk.data.DatasetRepositories; +import org.kitesdk.data.DatasetRepository; +import org.kitesdk.data.PartitionStrategy; + +public class TestDatasetSink { + + public static final String FILE_REPO_URI = "repo:file:target/test-repo"; + public static final String DATASET_NAME = "test"; + public static final DatasetRepository REPO = DatasetRepositories + .open(FILE_REPO_URI); + public static final File SCHEMA_FILE = new File("target/record-schema.avsc"); + public static final Schema RECORD_SCHEMA = new Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" + + "{\"name\":\"id\",\"type\":\"string\"}," + + "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]," + + "\"default\":\"default\"}]}"); + public static final Schema COMPATIBLE_SCHEMA = new Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" + + "{\"name\":\"id\",\"type\":\"string\"}]}"); + public static final Schema INCOMPATIBLE_SCHEMA = new Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"user\",\"fields\":[" + + "{\"name\":\"username\",\"type\":\"string\"}]}"); + public static final DatasetDescriptor DESCRIPTOR = new DatasetDescriptor + .Builder() + .schema(RECORD_SCHEMA) + .build(); + + Context config = null; + Channel in = null; + List<GenericData.Record> expected = null; + private static final String DFS_DIR = "target/test/dfs"; + private static final String TEST_BUILD_DATA_KEY = "test.build.data"; + private static String oldTestBuildDataProp = null; + + @BeforeClass + public static void saveSchema() throws IOException { + oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY); + System.setProperty(TEST_BUILD_DATA_KEY, DFS_DIR); + FileWriter schema = new FileWriter(SCHEMA_FILE); + schema.append(RECORD_SCHEMA.toString()); + schema.close(); + } + + @AfterClass + public static void tearDownClass() { + 
FileUtils.deleteQuietly(new File(DFS_DIR)); + if (oldTestBuildDataProp != null) { + System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp); + } + } + + @Before + public void setup() throws EventDeliveryException { + REPO.create(DATASET_NAME, DESCRIPTOR); + + this.config = new Context(); + config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, FILE_REPO_URI); + config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, DATASET_NAME); + + this.in = new MemoryChannel(); + Configurables.configure(in, config); + + GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA); + expected = Lists.newArrayList( + builder.set("id", "1").set("msg", "msg1").build(), + builder.set("id", "2").set("msg", "msg2").build(), + builder.set("id", "3").set("msg", "msg3").build()); + + putToChannel(in, Iterables.transform(expected, + new Function<GenericData.Record, Event>() { + private int i = 0; + + @Override + public Event apply(@Nullable GenericData.Record rec) { + this.i += 1; + boolean useURI = (i % 2) == 0; + return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI); + } + })); + } + + @After + public void teardown() { + REPO.delete(DATASET_NAME); + } + + @Test + public void testFileStore() throws EventDeliveryException { + DatasetSink sink = sink(in, config); + + // run the sink + sink.start(); + sink.process(); + sink.stop(); + + Assert.assertEquals( + Sets.newHashSet(expected), + read(REPO.<GenericData.Record>load(DATASET_NAME))); + Assert.assertEquals("Should have committed", 0, remaining(in)); + } + + @Test + public void testPartitionedData() throws EventDeliveryException { + REPO.create("partitioned", new DatasetDescriptor.Builder(DESCRIPTOR) + .partitionStrategy(new PartitionStrategy.Builder() + .identity("id", String.class, 10) // partition by id + .build()) + .build()); + + try { + config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_NAME, "partitioned"); + DatasetSink sink = sink(in, config); + + // run the sink + sink.start(); + sink.process(); + sink.stop(); + + Assert.assertEquals( + Sets.newHashSet(expected), + read(REPO.<GenericData.Record>load("partitioned"))); + Assert.assertEquals("Should have committed", 0, remaining(in)); + } finally { + if (REPO.exists("partitioned")) { + REPO.delete("partitioned"); + } + } + } + + @Test + public void testMiniClusterStore() + throws EventDeliveryException, IOException { + // setup a minicluster + MiniDFSCluster cluster = new MiniDFSCluster + .Builder(new Configuration()) + .build(); + DatasetRepository hdfsRepo = null; + try { + FileSystem dfs = cluster.getFileSystem(); + Configuration conf = dfs.getConf(); + String repoURI = "repo:" + conf.get("fs.defaultFS") + "/tmp/repo"; + + // create a repository and dataset in HDFS + hdfsRepo = DatasetRepositories.open(repoURI); + hdfsRepo.create(DATASET_NAME, DESCRIPTOR); + + // update the config to use the HDFS repository + config.put(DatasetSinkConstants.CONFIG_KITE_REPO_URI, repoURI); + + DatasetSink sink = sink(in, config); + + // run the sink + sink.start(); + sink.process(); + sink.stop(); + + Assert.assertEquals( + Sets.newHashSet(expected), + read(hdfsRepo.<GenericData.Record>load(DATASET_NAME))); + Assert.assertEquals("Should have committed", 0, remaining(in)); + + } finally { + if (hdfsRepo != null && hdfsRepo.exists(DATASET_NAME)) { + hdfsRepo.delete(DATASET_NAME); + } + cluster.shutdown(); + } + } + + @Test + public void testBatchSize() throws EventDeliveryException { + DatasetSink sink = sink(in, config); + + // release one record per process call + config.put("kite.batchSize", "2"); + 
Configurables.configure(sink, config); + + sink.start(); + sink.process(); // process the first and second + sink.roll(); // roll at the next process call + sink.process(); // roll and process the third + Assert.assertEquals( + Sets.newHashSet(expected.subList(0, 2)), + read(REPO.<GenericData.Record>load(DATASET_NAME))); + Assert.assertEquals("Should have committed", 0, remaining(in)); + sink.roll(); // roll at the next process call + sink.process(); // roll, the channel is empty + Assert.assertEquals( + Sets.newHashSet(expected), + read(REPO.<GenericData.Record>load(DATASET_NAME))); + sink.stop(); + } + + @Test + public void testTimedFileRolling() + throws EventDeliveryException, InterruptedException { + // use a new roll interval + config.put("kite.rollInterval", "1"); // in seconds + + DatasetSink sink = sink(in, config); + + Dataset<GenericData.Record> records = REPO.load(DATASET_NAME); + + // run the sink + sink.start(); + sink.process(); + + Assert.assertEquals(Sets.<GenericData.Record>newHashSet(), read(records)); + Assert.assertEquals("Should have committed", 0, remaining(in)); + + Thread.sleep(1100); // sleep longer than the roll interval + sink.process(); // rolling happens in the process method + + Assert.assertEquals(Sets.newHashSet(expected), read(records)); + + // wait until the end to stop because it would close the files + sink.stop(); + } + + @Test + public void testCompatibleSchemas() throws EventDeliveryException { + DatasetSink sink = sink(in, config); + + // add a compatible record that is missing the msg field + GenericRecordBuilder compatBuilder = new GenericRecordBuilder( + COMPATIBLE_SCHEMA); + GenericData.Record compatibleRecord = compatBuilder.set("id", "0").build(); + + // add the record to the incoming channel + putToChannel(in, event(compatibleRecord, COMPATIBLE_SCHEMA, null, false)); + + // the record will be read using the real schema, so create the expected + // record using it, but without any data + + GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA); + GenericData.Record expectedRecord = builder.set("id", "0").build(); + expected.add(expectedRecord); + + // run the sink + sink.start(); + sink.process(); + sink.stop(); + + Assert.assertEquals( + Sets.newHashSet(expected), + read(REPO.<GenericData.Record>load(DATASET_NAME))); + Assert.assertEquals("Should have committed", 0, remaining(in)); + } + + @Test + public void testIncompatibleSchemas() throws EventDeliveryException { + final DatasetSink sink = sink(in, config); + + GenericRecordBuilder builder = new GenericRecordBuilder( + INCOMPATIBLE_SCHEMA); + GenericData.Record rec = builder.set("username", "koala").build(); + putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, null, false)); + + // run the sink + sink.start(); + assertThrows("Should fail", EventDeliveryException.class, + new Callable() { + @Override + public Object call() throws EventDeliveryException { + sink.process(); + return null; + } + }); + sink.stop(); + + Assert.assertEquals("Should have rolled back", + expected.size() + 1, remaining(in)); + } + + @Test + public void testMissingSchema() throws EventDeliveryException { + final DatasetSink sink = sink(in, config); + + Event badEvent = new SimpleEvent(); + badEvent.setHeaders(Maps.<String, String>newHashMap()); + badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA)); + putToChannel(in, badEvent); + + // run the sink + sink.start(); + assertThrows("Should fail", EventDeliveryException.class, + new Callable() { + @Override + public Object call() throws 
EventDeliveryException { + sink.process(); + return null; + } + }); + sink.stop(); + + Assert.assertEquals("Should have rolled back", + expected.size() + 1, remaining(in)); + } + + public static DatasetSink sink(Channel in, Context config) { + DatasetSink sink = new DatasetSink(); + sink.setChannel(in); + Configurables.configure(sink, config); + return sink; + } + + public static <T> HashSet<T> read(Dataset<T> dataset) { + DatasetReader<T> reader = dataset.newReader(); + try { + reader.open(); + return Sets.newHashSet(reader.iterator()); + } finally { + reader.close(); + } + } + + public static int remaining(Channel ch) throws EventDeliveryException { + Transaction t = ch.getTransaction(); + try { + t.begin(); + int count = 0; + while (ch.take() != null) { + count += 1; + } + t.commit(); + return count; + } catch (Throwable th) { + t.rollback(); + Throwables.propagateIfInstanceOf(th, Error.class); + Throwables.propagateIfInstanceOf(th, EventDeliveryException.class); + throw new EventDeliveryException(th); + } finally { + t.close(); + } + } + + public static void putToChannel(Channel in, Event... records) + throws EventDeliveryException { + putToChannel(in, Arrays.asList(records)); + } + + public static void putToChannel(Channel in, Iterable<Event> records) + throws EventDeliveryException { + Transaction t = in.getTransaction(); + try { + t.begin(); + for (Event record : records) { + in.put(record); + } + t.commit(); + } catch (Throwable th) { + t.rollback(); + Throwables.propagateIfInstanceOf(th, Error.class); + Throwables.propagateIfInstanceOf(th, EventDeliveryException.class); + throw new EventDeliveryException(th); + } finally { + t.close(); + } + } + + public static Event event( + Object datum, Schema schema, File file, boolean useURI) { + Map<String, String> headers = Maps.newHashMap(); + if (useURI) { + headers.put(DatasetSinkConstants.AVRO_SCHEMA_URL_HEADER, + file.getAbsoluteFile().toURI().toString()); + } else { + headers.put(DatasetSinkConstants.AVRO_SCHEMA_LITERAL_HEADER, + schema.toString()); + } + Event e = new SimpleEvent(); + e.setBody(serialize(datum, schema)); + e.setHeaders(headers); + return e; + } + + @SuppressWarnings("unchecked") + public static byte[] serialize(Object datum, Schema schema) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + ReflectDatumWriter writer = new ReflectDatumWriter(schema); + try { + writer.write(datum, encoder); + encoder.flush(); + } catch (IOException ex) { + Throwables.propagate(ex); + } + return out.toByteArray(); + } + + /** + * A convenience method to avoid a large number of @Test(expected=...) tests. + * + * This variant uses a Callable, which is allowed to throw checked Exceptions. + * + * @param message A String message to describe this assertion + * @param expected An Exception class that the Runnable should throw + * @param callable A Callable that is expected to throw the exception + */ + public static void assertThrows( + String message, Class<? 
extends Exception> expected, Callable callable) { + try { + callable.call(); + Assert.fail("No exception was thrown (" + message + "), expected: " + + expected.getName()); + } catch (Exception actual) { + Assert.assertEquals(message, expected, actual.getClass()); + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/flume-ng-sinks/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml index d03576b..6ac2b4d 100644 --- a/flume-ng-sinks/pom.xml +++ b/flume-ng-sinks/pom.xml @@ -47,4 +47,33 @@ limitations under the License. <module>flume-ng-elasticsearch-sink</module> <module>flume-ng-morphline-solr-sink</module> </modules> + + <profiles> + + <profile> + <id>hadoop-1.0</id> + <activation> + <property> + <name>!hadoop.profile</name> + </property> + </activation> + </profile> + + <profile> + <id>hadoop-2</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>2</value> + </property> + </activation> + <!-- add the flume-dataset-sink, which is only compatible with hadoop-2 + --> + <modules> + <module>flume-dataset-sink</module> + </modules> + </profile> + + </profiles> + </project> http://git-wip-us.apache.org/repos/asf/flume/blob/0994eb27/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 453be95..d71239a 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,8 @@ limitations under the License. <avro.version>1.7.3</avro.version> <elasticsearch.version>0.90.1</elasticsearch.version> + + <hadoop2.version>2.1.0-beta</hadoop2.version> </properties> <modules> @@ -106,7 +108,7 @@ limitations under the License. </property> </activation> <properties> - <hadoop.version>2.1.0-beta</hadoop.version> + <hadoop.version>${hadoop2.version}</hadoop.version> <hbase.version>0.94.2</hbase.version> <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id> <thrift.version>0.8.0</thrift.version> @@ -143,6 +145,13 @@ limitations under the License. <artifactId>hadoop-auth</artifactId> <version>${hadoop.version}</version> </dependency> + + <!-- only compatible with hadoop-2 --> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-dataset-sink</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> </dependencies> </dependencyManagement> </profile> @@ -538,7 +547,7 @@ limitations under the License. </goals> <configuration> <excludes> - <exclude>.idea/</exclude> + <exclude>**/.idea/</exclude> <exclude>**/*.iml</exclude> <exclude>**/nb-configuration.xml</exclude> <exclude>.git/</exclude> @@ -1087,6 +1096,12 @@ limitations under the License. <version>3.0.3</version> </dependency> + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-data-core</artifactId> + <version>0.10.1</version> + </dependency> + </dependencies> </dependencyManagement>
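----------------------------------------------------------------------

For reference, a minimal agent configuration following the property table added
to FlumeUserGuide.rst might look like the following. The agent/sink/channel
names and the repository URI and dataset name are illustrative placeholders,
not values taken from this commit; kite.batchSize and kite.rollInterval are
shown at their defaults.

  a1.channels = c1
  a1.sinks = k1

  a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
  a1.sinks.k1.channel = c1
  a1.sinks.k1.kite.repo.uri = repo:hdfs://namenode:8020/data/example-repo
  a1.sinks.k1.kite.dataset.name = events
  a1.sinks.k1.kite.batchSize = 100
  a1.sinks.k1.kite.rollInterval = 30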
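The sink expects each event body to be an Avro binary-encoded record, with the
record schema carried in the flume.avro.schema.literal (or
flume.avro.schema.url) header. A rough sketch of what a client has to produce,
mirroring the event/serialize helpers in TestDatasetSink above; the class,
method, and field names here are invented for illustration:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;

public class KiteSinkEventExample {

  // example record schema; any Avro record schema that is read-compatible
  // with the target Dataset's schema works the same way
  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"msg\",\"type\":[\"string\",\"null\"]}]}");

  public static Event buildEvent(String id, String msg) throws IOException {
    // build a record and serialize it with Avro binary encoding,
    // as the test helpers in TestDatasetSink do
    GenericData.Record record = new GenericRecordBuilder(SCHEMA)
        .set("id", id)
        .set("msg", msg)
        .build();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(SCHEMA);
    writer.write(record, encoder);
    encoder.flush();

    // the sink looks up the reader schema from this header; a
    // flume.avro.schema.url header pointing at an hdfs:/ or http:// copy
    // of the schema is the alternative
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("flume.avro.schema.literal", SCHEMA.toString());

    Event event = new SimpleEvent();
    event.setBody(out.toByteArray());
    event.setHeaders(headers);
    return event;
  }
}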