This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ffed46d2a333dd64722565dc98c16b4965b5a96e
Author: Jennifer Dai <j...@linkedin.com>
AuthorDate: Mon Mar 18 15:03:41 2019 -0700

    ORC Reader
---
 .../java/org/apache/pinot/common/data/Schema.java  |   1 +
 pinot-orc/pom.xml                                  |  58 ++++++++++
 .../pinot/orc/data/readers/ORCRecordReader.java    | 126 +++++++++++++++++++++
 pom.xml                                            |  11 ++
 4 files changed, 196 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
index f06b14c..087c582 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
@@ -138,6 +138,7 @@ public final class Schema {
     return _metricFieldSpecs;
   }
 
+
   /**
    * Required by JSON deserializer. DO NOT USE. DO NOT REMOVE.
    * Adding @Deprecated to prevent usage
diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml
new file mode 100644
index 0000000..10c0a18
--- /dev/null
+++ b/pinot-orc/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!-- Inherits from the root Pinot POM. -->
+  <parent>
+    <artifactId>pinot</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>pinot-orc</artifactId>
+  <name>Pinot ORC</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <!-- This module sits one directory below the repository root. -->
+    <pinot.root>${basedir}/..</pinot.root>
+  </properties>
+  <build>
+    <plugins>
+      <!-- No version or configuration given here; presumably both come from the parent POM. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  <!-- Dependency versions are omitted here; they are expected to be supplied by the parent POM. -->
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-core</artifactId>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
new file mode 100644
index 0000000..68660fd
--- /dev/null
+++ b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
@@ -0,0 +1,126 @@
+package org.apache.pinot.orc.data.readers;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link RecordReader} that reads an ORC file one row at a time.
+ *
+ * <p>The reader always holds the next row prefetched in a single-row {@link VectorizedRowBatch}, so
+ * {@link #hasNext()} is answered from the result of the last {@code nextBatch} call instead of from the
+ * floating-point progress estimate of the underlying ORC reader.
+ *
+ * <p>Each top-level column of the ORC struct is stored in the {@link GenericRow} under its ORC field name; the
+ * stored value is the batch's column vector object for that column, not an extracted Java value.
+ * NOTE(review): consumers of the row presumably expect plain Java values; confirm and convert if so.
+ */
+public class ORCRecordReader implements RecordReader {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ORCRecordReader.class);
+
+  private Schema _pinotSchema;
+  private TypeDescription _orcSchema;
+  private Reader _reader;
+  private org.apache.orc.RecordReader _recordReader;
+  // Single-row batch holding the row the next call to next() will return; null once the file is exhausted.
+  private VectorizedRowBatch _nextBatch;
+
+  /**
+   * Opens the ORC file named by the config's input file path and positions the reader on the first row.
+   *
+   * @param segmentGeneratorConfig supplies the input file path and the Pinot schema
+   * @throws IllegalArgumentException if the config carries no Pinot schema
+   * @throws RuntimeException wrapping any failure to open or read the file, or an ORC schema whose root is not
+   *     a struct
+   */
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+    String inputFilePath = segmentGeneratorConfig.getInputFilePath();
+    LOGGER.info("Creating segment for {}", inputFilePath);
+
+    // Validate before touching the file so that a missing schema is reported directly.
+    _pinotSchema = segmentGeneratorConfig.getSchema();
+    if (_pinotSchema == null) {
+      throw new IllegalArgumentException("ORCRecordReader requires schema");
+    }
+
+    try {
+      _reader = OrcFile.createReader(new Path(inputFilePath), OrcFile.readerOptions(new Configuration()));
+      _orcSchema = _reader.getSchema();
+      LOGGER.info("ORC schema is {}", _orcSchema.toJson());
+      // Rows are mapped column-by-column from the root struct, so any other root type cannot be read.
+      if (_orcSchema.getCategory() != TypeDescription.Category.STRUCT) {
+        throw new IllegalArgumentException("Not a valid schema");
+      }
+      openRecordReader();
+    } catch (Exception e) {
+      // Pass the exception as the last argument so the stack trace is logged as well.
+      LOGGER.error("Caught exception initializing record reader at path {}", inputFilePath, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * @return {@code true} if a prefetched row is waiting to be returned by {@link #next()}
+   */
+  @Override
+  public boolean hasNext() {
+    return _nextBatch != null;
+  }
+
+  /**
+   * Returns the next row in a newly allocated {@link GenericRow}.
+   *
+   * @throws IOException if prefetching the following row fails
+   */
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  /**
+   * Fills {@code reuse} with the next row and prefetches the one after it.
+   *
+   * @param reuse row object to populate
+   * @return {@code reuse}
+   * @throws java.util.NoSuchElementException if there are no more rows
+   * @throws IOException if prefetching the following row fails
+   */
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    if (_nextBatch == null) {
+      throw new java.util.NoSuchElementException("No more rows in ORC file");
+    }
+    fillGenericRow(reuse, _nextBatch);
+    advance();
+    return reuse;
+  }
+
+  /**
+   * Copies every top-level column of {@code rowBatch} into {@code genericRow}, keyed by ORC field name.
+   */
+  private void fillGenericRow(GenericRow genericRow, VectorizedRowBatch rowBatch) {
+    int numColumns = _orcSchema.getFieldNames().size();
+    for (int i = 0; i < numColumns; i++) {
+      // Field names live on the parent struct, and batch columns are positional: both are indexed by the
+      // child's position, not by the child's type id (the root struct itself takes id 0).
+      genericRow.putField(_orcSchema.getFieldNames().get(i), rowBatch.cols[i]);
+    }
+  }
+
+  /**
+   * Reads the next row into a fresh single-row batch, or records end-of-file by clearing the prefetched batch.
+   * A new batch is allocated each time because rows already handed out keep references to the previous
+   * batch's column vectors.
+   */
+  private void advance()
+      throws IOException {
+    VectorizedRowBatch batch = _orcSchema.createRowBatch(1);
+    _nextBatch = _recordReader.nextBatch(batch) ? batch : null;
+  }
+
+  /**
+   * Creates a row reader over the whole file using the file's own schema and prefetches the first row.
+   */
+  private void openRecordReader()
+      throws IOException {
+    _recordReader = _reader.rows(_reader.options().schema(_orcSchema));
+    advance();
+  }
+
+  /**
+   * Closes the current ORC row reader, if any.
+   */
+  private void closeRecordReader()
+      throws IOException {
+    if (_recordReader != null) {
+      _recordReader.close();
+      _recordReader = null;
+    }
+  }
+
+  /**
+   * Restarts reading from the first row, releasing the previous ORC row reader first.
+   */
+  @Override
+  public void rewind()
+      throws IOException {
+    closeRecordReader();
+    openRecordReader();
+  }
+
+  /**
+   * @return the Pinot schema taken from the config passed to {@link #init(SegmentGeneratorConfig)}
+   */
+  @Override
+  public Schema getSchema() {
+    return _pinotSchema;
+  }
+
+  /**
+   * Releases the ORC row reader. Safe to call when initialization failed or never happened.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    _nextBatch = null;
+    closeRecordReader();
+  }
+}
diff --git a/pom.xml b/pom.xml
index d36887d..a01c80f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
     <module>pinot-filesystem</module>
     <module>pinot-hadoop-filesystem</module>
     <module>pinot-azure-filesystem</module>
+    <module>pinot-orc</module>
   </modules>
 
   <licenses>
@@ -592,6 +593,16 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>org.apache.orc</groupId>
+        <artifactId>orc-core</artifactId>
+        <version>1.5.2</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.orc</groupId>
+        <artifactId>orc-mapreduce</artifactId>
+        <version>1.5.2</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.9.13</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to