This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch orc in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ffed46d2a333dd64722565dc98c16b4965b5a96e Author: Jennifer Dai <j...@linkedin.com> AuthorDate: Mon Mar 18 15:03:41 2019 -0700 ORC Reader --- .../java/org/apache/pinot/common/data/Schema.java | 1 + pinot-orc/pom.xml | 58 ++++++++++ .../pinot/orc/data/readers/ORCRecordReader.java | 126 +++++++++++++++++++++ pom.xml | 11 ++ 4 files changed, 196 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java index f06b14c..087c582 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java @@ -138,6 +138,7 @@ public final class Schema { return _metricFieldSpecs; } + /** * Required by JSON deserializer. DO NOT USE. DO NOT REMOVE. * Adding @Deprecated to prevent usage diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml new file mode 100644 index 0000000..10c0a18 --- /dev/null +++ b/pinot-orc/pom.xml @@ -0,0 +1,58 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>pinot</artifactId> + <groupId>org.apache.pinot</groupId> + <version>0.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>pinot-orc</artifactId> + <name>Pinot ORC</name> + <url>https://pinot.apache.org/</url> + <properties> + <pinot.root>${basedir}/..</pinot.root> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-core</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java new file mode 100644 index 0000000..68660fd --- /dev/null +++ b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java @@ -0,0 +1,126 @@ +package org.apache.pinot.orc.data.readers; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.core.data.GenericRow; +import org.apache.pinot.core.data.readers.RecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ORCRecordReader implements RecordReader { + + private Schema _pinotSchema; + private TypeDescription _orcSchema; + Reader _reader; + org.apache.orc.RecordReader _recordReader; + VectorizedRowBatch _reusableVectorizedRowBatch = new VectorizedRowBatch(1); + + private static final Logger LOGGER = LoggerFactory.getLogger(ORCRecordReader.class); + + @Override + public void init(SegmentGeneratorConfig segmentGeneratorConfig) { + Configuration conf = new Configuration(); + LOGGER.info("Creating segment for {}", segmentGeneratorConfig.getInputFilePath()); + try { + _reader = OrcFile.createReader(new Path(segmentGeneratorConfig.getInputFilePath()), OrcFile.readerOptions(conf)); + _orcSchema = _reader.getSchema(); + LOGGER.info("ORC schema is {}", _orcSchema.toJson()); + + _pinotSchema = segmentGeneratorConfig.getSchema(); + if (_pinotSchema == null) { + throw new IllegalArgumentException("ORCRecordReader requires schema"); + } + _recordReader = _reader.rows(_reader.options().schema(_orcSchema)); + } catch (Exception e) { + LOGGER.error("Caught exception initializing record reader at path {}", segmentGeneratorConfig.getInputFilePath()); + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + try { + return _recordReader.getProgress() != 1; + } catch (IOException e) { + LOGGER.error("Could not get next record"); + throw new RuntimeException(e); + } + } + + @Override + public GenericRow next() + throws IOException { + return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) + throws IOException { + _reusableVectorizedRowBatch = _orcSchema.createRowBatch(1); + _recordReader.nextBatch(_reusableVectorizedRowBatch); + fillGenericRow(reuse, _reusableVectorizedRowBatch); + return reuse; + } + + private void fillGenericRow(GenericRow genericRow, VectorizedRowBatch rowBatch) throws IOException { + // Read the row data + TypeDescription schema = _reader.getSchema(); + // Create a row batch with max size 1 + + if (schema.getCategory().equals(TypeDescription.Category.STRUCT)) { + for (int i = 0; i < schema.getChildren().size(); i++) { + // Get current column in schema + TypeDescription currColumn = schema.getChildren().get(i); + String currColumnName = currColumn.getFieldNames().get(0); + int currColRowIndex = currColumn.getId(); + genericRow.putField(currColumnName, rowBatch.cols[currColRowIndex]); + } + } else { + throw new IllegalArgumentException("Not a valid schema"); + } + } + + @Override + public void rewind() + throws IOException { + _recordReader = _reader.rows(); + } + + @Override + public org.apache.pinot.common.data.Schema getSchema() { + return _pinotSchema; + } + + @Override + public void close() + throws IOException { + _recordReader.close(); + } +} diff --git a/pom.xml b/pom.xml index d36887d..a01c80f 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,7 @@ <module>pinot-filesystem</module> <module>pinot-hadoop-filesystem</module> <module>pinot-azure-filesystem</module> + <module>pinot-orc</module> </modules> <licenses> @@ -592,6 +593,16 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + <version>1.5.2</version> + </dependency> + <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.13</version> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org