http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/core/src/resources/sales.schema ---------------------------------------------------------------------- diff --git a/java/bench/core/src/resources/sales.schema b/java/bench/core/src/resources/sales.schema new file mode 100644 index 0000000..df96409 --- /dev/null +++ b/java/bench/core/src/resources/sales.schema @@ -0,0 +1,56 @@ +struct< + sales_id:bigint, + customer_id:bigint, + col3:bigint, + item_category:bigint, + item_count:bigint, + change_ts:timestamp, + store_location:string, + associate_id:string, + col9:bigint, + rebate_id:string, + create_ts:timestamp, + col13:bigint, + size:string, + col14:bigint, + fulfilled:boolean, + global_id:string, + col17:string, + col18:string, + col19:bigint, + has_rebate:boolean, + col21:array< + struct< + sub1:bigint, + sub2:string, + sub3:string, + sub4:bigint, + sub5:bigint, + sub6:string>>, + vendor_id:string, + country:string, + backend_version:string, + col41:bigint, + col42:bigint, + col43:bigint, + col44:bigint, + col45:bigint, + col46:bigint, + col47:bigint, + col48:bigint, + col49:string, + col50:string, + col51:bigint, + col52:bigint, + col53:bigint, + col54:bigint, + col55:string, + col56:timestamp, + col57:timestamp, + md5:bigint, + col59:bigint, + col69:timestamp, + col61:string, + col62:string, + col63:timestamp, + col64:bigint>
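The schema resources added above (sales.schema, and taxi.schema below) are plain ORC type-description strings; the benchmark code later in this commit loads them through Utilities.loadSchema(...). As a rough illustration of how such a string is consumed, here is a minimal sketch against the public ORC API; the SchemaExample class name and the shortened schema literal are illustrative, assuming orc-core and hive-storage-api are on the classpath:

  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.orc.TypeDescription;

  public class SchemaExample {
    public static void main(String[] args) {
      // Parse a shortened variant of the taxi schema shown in this commit.
      TypeDescription schema = TypeDescription.fromString(
          "struct<vendor_id:int,pickup_time:timestamp,trip_distance:double,"
          + "fare_amount:decimal(8,2),total_amount:decimal(8,2)>");
      // The benchmarks build vectorized batches directly from the schema.
      VectorizedRowBatch batch = schema.createRowBatch();
      System.out.println("columns: " + schema.getFieldNames());
      System.out.println("max column id: " + schema.getMaximumId());
      System.out.println("batch capacity: " + batch.getMaxSize());
    }
  }

getMaximumId() returns the highest column id in the flattened type tree, which is why the readers in this commit size their column-projection arrays as new boolean[schema.getMaximumId() + 1].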
http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/core/src/resources/taxi.schema ---------------------------------------------------------------------- diff --git a/java/bench/core/src/resources/taxi.schema b/java/bench/core/src/resources/taxi.schema new file mode 100644 index 0000000..5eb7c0f --- /dev/null +++ b/java/bench/core/src/resources/taxi.schema @@ -0,0 +1,21 @@ +struct< + vendor_id:int, + pickup_time: timestamp, + dropoff_time: timestamp, + passenger_count: int, + trip_distance: double, + pickup_longitude: double, + pickup_latitude: double, + ratecode_id: int, + store_and_fwd_flag: string, + dropoff_longitude: double, + dropoff_latitude: double, + payment_type: int, + fare_amount: decimal(8,2), + extra: decimal(8,2), + mta_tax: decimal(8,2), + tip_amount: decimal(8,2), + tolls_amount: decimal(8,2), + improvement_surcharge : decimal(8,2), + total_amount: decimal(8,2) +> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/hive/pom.xml ---------------------------------------------------------------------- diff --git a/java/bench/hive/pom.xml b/java/bench/hive/pom.xml new file mode 100644 index 0000000..8418219 --- /dev/null +++ b/java/bench/hive/pom.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks</artifactId> + <version>1.6.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks-hive</artifactId> + <version>1.6.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>ORC Benchmarks Hive</name> + <description> + File format benchmarks for Hive. 
+ </description> + + <dependencies> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <classifier>hadoop2</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <classifier>core</classifier> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + </dependency> + </dependencies> + + <build> + <sourceDirectory>${basedir}/src/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test</testSourceDirectory> + <testResources> + <testResource> + <directory>${basedir}/src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.orc.bench.core.Driver</mainClass> + </manifest> + </archive> + </configuration> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>cmake</id> + <build> + <directory>${build.dir}/bench/hive</directory> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/hive/src/assembly/uber.xml ---------------------------------------------------------------------- diff --git a/java/bench/hive/src/assembly/uber.xml b/java/bench/hive/src/assembly/uber.xml new file mode 100644 index 0000000..014eab9 --- /dev/null +++ b/java/bench/hive/src/assembly/uber.xml @@ -0,0 +1,33 @@ +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<assembly> + <id>uber</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + </dependencySet> + </dependencySets> + <containerDescriptorHandlers> + <containerDescriptorHandler> + <handlerName>metaInf-services</handlerName> + </containerDescriptorHandler> + </containerDescriptorHandlers> +</assembly> http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/hive/src/findbugs/exclude.xml ---------------------------------------------------------------------- diff --git a/java/bench/hive/src/findbugs/exclude.xml b/java/bench/hive/src/findbugs/exclude.xml new file mode 100644 index 0000000..dde1471 --- /dev/null +++ b/java/bench/hive/src/findbugs/exclude.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<FindBugsFilter> + <Match> + <Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2"/> + </Match> + <Match> + <Class name="~org\.openjdk\.jmh\.infra\.generated.*"/> + </Match> + <Match> + <Class name="~org\.apache\.orc\.bench\.generated.*"/> + </Match> +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/hive/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java ---------------------------------------------------------------------- diff --git a/java/bench/hive/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java b/java/bench/hive/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java new file mode 100644 index 0000000..f75c7f0 --- /dev/null +++ b/java/bench/hive/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; +import org.apache.orc.TypeDescription; + +import java.util.List; + +/** + * HiveUtilities that need the non-public methods from Hive. + */ +public class OrcBenchmarkUtilities { + + public static StructObjectInspector createObjectInspector(TypeDescription schema) { + List<OrcProto.Type> types = OrcUtils.getOrcTypes(schema); + return (StructObjectInspector) OrcStruct.createObjectInspector(0, types); + } + + public static Writable nextObject(VectorizedRowBatch batch, + TypeDescription schema, + int rowId, + Writable obj) { + OrcStruct result = (OrcStruct) obj; + if (result == null) { + result = new OrcStruct(batch.cols.length); + } + List<TypeDescription> childrenTypes = schema.getChildren(); + for(int c=0; c < batch.cols.length; ++c) { + result.setFieldValue(c, RecordReaderImpl.nextValue(batch.cols[c], rowId, + childrenTypes.get(c), result.getFieldValue(c))); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/hive/src/java/org/apache/orc/bench/hive/ColumnProjectionBenchmark.java ---------------------------------------------------------------------- diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/ColumnProjectionBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/ColumnProjectionBenchmark.java new file mode 100644 index 0000000..146a6e7 --- /dev/null +++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/ColumnProjectionBenchmark.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.hive; + +import com.google.auto.service.AutoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.ReadCounters; +import org.apache.orc.bench.core.Utilities; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.runner.Runner; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Thread) +@AutoService(OrcBenchmark.class) +public class ColumnProjectionBenchmark implements OrcBenchmark { + + private static final Path root = Utilities.getBenchmarkRoot(); + + @Param({ "github", "sales", "taxi"}) + public String dataset; + + @Param({"none", "snappy", "gz"}) + public String compression; + + @Override + public String getName() { + return "read-some"; + } + + @Override + public String getDescription() { + return "Benchmark column projection"; + } + + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, getClass())).run(); + } + + @Benchmark + public void orc(ReadCounters counters) throws Exception{ + Configuration conf = new Configuration(); + TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); + fs.initialize(new URI("file:///"), conf); + FileSystem.Statistics statistics = fs.getLocalStatistics(); + statistics.reset(); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs); + Path path = Utilities.getVariant(root, dataset, "orc", compression); + Reader reader = OrcFile.createReader(path, options); + TypeDescription schema = reader.getSchema(); + boolean[] include = new boolean[schema.getMaximumId() + 1]; + // select first two columns + List<TypeDescription> children = schema.getChildren(); + for(int c= children.get(0).getId(); c <= children.get(1).getMaximumId(); ++c) { + include[c] = true; + } + RecordReader rows = reader.rows(new Reader.Options() + .include(include)); + VectorizedRowBatch batch = schema.createRowBatch(); + while (rows.nextBatch(batch)) { + counters.addRecords(batch.size); + } + rows.close(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); + } + + @Benchmark + public void parquet(ReadCounters counters) throws Exception { + JobConf conf = new JobConf(); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + conf.set("fs.defaultFS", "track:///"); + if 
("taxi".equals(dataset)) { + conf.set("columns", "vendor_id,pickup_time"); + conf.set("columns.types", "int,timestamp"); + } else if ("sales".equals(dataset)) { + conf.set("columns", "sales_id,customer_id"); + conf.set("columns.types", "bigint,bigint"); + } else if ("github".equals(dataset)) { + conf.set("columns", "actor,created_at"); + conf.set("columns.types", "struct<avatar_url:string,gravatar_id:string," + + "id:int,login:string,url:string>,timestamp"); + } else { + throw new IllegalArgumentException("Unknown data set " + dataset); + } + Path path = Utilities.getVariant(root, dataset, "parquet", compression); + FileSystem.Statistics statistics = FileSystem.getStatistics("track:///", + TrackingLocalFileSystem.class); + statistics.reset(); + ParquetInputFormat<ArrayWritable> inputFormat = + new ParquetInputFormat<>(DataWritableReadSupport.class); + + NullWritable nada = NullWritable.get(); + FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{}); + org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader = + new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL); + ArrayWritable value = recordReader.createValue(); + while (recordReader.next(nada, value)) { + counters.addRecords(1); + } + recordReader.close(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/hive/src/java/org/apache/orc/bench/hive/DecimalBench.java ---------------------------------------------------------------------- diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/DecimalBench.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/DecimalBench.java new file mode 100644 index 0000000..0345035 --- /dev/null +++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/DecimalBench.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.hive; + +import com.google.auto.service.AutoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.bench.core.NullFileSystem; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.Utilities; +import org.apache.orc.bench.core.convert.BatchReader; +import org.apache.orc.bench.core.convert.GenerateVariants; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@AutoService(OrcBenchmark.class) +public class DecimalBench implements OrcBenchmark { + + private static final Path root = Utilities.getBenchmarkRoot(); + + @Override + public String getName() { + return "decimal"; + } + + @Override + public String getDescription() { + return "Benchmark new decimal64 read and write"; + } + + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, getClass())).run(); + } + + /** + * Abstract out whether we are writing short or long decimals + */ + interface Loader { + /** + * Load the data from the values array into the ColumnVector. 
+ * @param vector the output + * @param values the intput + * @param offset the first input value + * @param length the number of values to copy + */ + void loadData(ColumnVector vector, long[] values, int offset, int length); + } + + static class Decimal64Loader implements Loader { + final int scale; + final int precision; + + Decimal64Loader(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public void loadData(ColumnVector vector, long[] values, int offset, int length) { + Decimal64ColumnVector v = (Decimal64ColumnVector) vector; + v.ensureSize(length, false); + v.noNulls = true; + for(int p=0; p < length; ++p) { + v.vector[p] = values[p + offset]; + } + v.precision = (short) precision; + v.scale = (short) scale; + } + } + + static class DecimalLoader implements Loader { + final int scale; + final int precision; + + DecimalLoader(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public void loadData(ColumnVector vector, long[] values, int offset, int length) { + DecimalColumnVector v = (DecimalColumnVector) vector; + v.noNulls = true; + for(int p=0; p < length; ++p) { + v.vector[p].setFromLongAndScale(values[offset + p], scale); + } + v.precision = (short) precision; + v.scale = (short) scale; + } + } + + @State(Scope.Thread) + public static class OutputState { + + // try both DecimalColumnVector and Decimal64ColumnVector + @Param({"ORIGINAL", "USE_DECIMAL64"}) + public TypeDescription.RowBatchVersion version; + + long[] total_amount = new long[1024 * 1024]; + Configuration conf = new Configuration(); + FileSystem fs = new NullFileSystem(); + TypeDescription schema; + VectorizedRowBatch batch; + Loader loader; + int precision; + + @Setup + public void setup() throws IOException { + if (version == TypeDescription.RowBatchVersion.ORIGINAL) { + precision = 19; + loader = new DecimalLoader(precision, 2); + } else { + precision = 8; + loader = new Decimal64Loader(precision, 2); + } + schema = TypeDescription.createDecimal() + .withScale(2) + .withPrecision(precision); + readCsvData(total_amount, root, "total_amount", conf); + batch = schema.createRowBatchV2(); + } + } + + @Benchmark + public void write(OutputState state) throws Exception { + Writer writer = OrcFile.createWriter(new Path("null"), + OrcFile.writerOptions(state.conf) + .fileSystem(state.fs) + .setSchema(state.schema) + .compress(CompressionKind.NONE)); + int r = 0; + int batchSize = state.batch.getMaxSize(); + while (r < state.total_amount.length) { + state.batch.size = batchSize; + state.loader.loadData(state.batch.cols[0], state.total_amount, r, batchSize); + writer.addRowBatch(state.batch); + r += batchSize; + } + writer.close(); + } + + static void readCsvData(long[] data, + Path root, + String column, + Configuration conf) throws IOException { + TypeDescription schema = Utilities.loadSchema("taxi.schema"); + int row = 0; + int batchPosn = 0; + BatchReader reader = + new GenerateVariants.RecursiveReader(new Path(root, "sources/taxi"), "csv", + schema, conf, org.apache.orc.bench.core.CompressionKind.ZLIB); + VectorizedRowBatch batch = schema.createRowBatch(); + batch.size = 0; + TypeDescription columnSchema = schema.findSubtype(column); + DecimalColumnVector cv = (DecimalColumnVector) batch.cols[columnSchema.getId() - 1]; + int scale = columnSchema.getScale(); + while (row < data.length) { + if (batchPosn >= batch.size) { + // Read the next batch and ignore eof. 
If the file is shorter + // than we need, just reuse the current batch over again. + reader.nextBatch(batch); + batchPosn = 0; + } + data[row++] = cv.vector[batchPosn++].serialize64(scale); + } + } + + @State(Scope.Thread) + public static class InputState { + + // try both DecimalColumnVector and Decimal64ColumnVector + @Param({"ORIGINAL", "USE_DECIMAL64"}) + public TypeDescription.RowBatchVersion version; + + Configuration conf = new Configuration(); + FileSystem fs; + TypeDescription schema; + VectorizedRowBatch batch; + Path path; + boolean[] include; + Reader reader; + OrcFile.ReaderOptions options; + + @Setup + public void setup() throws IOException { + fs = FileSystem.getLocal(conf).getRaw(); + path = new Path(root, "generated/taxi/orc.none"); + schema = Utilities.loadSchema("taxi.schema"); + batch = schema.createRowBatch(version, 1024); + // only include the columns with decimal values + include = new boolean[schema.getMaximumId() + 1]; + for(TypeDescription child: schema.getChildren()) { + if (child.getCategory() == TypeDescription.Category.DECIMAL) { + include[child.getId()] = true; + } + } + reader = OrcFile.createReader(path, + OrcFile.readerOptions(conf).filesystem(fs)); + // just read the decimal columns from the first stripe + reader.options().include(include).range(0, 1000); + } + } + + @Benchmark + public void read(Blackhole blackhole, InputState state) throws Exception { + RecordReader rows = state.reader.rows(); + while (rows.nextBatch(state.batch)) { + blackhole.consume(state.batch); + } + rows.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/hive/src/java/org/apache/orc/bench/hive/FullReadBenchmark.java ---------------------------------------------------------------------- diff --git a/java/bench/hive/src/java/org/apache/orc/bench/hive/FullReadBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/FullReadBenchmark.java new file mode 100644 index 0000000..2bbcf60 --- /dev/null +++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/FullReadBenchmark.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.hive; + +import com.google.auto.service.AutoService; +import com.google.gson.JsonStreamParser; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.ReadCounters; +import org.apache.orc.bench.core.Utilities; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.runner.Runner; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Thread) +@AutoService(OrcBenchmark.class) +public class FullReadBenchmark implements OrcBenchmark { + + private static final Path root = Utilities.getBenchmarkRoot(); + + @Param({"taxi", "sales", "github"}) + public String dataset; + + @Param({"none", "gz", "snappy"}) + public String compression; + + @Override + public String getName() { + return "read-all"; + } + + @Override + public String getDescription() { + return "read all columns and rows"; + } + + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, getClass())).run(); + } + + @Benchmark + public void orc(ReadCounters counters) throws Exception{ + Configuration conf = new Configuration(); + TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); + fs.initialize(new URI("file:///"), conf); + FileSystem.Statistics statistics = fs.getLocalStatistics(); + statistics.reset(); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs); + Path path = Utilities.getVariant(root, dataset, "orc", compression); + Reader reader = OrcFile.createReader(path, options); + TypeDescription schema = reader.getSchema(); + RecordReader rows = reader.rows(); + VectorizedRowBatch batch = schema.createRowBatch(); + while (rows.nextBatch(batch)) { + counters.addRecords(batch.size); + } + rows.close(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); + } + + @Benchmark + public void avro(ReadCounters counters) throws Exception { + Configuration conf = new Configuration(); + conf.set("fs.track.impl", 
TrackingLocalFileSystem.class.getName()); + conf.set("fs.defaultFS", "track:///"); + Path path = Utilities.getVariant(root, dataset, "avro", compression); + FileSystem.Statistics statistics = FileSystem.getStatistics("track:///", + TrackingLocalFileSystem.class); + statistics.reset(); + FsInput file = new FsInput(path, conf); + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + DataFileReader<GenericRecord> dataFileReader = + new DataFileReader<>(file, datumReader); + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + counters.addRecords(1); + } + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); + } + + @Benchmark + public void parquet(ReadCounters counters) throws Exception { + JobConf conf = new JobConf(); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + conf.set("fs.defaultFS", "track:///"); + Path path = Utilities.getVariant(root, dataset, "parquet", compression); + FileSystem.Statistics statistics = FileSystem.getStatistics("track:///", + TrackingLocalFileSystem.class); + statistics.reset(); + ParquetInputFormat<ArrayWritable> inputFormat = + new ParquetInputFormat<>(DataWritableReadSupport.class); + + NullWritable nada = NullWritable.get(); + FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{}); + org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader = + new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL); + ArrayWritable value = recordReader.createValue(); + while (recordReader.next(nada, value)) { + counters.addRecords(1); + } + recordReader.close(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); + } + + @Benchmark + public void json(ReadCounters counters) throws Exception { + Configuration conf = new Configuration(); + TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); + fs.initialize(new URI("file:///"), conf); + FileSystem.Statistics statistics = fs.getLocalStatistics(); + statistics.reset(); + Path path = Utilities.getVariant(root, dataset, "json", compression); + CompressionKind compress = CompressionKind.fromExtension(compression); + InputStream input = compress.read(fs.open(path)); + JsonStreamParser parser = + new JsonStreamParser(new InputStreamReader(input, + StandardCharsets.UTF_8)); + while (parser.hasNext()) { + parser.next(); + counters.addRecords(1); + } + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/pom.xml ---------------------------------------------------------------------- diff --git a/java/bench/pom.xml b/java/bench/pom.xml index 2cebf1a..aed26b6 100644 --- a/java/bench/pom.xml +++ b/java/bench/pom.xml @@ -26,7 +26,7 @@ <groupId>org.apache.orc</groupId> <artifactId>orc-benchmarks</artifactId> <version>1.6.0-SNAPSHOT</version> - <packaging>jar</packaging> + <packaging>pom</packaging> <name>ORC Benchmarks</name> <description> Benchmarks for comparing ORC, Parquet, JSON, and Avro performance. 
@@ -39,178 +39,507 @@ <avro.version>1.8.2</avro.version> <hadoop.version>2.7.3</hadoop.version> <hive.version>2.3.3</hive.version> + <iceberg.version>0.1.3</iceberg.version> <jmh.version>1.20</jmh.version> - <orc.version>1.6.0-SNAPSHOT</orc.version> - <parquet.version>1.9.0</parquet.version> - <storage-api.version>2.5.0</storage-api.version> + <orc.version>1.5.2</orc.version> + <parquet.version>1.8.3</parquet.version> + <slf4j.version>1.7.25</slf4j.version> + <spark.version>2.3.1</spark.version> + <storage-api.version>2.6.1</storage-api.version> <zookeeper.version>3.4.6</zookeeper.version> </properties> - <dependencies> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <version>2.8.4</version> - </dependency> - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.2.4</version> - </dependency> - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - <version>1.3.1</version> - </dependency> - <dependency> - <groupId>io.airlift</groupId> - <artifactId>aircompressor</artifactId> - <version>0.10</version> - <exclusions> - <exclusion> - <groupId>io.airlift</groupId> - <artifactId>slice</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <version>${avro.version}</version> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <classifier>hadoop2</classifier> - <version>${avro.version}</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-csv</artifactId> - <version>1.4</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - <classifier>core</classifier> - <version>${hive.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-serde</artifactId> - <version>${hive.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-storage-api</artifactId> - <version>${storage-api.version}</version> - </dependency> - <dependency> - <groupId>org.apache.orc</groupId> - <artifactId>orc-core</artifactId> - <version>${orc.version}</version> - </dependency> - <dependency> - <groupId>org.apache.parquet</groupId> - <artifactId>parquet-hadoop-bundle</artifactId> - <version>${parquet.version}</version> - </dependency> - <dependency> - <groupId>org.jodd</groupId> - <artifactId>jodd-core</artifactId> - <version>3.5.2</version> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.openjdk.jmh</groupId> - <artifactId>jmh-core</artifactId> - <version>${jmh.version}</version> - </dependency> - <dependency> - <groupId>org.openjdk.jmh</groupId> - <artifactId>jmh-generator-annprocess</artifactId> - <version>${jmh.version}</version> - </dependency> - </dependencies> + <modules> + <module>core</module> + <module>hive</module> + <module>spark</module> + 
</modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-avro_2.11</artifactId> + <version>3.2.0</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.8.4</version> + </dependency> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <version>1.0-rc4</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.2.4</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.3.1</version> + </dependency> + <dependency> + <groupId>io.airlift</groupId> + <artifactId>aircompressor</artifactId> + <version>0.10</version> + <exclusions> + <exclusion> + <groupId>io.airlift</groupId> + <artifactId>slice</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.netflix.iceberg</groupId> + <artifactId>iceberg-api</artifactId> + <version>${iceberg.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.netflix.iceberg</groupId> + <artifactId>iceberg-core</artifactId> + <version>${iceberg.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.netflix.iceberg</groupId> + <artifactId>iceberg-spark</artifactId> + <version>${iceberg.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.1.17.Final</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + <version>3.9.9.Final</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <classifier>hadoop2</classifier> + <version>${avro.version}</version> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>1.4</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.7</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils</artifactId> + </exclusion> + <exclusion> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils-core</artifactId> + </exclusion> + <exclusion> + 
<groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <scope>runtime</scope> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.fusesource.leveldbjni</groupId> + <artifactId>leveldbjni-all</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <version>${hadoop.version}</version> + <scope>runtime</scope> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <classifier>core</classifier> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.calcite.avatica</groupId> + <artifactId>avatica</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-resourcemanager</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + <exclusion> + <groupId>stax</groupId> + <artifactId>stax-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + 
<artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-web</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop-bundle</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty.aggregate</groupId> + <artifactId>jetty-all</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-service-rpc</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + <version>${storage-api.version}</version> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks-core</artifactId> + <version>1.6.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>${orc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + <version>${orc.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_2.11</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.glassfish.hk2.external</groupId> + <artifactId>aopalliance-repackaged</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.fusesource.leveldbjni</groupId> + <artifactId>leveldbjni-all</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>3.0.8</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>3.0.8</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.jodd</groupId> + <artifactId>jodd-core</artifactId> + <version>3.5.2</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>${jmh.version}</version> + </dependency> + <dependency> 
+ <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>${jmh.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.11.8</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>runtime</scope> + </dependency> + </dependencies> + </dependencyManagement> <build> <sourceDirectory>${basedir}/src/java</sourceDirectory> <testSourceDirectory>${basedir}/src/test</testSourceDirectory> - <testResources> - <testResource> - <directory>${basedir}/src/test/resources</directory> - </testResource> - </testResources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-enforcer-plugin</artifactId> - <version>3.0.0-M1</version> - <executions> - <execution> - <id>enforce-maven</id> - <goals> - <goal>enforce</goal> - </goals> - <configuration> - <rules> - <requireMavenVersion> - <version>2.2.1</version> - </requireMavenVersion> - </rules> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>3.1</version> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> </plugin> <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>3.1.0</version> - <configuration> - <archive> - <manifest> - <mainClass>org.apache.orc.bench.Driver</mainClass> - </manifest> - </archive> - <descriptors> - <descriptor>src/assembly/uber.xml</descriptor> - </descriptors> - </configuration> - <executions> - <execution> - <id>make-assembly</id> <!-- this is used for inheritance merges --> - <phase>package</phase> <!-- bind to the packaging phase --> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> </plugin> </plugins> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>3.0.0-M1</version> + <executions> + <execution> + <id>enforce-maven</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireMavenVersion> + <version>2.2.1</version> + </requireMavenVersion> + </rules> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + <compilerArgs> + <arg>-Xlint:unchecked</arg> + </compilerArgs> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>src/assembly/uber.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </pluginManagement> </build> <profiles> http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/spark/pom.xml 
---------------------------------------------------------------------- diff --git a/java/bench/spark/pom.xml b/java/bench/spark/pom.xml new file mode 100644 index 0000000..90e29a4 --- /dev/null +++ b/java/bench/spark/pom.xml @@ -0,0 +1,203 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks</artifactId> + <version>1.6.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks-spark</artifactId> + <version>1.6.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>ORC Benchmarks Spark</name> + <description> + Benchmarks for comparing ORC, Parquet, JSON, and Avro performance under + Spark. + </description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.useIncrementalCompilation>false</maven.compiler.useIncrementalCompilation> + </properties> + + <dependencies> + <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-avro_2.11</artifactId> + </dependency> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_2.11</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + </dependency> + <dependency> 
+ <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + </dependency> + <dependency> + <groupId>org.jodd</groupId> + <artifactId>jodd-core</artifactId> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.1.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom>false</createDependencyReducedPom> + <filters> + <filter> + <artifact>org.codehaus.janino:janino</artifact> + <excludes> + <exclude>META-INF/DUMMY.SF</exclude> + <exclude>META-INF/DUMMY.DSA</exclude> + </excludes> + </filter> + <filter> + <artifact>org.codehaus.janino:commons-compiler</artifact> + <excludes> + <exclude>META-INF/DUMMY.SF</exclude> + <exclude>META-INF/DUMMY.DSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>org.apache.orc.storage</pattern> + <shadedPattern>org.apache.hadoop.hive</shadedPattern> + </relocation> + </relocations> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <manifestEntries> + <Main-Class>org.apache.orc.bench.core.Driver</Main-Class> + </manifestEntries> + </transformer> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" /> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>cmake</id> + <build> + <directory>${build.dir}/bench/spark</directory> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java ---------------------------------------------------------------------- diff --git a/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java new file mode 100644 index 0000000..87d3277 --- /dev/null +++ b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.spark; + +import com.google.auto.service.AutoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.ReadCounters; +import org.apache.orc.bench.core.Utilities; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.datasources.FileFormat; +import org.apache.spark.sql.execution.datasources.PartitionedFile; +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat; +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat; +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat; +import org.apache.spark.sql.sources.And$; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThanOrEqual$; +import org.apache.spark.sql.sources.LessThan$; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import scala.Function1; + +import java.io.IOException; + +import scala.Tuple2; +import scala.collection.Iterator; +import scala.collection.JavaConverters; +import scala.collection.immutable.Map; +import scala.collection.immutable.Map$; +import scala.collection.Seq; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@AutoService(OrcBenchmark.class) +public class SparkBenchmark implements OrcBenchmark { + + private static final Path root = Utilities.getBenchmarkRoot(); + + @Override + public String getName() { + return "spark"; + } + + @Override + public String getDescription() { + return "Run Spark benchmarks"; + } + + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, this.getClass())).run(); + } + + @State(Scope.Thread) + public static class InputSource { + SparkSession session; + TrackingLocalFileSystem fs; + Configuration conf; + Path path; + StructType schema; + StructType empty = new StructType(); + FileFormat formatObject; + + @Param({"taxi", "sales", "github"}) + String dataset; + + @Param({"none", "gz", "snappy"}) + String compression; + + @Param({"orc", "parquet", "json"}) + String format; + + @Setup(Level.Trial) + public void setup() { + session = SparkSession.builder().appName("benchmark") + .config("spark.master", 
"local[4]") + .config("spark.sql.orc.filterPushdown", true) + .config("spark.sql.orc.impl", "native") + .getOrCreate(); + conf = session.sparkContext().hadoopConfiguration(); + conf.set("avro.mapred.ignore.inputs.without.extension","false"); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + path = new Path("track://", + Utilities.getVariant(root, dataset, format, compression)); + try { + fs = (TrackingLocalFileSystem) path.getFileSystem(conf); + } catch (IOException e) { + throw new IllegalArgumentException("Can't get filesystem", e); + } + try { + TypeDescription orcSchema = Utilities.loadSchema(dataset + ".schema"); + schema = (StructType) SparkSchema.convertToSparkType(orcSchema); + } catch (IOException e) { + throw new IllegalArgumentException("Can't read schema " + dataset, e); + } + switch (format) { + case "avro": + formatObject = new com.databricks.spark.avro.DefaultSource(); + break; + case "orc": + formatObject = new OrcFileFormat(); + break; + case "parquet": + formatObject = new ParquetFileFormat(); + break; + case "json": + formatObject = new JsonFileFormat(); + break; + default: + throw new IllegalArgumentException("Unknown format " + format); + } + } + } + + static void processReader(Iterator<InternalRow> reader, + FileSystem.Statistics statistics, + ReadCounters counters, + Blackhole blackhole) { + while (reader.hasNext()) { + Object row = reader.next(); + if (row instanceof ColumnarBatch) { + counters.addRecords(((ColumnarBatch) row).numRows()); + } else { + counters.addRecords(1); + } + blackhole.consume(row); + } + counters.addInvocation(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + } + + @Benchmark + public void fullRead(InputSource source, + ReadCounters counters, + Blackhole blackhole) { + FileSystem.Statistics statistics = source.fs.getLocalStatistics(); + statistics.reset(); + List<Filter> filters = new ArrayList<>(); + List<Tuple2<String,String>> options = new ArrayList<>(); + switch (source.format) { + case "json": + options.add(new Tuple2<>("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSS")); + break; + default: + break; + } + Seq<Tuple2<String,String>> optionsScala = JavaConverters + .asScalaBufferConverter(options).asScala().toSeq(); + @SuppressWarnings("unchecked") + Map<String,String> scalaMap = (Map<String, String>)Map$.MODULE$.apply(optionsScala); + Function1<PartitionedFile,Iterator<InternalRow>> factory = + source.formatObject.buildReaderWithPartitionValues(source.session, + source.schema, source.empty, source.schema, + JavaConverters.collectionAsScalaIterableConverter(filters).asScala().toSeq(), + scalaMap, source.conf); + PartitionedFile file = new PartitionedFile(InternalRow.empty(), + source.path.toString(), 0, Long.MAX_VALUE, new String[0]); + processReader(factory.apply(file), statistics, counters, blackhole); + } + + @Benchmark + public void partialRead(InputSource source, + ReadCounters counters, + Blackhole blackhole) { + FileSystem.Statistics statistics = source.fs.getLocalStatistics(); + statistics.reset(); + List<Filter> filters = new ArrayList<>(); + List<Tuple2<String,String>> options = new ArrayList<>(); + switch (source.format) { + case "json": + case "avro": + throw new IllegalArgumentException(source.format + " can't handle projection"); + default: + break; + } + TypeDescription readSchema = null; + switch (source.dataset) { + case "taxi": + readSchema = TypeDescription.fromString("struct<vendor_id:int," + + "pickup_time:timestamp>"); + break; + case "sales": + readSchema = 
TypeDescription.fromString("struct<sales_id:bigint," + + "customer_id:bigint>"); + break; + case "github": + readSchema = TypeDescription.fromString("struct<actor:struct<" + + "avatar_url:string,gravatar_id:string,id:int,login:string,url:string>," + + "created_at:timestamp>"); + break; + } + Seq<Tuple2<String,String>> optionsScala = JavaConverters.asScalaBufferConverter(options).asScala().toSeq(); + @SuppressWarnings("unchecked") + Map<String,String> scalaMap = (Map<String, String>)Map$.MODULE$.apply(optionsScala); + Function1<PartitionedFile,Iterator<InternalRow>> factory = + source.formatObject.buildReaderWithPartitionValues(source.session, + source.schema, source.empty, + (StructType) SparkSchema.convertToSparkType(readSchema), + JavaConverters.collectionAsScalaIterableConverter(filters).asScala().toSeq(), + scalaMap, source.conf); + PartitionedFile file = new PartitionedFile(InternalRow.empty(), + source.path.toString(), 0, Long.MAX_VALUE, new String[0]); + processReader(factory.apply(file), statistics, counters, blackhole); + } + + @Benchmark + public void pushDown(InputSource source, + ReadCounters counters, + Blackhole blackhole) { + FileSystem.Statistics statistics = source.fs.getLocalStatistics(); + statistics.reset(); + List<Filter> filters = new ArrayList<>(); + switch (source.dataset) { + case "taxi": + filters.add(And$.MODULE$.apply( + GreaterThanOrEqual$.MODULE$.apply("pickup_time", + Timestamp.valueOf("2015-11-01 00:00:00.0")), + LessThan$.MODULE$.apply("pickup_time", + Timestamp.valueOf("2015-11-01 00:01:00.0")))); + break; + case "sales": + filters.add(And$.MODULE$.apply( + GreaterThanOrEqual$.MODULE$.apply("sales_id", 1000000000L), + LessThan$.MODULE$.apply("sales_id", 1000001000L))); + break; + case "github": + filters.add(And$.MODULE$.apply( + GreaterThanOrEqual$.MODULE$.apply("created_at", + Timestamp.valueOf("2015-11-01 00:00:00.0")), + LessThan$.MODULE$.apply("created_at", + Timestamp.valueOf("2015-11-01 00:01:00.0")))); + break; + } + List<Tuple2<String,String>> options = new ArrayList<>(); + switch (source.format) { + case "json": + case "avro": + throw new IllegalArgumentException(source.format + " can't handle pushdown"); + default: + break; + } + Seq<Tuple2<String,String>> optionsScala = JavaConverters.asScalaBufferConverter(options).asScala().toSeq(); + @SuppressWarnings("unchecked") + Map<String,String> scalaMap = (Map<String, String>)Map$.MODULE$.apply(optionsScala); + Function1<PartitionedFile,Iterator<InternalRow>> factory = + source.formatObject.buildReaderWithPartitionValues(source.session, + source.schema, source.empty, source.schema, + JavaConverters.collectionAsScalaIterableConverter(filters).asScala().toSeq(), + scalaMap, source.conf); + PartitionedFile file = new PartitionedFile(InternalRow.empty(), + source.path.toString(), 0, Long.MAX_VALUE, new String[0]); + processReader(factory.apply(file), statistics, counters, blackhole); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java ---------------------------------------------------------------------- diff --git a/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java new file mode 100644 index 0000000..6d4d2a8 --- /dev/null +++ b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.spark; + +import org.apache.orc.TypeDescription; +import org.apache.spark.sql.types.ArrayType$; +import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.BooleanType$; +import org.apache.spark.sql.types.ByteType$; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType$; +import org.apache.spark.sql.types.DecimalType$; +import org.apache.spark.sql.types.DoubleType$; +import org.apache.spark.sql.types.FloatType$; +import org.apache.spark.sql.types.IntegerType$; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.MapType$; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.ShortType$; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType$; +import org.apache.spark.sql.types.TimestampType$; + +import java.util.ArrayList; +import java.util.List; + +public class SparkSchema { + + public static DataType convertToSparkType(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return BooleanType$.MODULE$; + case BYTE: + return ByteType$.MODULE$; + case SHORT: + return ShortType$.MODULE$; + case INT: + return IntegerType$.MODULE$; + case LONG: + return LongType$.MODULE$; + case FLOAT: + return FloatType$.MODULE$; + case DOUBLE: + return DoubleType$.MODULE$; + case BINARY: + return BinaryType$.MODULE$; + case STRING: + case CHAR: + case VARCHAR: + return StringType$.MODULE$; + case DATE: + return DateType$.MODULE$; + case TIMESTAMP: + return TimestampType$.MODULE$; + case DECIMAL: + return DecimalType$.MODULE$.apply(schema.getPrecision(), schema.getScale()); + case LIST: + return ArrayType$.MODULE$.apply( + convertToSparkType(schema.getChildren().get(0)), true); + case MAP: + return MapType$.MODULE$.apply( + convertToSparkType(schema.getChildren().get(0)), + convertToSparkType(schema.getChildren().get(1)), true); + case STRUCT: { + int size = schema.getChildren().size(); + List<StructField> sparkFields = new ArrayList<>(size); + for(int c=0; c < size; ++c) { + sparkFields.add(StructField.apply(schema.getFieldNames().get(c), + convertToSparkType(schema.getChildren().get(c)), true, + Metadata.empty())); + } + return StructType$.MODULE$.apply(sparkFields); + } + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/src/assembly/uber.xml ---------------------------------------------------------------------- diff --git a/java/bench/src/assembly/uber.xml b/java/bench/src/assembly/uber.xml deleted file mode 100644 index 014eab9..0000000 --- a/java/bench/src/assembly/uber.xml +++ /dev/null @@ -1,33 +0,0 @@ -<!-- - Licensed under the Apache License, Version 2.0 
(the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<assembly> - <id>uber</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <useProjectArtifact>true</useProjectArtifact> - <unpack>true</unpack> - <scope>runtime</scope> - </dependencySet> - </dependencySets> - <containerDescriptorHandlers> - <containerDescriptorHandler> - <handlerName>metaInf-services</handlerName> - </containerDescriptorHandler> - </containerDescriptorHandlers> -</assembly> http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/src/findbugs/exclude.xml ---------------------------------------------------------------------- diff --git a/java/bench/src/findbugs/exclude.xml b/java/bench/src/findbugs/exclude.xml deleted file mode 100644 index dde1471..0000000 --- a/java/bench/src/findbugs/exclude.xml +++ /dev/null @@ -1,25 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<FindBugsFilter> - <Match> - <Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2"/> - </Match> - <Match> - <Class name="~org\.openjdk\.jmh\.infra\.generated.*"/> - </Match> - <Match> - <Class name="~org\.apache\.orc\.bench\.generated.*"/> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java b/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java deleted file mode 100644 index 0440495..0000000 --- a/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs; - -import java.io.FileNotFoundException; -import java.io.IOException; - -public class TrackingLocalFileSystem extends RawLocalFileSystem { - - class TrackingFileInputStream extends RawLocalFileSystem.LocalFSFileInputStream { - public TrackingFileInputStream(Path f) throws IOException { - super(f); - } - - public int read() throws IOException { - statistics.incrementReadOps(1); - return super.read(); - } - - public int read(byte[] b, int off, int len) throws IOException { - statistics.incrementReadOps(1); - return super.read(b, off, len); - } - - public int read(long position, byte[] b, int off, int len) throws IOException { - statistics.incrementReadOps(1); - return super.read(position, b, off, len); - } - } - - public FSDataInputStream open(Path f, int bufferSize) throws IOException { - if (!exists(f)) { - throw new FileNotFoundException(f.toString()); - } - return new FSDataInputStream(new BufferedFSInputStream( - new TrackingFileInputStream(f), bufferSize)); - } - - public FileSystem.Statistics getLocalStatistics() { - return statistics; - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/a6211816/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java deleted file mode 100644 index 18c5d06..0000000 --- a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.io.orc; - -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.Writable; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; -import org.apache.orc.TypeDescription; - -import java.util.List; - -/** - * Utilities that need the non-public methods from Hive. 
- */ -public class OrcBenchmarkUtilities { - - public static StructObjectInspector createObjectInspector(TypeDescription schema) { - List<OrcProto.Type> types = OrcUtils.getOrcTypes(schema); - return (StructObjectInspector) OrcStruct.createObjectInspector(0, types); - } - - public static Writable nextObject(VectorizedRowBatch batch, - TypeDescription schema, - int rowId, - Writable obj) { - OrcStruct result = (OrcStruct) obj; - if (result == null) { - result = new OrcStruct(batch.cols.length); - } - List<TypeDescription> childrenTypes = schema.getChildren(); - for(int c=0; c < batch.cols.length; ++c) { - result.setFieldValue(c, RecordReaderImpl.nextValue(batch.cols[c], rowId, - childrenTypes.get(c), result.getFieldValue(c))); - } - return result; - } -}
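A brief note on the new SparkSchema helper added in this patch: convertToSparkType walks an ORC TypeDescription and returns the corresponding Spark SQL DataType, which SparkBenchmark then casts to a StructType before passing it to buildReaderWithPartitionValues. The following is only a minimal usage sketch, not part of the commit; the class name SchemaConversionExample is hypothetical, it assumes the orc-benchmarks-spark classes plus orc-core and spark-sql are on the classpath, and it reuses the taxi projection schema string from partialRead above.

import org.apache.orc.TypeDescription;
import org.apache.orc.bench.spark.SparkSchema;
import org.apache.spark.sql.types.StructType;

public class SchemaConversionExample {
  public static void main(String[] args) {
    // Same ORC schema string that partialRead uses for the taxi dataset.
    TypeDescription orcSchema =
        TypeDescription.fromString("struct<vendor_id:int,pickup_time:timestamp>");
    // Convert to the Spark representation expected by buildReaderWithPartitionValues.
    StructType sparkSchema = (StructType) SparkSchema.convertToSparkType(orcSchema);
    // Print the resulting struct with its two fields and their Spark types.
    System.out.println(sparkSchema.treeString());
  }
}

As the SparkSchema switch statement shows, each ORC category maps to the nearest Spark type (CHAR and VARCHAR collapse to StringType, DECIMAL keeps its precision and scale), and every converted field and container element is marked nullable.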