Repository: flume
Updated Branches:
  refs/heads/trunk 72be82d30 -> acc965134
FLUME-2462. Remove use of deprecated methods in DatasetSink
(Ryan Blue via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/acc96513
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/acc96513
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/acc96513

Branch: refs/heads/trunk
Commit: acc9651346fe5834cdf5cdf0eb417f624aab1d09
Parents: 72be82d
Author: Hari Shreedharan <hshreedha...@apache.org>
Authored: Sat Sep 13 14:59:37 2014 -0700
Committer: Hari Shreedharan <hshreedha...@apache.org>
Committed: Sat Sep 13 14:59:37 2014 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/kite/DatasetSink.java |  5 +-
 .../apache/flume/sink/kite/TestDatasetSink.java | 87 ++++++++++----------
 2 files changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/acc96513/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
index 8f3ae51..4cd3027 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java
@@ -52,9 +52,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetRepositories;
 import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
+import org.kitesdk.data.spi.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,7 +172,8 @@ public class DatasetSink extends AbstractSink implements Configurable {
         new PrivilegedExceptionAction<Dataset<GenericRecord>>() {
           @Override
           public Dataset<GenericRecord> run() {
-            return DatasetRepositories.open(repositoryURI).load(datasetName);
+            return Datasets.load(
+                new URIBuilder(repositoryURI, datasetName).build());
           }
         });
   }
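Note: the hunk above is the whole fix in the main class. For readers updating
their own code, here is a minimal standalone sketch of the same migration;
the repository URI and dataset name below are hypothetical placeholders, not
values from this commit.

    import java.net.URI;

    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.Datasets;
    import org.kitesdk.data.spi.URIBuilder;

    public class LoadByUriSketch {
      public static void main(String[] args) {
        // Hypothetical placeholders, for illustration only.
        String repositoryURI = "repo:file:/tmp/example-repo";
        String datasetName = "events";

        // Deprecated style removed by this commit:
        //   DatasetRepositories.open(repositoryURI).load(datasetName)
        // Replacement: fold the repository URI and dataset name into a
        // single dataset URI, then load it directly.
        URI datasetUri = new URIBuilder(repositoryURI, datasetName).build();
        Dataset<GenericRecord> dataset = Datasets.load(datasetUri);

        System.out.println("Loaded schema: " + dataset.getDescriptor().getSchema());
      }
    }

URIBuilder comes from the Kite spi package, as in the patch; the resulting
URI is the same "dataset:..." form the tests below are switched to.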
http://git-wip-us.apache.org/repos/asf/flume/blob/acc96513/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index b448b50..a277381 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -28,6 +28,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -62,10 +63,9 @@ import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.DatasetRepositories;
-import org.kitesdk.data.DatasetRepository;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
+import org.kitesdk.data.View;
 
 public class TestDatasetSink {
 
@@ -73,8 +73,6 @@ public class TestDatasetSink {
   public static final String DATASET_NAME = "test";
   public static final String FILE_DATASET_URI =
       "dataset:file:target/test-repo/" + DATASET_NAME;
-  public static final DatasetRepository REPO = DatasetRepositories
-      .open(FILE_REPO_URI);
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
   public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -94,7 +92,7 @@ public class TestDatasetSink {
 
   Context config = null;
   Channel in = null;
-  List<GenericData.Record> expected = null;
+  List<GenericRecord> expected = null;
   private static final String DFS_DIR = "target/test/dfs";
   private static final String TEST_BUILD_DATA_KEY = "test.build.data";
   private static String oldTestBuildDataProp = null;
@@ -118,8 +116,8 @@
 
   @Before
   public void setup() throws EventDeliveryException {
-    REPO.delete(DATASET_NAME);
-    REPO.create(DATASET_NAME, DESCRIPTOR);
+    Datasets.delete(FILE_DATASET_URI);
+    Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
 
     this.config = new Context();
     this.in = new MemoryChannel();
@@ -128,17 +126,17 @@
     config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, FILE_DATASET_URI);
 
     GenericRecordBuilder builder = new GenericRecordBuilder(RECORD_SCHEMA);
-    expected = Lists.newArrayList(
+    expected = Lists.<GenericRecord>newArrayList(
         builder.set("id", "1").set("msg", "msg1").build(),
         builder.set("id", "2").set("msg", "msg2").build(),
         builder.set("id", "3").set("msg", "msg3").build());
 
     putToChannel(in, Iterables.transform(expected,
-        new Function<GenericData.Record, Event>() {
+        new Function<GenericRecord, Event>() {
          private int i = 0;
 
          @Override
-          public Event apply(@Nullable GenericData.Record rec) {
+          public Event apply(@Nullable GenericRecord rec) {
            this.i += 1;
            boolean useURI = (i % 2) == 0;
            return event(rec, RECORD_SCHEMA, SCHEMA_FILE, useURI);
@@ -148,7 +146,7 @@
 
   @After
   public void teardown() {
-    REPO.delete(DATASET_NAME);
+    Datasets.delete(FILE_DATASET_URI);
   }
 
   @Test
@@ -166,7 +164,7 @@
 
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
@@ -185,7 +183,7 @@
 
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
@@ -200,18 +198,17 @@
 
     Assert.assertEquals(
         Sets.newHashSet(expected),
-        read(REPO.<GenericData.Record>load(DATASET_NAME)));
+        read(Datasets.load(FILE_DATASET_URI)));
     Assert.assertEquals("Should have committed", 0, remaining(in));
   }
 
   @Test
   public void testParquetDataset() throws EventDeliveryException {
     Datasets.delete(FILE_DATASET_URI);
-    Dataset<GenericData.Record> created = Datasets.create(FILE_DATASET_URI,
+    Dataset<GenericRecord> created = Datasets.create(FILE_DATASET_URI,
         new DatasetDescriptor.Builder(DESCRIPTOR)
             .format("parquet")
-            .build(),
-            GenericData.Record.class);
+            .build());
 
     DatasetSink sink = sink(in, config);
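Note: the setup/teardown changes above reduce dataset management to two
static calls keyed by URI. A condensed sketch of that lifecycle outside
JUnit, with a hypothetical URI and schema:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetDescriptor;
    import org.kitesdk.data.Datasets;

    public class UriLifecycleSketch {
      public static void main(String[] args) {
        // Hypothetical URI and schema, for illustration only.
        String uri = "dataset:file:/tmp/scratch-repo/users";
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"user\",\"fields\":[" +
            "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Reset as in the test's @Before: delete any leftover dataset,
        // then create a fresh one at the same URI.
        Datasets.delete(uri);
        Dataset<GenericRecord> users = Datasets.create(uri,
            new DatasetDescriptor.Builder()
                .schema(schema)
                .build());

        // ... write to and read from `users` here ...

        // Clean up as in the test's @After.
        Datasets.delete(uri);
      }
    }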
REPO.create("partitioned", new DatasetDescriptor.Builder(DESCRIPTOR) - .partitionStrategy(new PartitionStrategy.Builder() - .identity("id", 10) // partition by id - .build()) - .build()); - + URI partitionedUri = URI.create("dataset:file:target/test-repo/partitioned"); try { + Datasets.create(partitionedUri, new DatasetDescriptor.Builder(DESCRIPTOR) + .partitionStrategy(new PartitionStrategy.Builder() + .identity("id", 10) // partition by id + .build()) + .build()); + config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, - "dataset:file:target/test-repo/partitioned"); + partitionedUri.toString()); DatasetSink sink = sink(in, config); // run the sink @@ -244,11 +242,11 @@ public class TestDatasetSink { Assert.assertEquals( Sets.newHashSet(expected), - read(REPO.<GenericData.Record>load("partitioned"))); + read(Datasets.load(partitionedUri))); Assert.assertEquals("Should have committed", 0, remaining(in)); } finally { - if (REPO.exists("partitioned")) { - REPO.delete("partitioned"); + if (Datasets.exists(partitionedUri)) { + Datasets.delete(partitionedUri); } } } @@ -260,19 +258,18 @@ public class TestDatasetSink { MiniDFSCluster cluster = new MiniDFSCluster .Builder(new Configuration()) .build(); - DatasetRepository hdfsRepo = null; - try { - FileSystem dfs = cluster.getFileSystem(); - Configuration conf = dfs.getConf(); - String repoURI = "repo:" + conf.get("fs.defaultFS") + "/tmp/repo"; + FileSystem dfs = cluster.getFileSystem(); + Configuration conf = dfs.getConf(); + + URI hdfsUri = URI.create( + "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo" + DATASET_NAME); + try { // create a repository and dataset in HDFS - hdfsRepo = DatasetRepositories.open(repoURI); - hdfsRepo.create(DATASET_NAME, DESCRIPTOR); + Datasets.create(hdfsUri, DESCRIPTOR); // update the config to use the HDFS repository - config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, - "dataset:" + conf.get("fs.defaultFS") + "/tmp/repo/" + DATASET_NAME); + config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI, hdfsUri.toString()); DatasetSink sink = sink(in, config); @@ -283,12 +280,12 @@ public class TestDatasetSink { Assert.assertEquals( Sets.newHashSet(expected), - read(hdfsRepo.<GenericData.Record>load(DATASET_NAME))); + read(Datasets.load(hdfsUri))); Assert.assertEquals("Should have committed", 0, remaining(in)); } finally { - if (hdfsRepo != null && hdfsRepo.exists(DATASET_NAME)) { - hdfsRepo.delete(DATASET_NAME); + if (Datasets.exists(hdfsUri)) { + Datasets.delete(hdfsUri); } cluster.shutdown(); } @@ -308,13 +305,13 @@ public class TestDatasetSink { sink.process(); // roll and process the third Assert.assertEquals( Sets.newHashSet(expected.subList(0, 2)), - read(REPO.<GenericData.Record>load(DATASET_NAME))); + read(Datasets.load(FILE_DATASET_URI))); Assert.assertEquals("Should have committed", 0, remaining(in)); sink.roll(); // roll at the next process call sink.process(); // roll, the channel is empty Assert.assertEquals( Sets.newHashSet(expected), - read(REPO.<GenericData.Record>load(DATASET_NAME))); + read(Datasets.load(FILE_DATASET_URI))); sink.stop(); } @@ -326,7 +323,7 @@ public class TestDatasetSink { DatasetSink sink = sink(in, config); - Dataset<GenericData.Record> records = REPO.load(DATASET_NAME); + Dataset<GenericRecord> records = Datasets.load(FILE_DATASET_URI); // run the sink sink.start(); @@ -369,7 +366,7 @@ public class TestDatasetSink { Assert.assertEquals( Sets.newHashSet(expected), - read(REPO.<GenericData.Record>load(DATASET_NAME))); + read(Datasets.load(FILE_DATASET_URI))); 
Assert.assertEquals("Should have committed", 0, remaining(in)); } @@ -430,10 +427,10 @@ public class TestDatasetSink { return sink; } - public static <T> HashSet<T> read(Dataset<T> dataset) { + public static <T> HashSet<T> read(View<T> view) { DatasetReader<T> reader = null; try { - reader = dataset.newReader(); + reader = view.newReader(); return Sets.newHashSet(reader.iterator()); } finally { if (reader != null) {