Repository: incubator-carbondata Updated Branches: refs/heads/hive 4d0d2d600 -> 8e753a85b
add hive integration for carbon add hive integration to assembly alter CarbonInputFormat to implement mapred.InputFormat add a hive serde for carbon add hive integration to assembly fix error in getQueryModel add debug info add debug info add debug info add debug info fix error in CarbonRecordReader use ArrayWritable for CarbonRecordReader fix error in initializing CarbonRecordReader fix error in initializing CarbonRecordReader fix error in initializing CarbonRecordReader fix error in initializing CarbonRecordReader modify the return value of InputFormat; set the columns that need to be queried into carbon fix null pointer exception add catalyst dependency add catalyst dependency add catalyst dependency fix error in initializing carbon error add a new hive carbon recordreader add code to serialize object into ArrayWritable short/int data types are actually Long type inside Carbon use right inspector use right inspector fix long can't cast int error fix decimal cast error column size is not equal to column type column size is not equal to column type column size is not equal to column type column size is not equal to column type fix ObjInspector error fix ObjInspector error fix ObjInspector error add a new hive input split should not combine path add support for timestamp clean codes remove unused codes support Date and TimeStamp type add basic hive integration alter code style alter code style alter code style alter code style change create table statement alter CarbonSerde test case alter CarbonSerde test case add carbondata-hive to test classpath add carbondata-hive to test classpath use hive compatible schema exclude kryo exclude kryo make a new profile for hive 1.2.1 remove carbon-hive from parent and assembly pom use groupId to apache hive in pom.xml remove hadoop-yarn-api, but HadoopFileExample will throw exception when debugging in IDEA change profile name add quick start guide for basic hive integration module add private for properties add some params for hive to read subdirectories recursively Project: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/96aeee85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/96aeee85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/96aeee85 Branch: refs/heads/hive Commit: 96aeee85ddd1c5cc8a792d8cd749cfe7ff72d6b8 Parents: 4d0d2d6 Author: cenyuhai <cenyu...@didichuxing.com> Authored: Sun Mar 12 23:17:40 2017 +0800 Committer: chenliang613 <chenliang...@huawei.com> Committed: Thu Apr 6 10:48:48 2017 +0530 ---------------------------------------------------------------------- dev/java-code-format-template.xml | 2 +- .../carbondata/hadoop/CarbonInputFormat.java | 2 +- .../carbondata/hadoop/CarbonRecordReader.java | 8 +- integration/hive/hive-guide.md | 106 +++++++ integration/hive/pom.xml | 114 ++++++++ .../carbondata/hive/CarbonArrayInspector.java | 192 ++++++++++++ .../carbondata/hive/CarbonHiveInputSplit.java | 290 +++++++++++++++++++ .../carbondata/hive/CarbonHiveRecordReader.java | 249 ++++++++++++++++ .../apache/carbondata/hive/CarbonHiveSerDe.java | 231 +++++++++++++++ .../carbondata/hive/CarbonObjectInspector.java | 221 ++++++++++++++ .../hive/CarbonStorageFormatDescriptor.java | 47 +++ .../hive/MapredCarbonInputFormat.java | 99 +++++++ .../hive/MapredCarbonOutputFormat.java | 49 ++++ ...he.hadoop.hive.ql.io.StorageFormatDescriptor | 1 + .../apache/carbondata/hive/TestCarbonSerde.java | 133 +++++++++ pom.xml | 17 ++ 16 files changed, 1755 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/dev/java-code-format-template.xml ---------------------------------------------------------------------- diff --git a/dev/java-code-format-template.xml b/dev/java-code-format-template.xml index d117313..b39ef1e 100644 --- a/dev/java-code-format-template.xml +++ 
b/dev/java-code-format-template.xml @@ -34,8 +34,8 @@ <option name="IMPORT_LAYOUT_TABLE"> <value> <emptyLine /> - <package name="javax" withSubpackages="true" static="false" /> <package name="java" withSubpackages="true" static="false" /> + <package name="javax" withSubpackages="true" static="false" /> <emptyLine /> <package name="org.apache.carbondata" withSubpackages="true" static="false" /> <emptyLine /> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index b330f12..40f5f2c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -333,7 +333,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { return result; } - private Expression getFilterPredicates(Configuration configuration) { + protected Expression getFilterPredicates(Configuration configuration) { try { String filterExprString = configuration.get(FILTER_PREDICATE); if (filterExprString == null) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index 27c8b2f..702ae26 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -41,13 +41,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; */ public class 
CarbonRecordReader<T> extends RecordReader<Void, T> { - private QueryModel queryModel; + protected QueryModel queryModel; - private CarbonReadSupport<T> readSupport; + protected CarbonReadSupport<T> readSupport; - private CarbonIterator<Object[]> carbonIterator; + protected CarbonIterator<Object[]> carbonIterator; - private QueryExecutor queryExecutor; + protected QueryExecutor queryExecutor; public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) { this.queryModel = queryModel; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/hive-guide.md ---------------------------------------------------------------------- diff --git a/integration/hive/hive-guide.md b/integration/hive/hive-guide.md new file mode 100644 index 0000000..202b2b2 --- /dev/null +++ b/integration/hive/hive-guide.md @@ -0,0 +1,106 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Quick Start +This tutorial provides a quick introduction to using current integration/hive module. + +## Prerequisites +## Spark Version 2.1 +* Build integration/hive +mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package -Phadoop-2.7.2 -Phive-1.2 + + +* Create a sample.csv file using the following commands. 
The CSV file is required for loading data into CarbonData. + + ``` + cd carbondata + cat > sample.csv << EOF + id,name,scale,country,salary + 1,yuhai,1.77,china,33000.1 + 2,runlin,1.70,china,33000.2 + EOF + ``` + $HADOOP_HOME/bin/hadoop fs -put sample.csv /user/hadoop/sample.csv + +## Create hive carbon table in spark shell + +Start Spark shell by running the following command in the Spark directory: + +``` +./bin/spark-shell --jars <carbondata assembly jar path, carbondata hive jar path> +``` + +``` +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.CarbonSession._ +val rootPath = "hdfs:////user/hadoop/carbon" +val storeLocation = s"$rootPath/store" +val warehouse = s"$rootPath/warehouse" +val metastoredb = s"$rootPath/metastore_db" + + val carbon = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir", warehouse).config(org.apache.carbondata.core.constants.CarbonCommonConstants.STORE_LOCATION, storeLocation).getOrCreateCarbonSession(storeLocation, metastoredb) + +carbon.sql("create table hive_carbon(id int, name string, scale decimal, country string, salary double) STORED BY 'carbondata'") +carbon.sql("LOAD DATA INPATH 'hdfs://mycluster/user/hadoop/sample.csv' INTO TABLE hive_carbon") + +``` + +## Query Data from a Table + +``` +scala>carbon.sql("SELECT * FROM hive_carbon").show() +``` + +## Query Data in Hive + +### Configure hive classpath +``` +mkdir hive/auxlibs/ +cp incubator-carbondata/assembly/target/scala-2.11/carbondata_2.11*.jar hive/auxlibs/ +cp incubator-carbondata/integration/hive/target/carbondata-hive-*.jar hive/auxlibs/ +cp $SPARK_HOME/jars/spark-catalyst*.jar hive/auxlibs/ +export HIVE_AUX_JARS_PATH=hive/auxlibs/ +``` + +### Alter schema in Hive +$HIVE_HOME/bin/hive + +``` +alter table hive_carbon set FILEFORMAT +INPUTFORMAT "org.apache.carbondata.hive.MapredCarbonInputFormat" +OUTPUTFORMAT "org.apache.carbondata.hive.MapredCarbonOutputFormat" +SERDE "org.apache.carbondata.hive.CarbonHiveSerDe"; + +alter 
table hive_carbon set LOCATION 'hdfs://mycluster-tj/user/hadoop/carbon/store/default/hive_carbon'; +alter table hive_carbon change col id INT; +alter table hive_carbon add columns(name string, scale decimal(10, 2), country string, salary double); + +``` + +### Query data from hive table +``` +set hive.mapred.supports.subdirectories=true; +set mapreduce.input.fileinputformat.input.dir.recursive=true; + +select * from hive_carbon; + +select * from hive_carbon order by id; +``` + + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/pom.xml ---------------------------------------------------------------------- diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml new file mode 100644 index 0000000..714245e --- /dev/null +++ b/integration/hive/pom.xml @@ -0,0 +1,114 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.1.0-incubating-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-hive</artifactId> + <name>Apache CarbonData :: Hive</name> + + <properties> + <hive.version>1.2.1</hive.version> + <dev.path>${basedir}/../../dev</dev.path> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-ant</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>spark-client</artifactId> + </exclusion> + <exclusion> + <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware.kryo</groupId> + </exclusion> + </exclusions> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-hadoop</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>src/resources</directory> + </resource> + </resources> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> 
+ <artifactId>maven-surefire-plugin</artifactId> + <version>2.18</version> + <!-- Note config is repeated in scalatest config --> + <configuration> + <includes> + <include>**/Test*.java</include> + <include>**/*Test.java</include> + <include>**/*TestCase.java</include> + <include>**/*Suite.java</include> + </includes> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + <failIfNoTests>false</failIfNoTests> + </configuration> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java new file mode 100644 index 0000000..424dc5a --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.hive; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; + +/** + * The CarbonHiveArrayInspector will inspect an ArrayWritable, considering it as an Hive array. + * It can also inspect a List if Hive decides to inspect the result of an inspection. + */ +public class CarbonArrayInspector implements SettableListObjectInspector { + + private ObjectInspector arrayElementInspector; + + public CarbonArrayInspector(final ObjectInspector arrayElementInspector) { + this.arrayElementInspector = arrayElementInspector; + } + + @Override + public String getTypeName() { + return "array<" + arrayElementInspector.getTypeName() + ">"; + } + + @Override + public Category getCategory() { + return Category.LIST; + } + + @Override + public ObjectInspector getListElementObjectInspector() { + return arrayElementInspector; + } + + @Override + public Object getListElement(final Object data, final int index) { + if (data == null) { + return null; + } + + if (data instanceof ArrayWritable) { + final Writable[] listContainer = ((ArrayWritable) data).get(); + + if (listContainer == null || listContainer.length == 0) { + return null; + } + + final Writable subObj = listContainer[0]; + + if (subObj == null) { + return null; + } + + if (index >= 0 && index < ((ArrayWritable) subObj).get().length) { + return ((ArrayWritable) subObj).get()[index]; + } else { + return null; + } + } + + throw new UnsupportedOperationException("Cannot inspect " + + data.getClass().getCanonicalName()); + } + + @Override + public int getListLength(final Object data) { + if (data == null) { + return -1; + } + + if 
(data instanceof ArrayWritable) { + final Writable[] listContainer = ((ArrayWritable) data).get(); + + if (listContainer == null || listContainer.length == 0) { + return -1; + } + + final Writable subObj = listContainer[0]; + + if (subObj == null) { + return 0; + } + + return ((ArrayWritable) subObj).get().length; + } + + throw new UnsupportedOperationException("Cannot inspect " + + data.getClass().getCanonicalName()); + } + + @Override + public List<?> getList(final Object data) { + if (data == null) { + return null; + } + + if (data instanceof ArrayWritable) { + final Writable[] listContainer = ((ArrayWritable) data).get(); + + if (listContainer == null || listContainer.length == 0) { + return null; + } + + final Writable subObj = listContainer[0]; + + if (subObj == null) { + return null; + } + + final Writable[] array = ((ArrayWritable) subObj).get(); + final List<Writable> list = Arrays.asList(array); + + for (final Writable obj : array) { + list.add(obj); + } + + return list; + } + + throw new UnsupportedOperationException("Cannot inspect " + + data.getClass().getCanonicalName()); + } + + @Override + public Object create(final int size) { + final List<Object> result = Arrays.asList(new Object[size]); + for (int i = 0; i < size; ++i) { + result.add(null); + } + return result; + } + + @Override + public Object set(final Object list, final int index, final Object element) { + final ArrayList<Object> l = (ArrayList<Object>) list; + l.set(index, element); + return list; + } + + @Override + public Object resize(final Object list, final int newSize) { + final ArrayList<Object> l = (ArrayList<Object>) list; + l.ensureCapacity(newSize); + while (l.size() < newSize) { + l.add(null); + } + while (l.size() > newSize) { + l.remove(l.size() - 1); + } + return list; + } + + @Override + public boolean equals(final Object o) { + if (o == null || o.getClass() != getClass()) { + return false; + } else if (o == this) { + return true; + } else { + final ObjectInspector other = 
((CarbonArrayInspector) o).arrayElementInspector; + return other.equals(arrayElementInspector); + } + } + + @Override + public int hashCode() { + int hash = 3; + hash = 29 * hash + (this.arrayElementInspector != null ? + this.arrayElementInspector.hashCode() : 0); + return hash; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java new file mode 100644 index 0000000..16b859b --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hive; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.BlockletInfos; +import org.apache.carbondata.core.datastore.block.Distributable; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.internal.index.Block; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; + +public class CarbonHiveInputSplit extends FileSplit + implements Distributable, Serializable, Writable, Block { + + private static final long serialVersionUID = 3520344046772190208L; + private String taskId; + + private String segmentId; + + private String bucketId; + /* + * Invalid segments that need to be removed in task side index + */ + private List<String> invalidSegments; + + /* + * Number of BlockLets in a block + */ + private int numberOfBlocklets; + + private ColumnarFormatVersion version; + + /** + * map of blocklocation and storage id + */ + private Map<String, String> blockStorageIdMap = + new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + private List<UpdateVO> invalidTimestampsList; + + public CarbonHiveInputSplit() { + segmentId = null; + taskId = "0"; + bucketId = "0"; + numberOfBlocklets = 0; + invalidSegments = new ArrayList<>(); + version = CarbonProperties.getInstance().getFormatVersion(); + } + + public CarbonHiveInputSplit(String segmentId, Path path, long start, long length, + String[] 
locations, ColumnarFormatVersion version) { + super(path, start, length, locations); + this.segmentId = segmentId; + this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); + this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName()); + this.invalidSegments = new ArrayList<>(); + this.version = version; + } + + public CarbonHiveInputSplit(String segmentId, Path path, long start, long length, + String[] locations, int numberOfBlocklets, ColumnarFormatVersion version) { + this(segmentId, path, start, length, locations, version); + this.numberOfBlocklets = numberOfBlocklets; + } + + /** + * Constructor to initialize the CarbonInputSplit with blockStorageIdMap + * + * @param segmentId + * @param path + * @param start + * @param length + * @param locations + * @param numberOfBlocklets + * @param version + * @param blockStorageIdMap + */ + public CarbonHiveInputSplit(String segmentId, Path path, long start, long length, + String[] locations, int numberOfBlocklets, ColumnarFormatVersion version, + Map<String, String> blockStorageIdMap) { + this(segmentId, path, start, length, locations, numberOfBlocklets, version); + this.blockStorageIdMap = blockStorageIdMap; + } + + public static CarbonHiveInputSplit from(String segmentId, FileSplit split, + ColumnarFormatVersion version) + throws IOException { + return new CarbonHiveInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(), + split.getLocations(), version); + } + + public static List<TableBlockInfo> createBlocks(List<CarbonHiveInputSplit> splitList) { + List<TableBlockInfo> tableBlockInfoList = new ArrayList<>(); + for (CarbonHiveInputSplit split : splitList) { + BlockletInfos blockletInfos = + new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets()); + try { + tableBlockInfoList.add( + new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(), + split.getLocations(), split.getLength(), blockletInfos, split.getVersion())); 
+ } catch (IOException e) { + throw new RuntimeException("fail to get location of split: " + split, e); + } + } + return tableBlockInfoList; + } + + public static TableBlockInfo getTableBlockInfo(CarbonHiveInputSplit inputSplit) { + BlockletInfos blockletInfos = + new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets()); + try { + return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), + inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(), + blockletInfos, inputSplit.getVersion()); + } catch (IOException e) { + throw new RuntimeException("fail to get location of split: " + inputSplit, e); + } + } + + public String getSegmentId() { + return segmentId; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + this.segmentId = in.readUTF(); + this.version = ColumnarFormatVersion.valueOf(in.readShort()); + this.bucketId = in.readUTF(); + int numInvalidSegment = in.readInt(); + invalidSegments = new ArrayList<>(numInvalidSegment); + for (int i = 0; i < numInvalidSegment; i++) { + invalidSegments.add(in.readUTF()); + } + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(segmentId); + out.writeShort(version.number()); + out.writeUTF(bucketId); + out.writeInt(invalidSegments.size()); + for (String invalidSegment : invalidSegments) { + out.writeUTF(invalidSegment); + } + } + + public List<String> getInvalidSegments() { + return invalidSegments; + } + + public void setInvalidSegments(List<String> invalidSegments) { + this.invalidSegments = invalidSegments; + } + + public void setInvalidTimestampRange(List<UpdateVO> invalidTimestamps) { + invalidTimestampsList = invalidTimestamps; + } + + public List<UpdateVO> getInvalidTimestampRange() { + return invalidTimestampsList; + } + + /** + * returns the number of blocklets + * + * @return + */ + public int getNumberOfBlocklets() { + return 
numberOfBlocklets; + } + + public ColumnarFormatVersion getVersion() { + return version; + } + + public void setVersion(ColumnarFormatVersion version) { + this.version = version; + } + + public String getBucketId() { + return bucketId; + } + + @Override + public int compareTo(Distributable o) { + if (o == null) { + return -1; + } + CarbonHiveInputSplit other = (CarbonHiveInputSplit) o; + int compareResult = 0; + // get the segment id + // converr seg ID to double. + + double seg1 = Double.parseDouble(segmentId); + double seg2 = Double.parseDouble(other.getSegmentId()); + if (seg1 - seg2 < 0) { + return -1; + } + if (seg1 - seg2 > 0) { + return 1; + } + + // Comparing the time task id of the file to other + // if both the task id of the file is same then we need to compare the + // offset of + // the file + String filePath1 = this.getPath().getName(); + String filePath2 = other.getPath().getName(); + if (CarbonTablePath.isCarbonDataFile(filePath1)) { + int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1)); + int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2)); + if (firstTaskId != otherTaskId) { + return firstTaskId - otherTaskId; + } + + int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1)); + int otherBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath2)); + if (firstBucketNo != otherBucketNo) { + return firstBucketNo - otherBucketNo; + } + + // compare the part no of both block info + int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1)); + int SecondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2)); + compareResult = firstPartNo - SecondPartNo; + } else { + compareResult = filePath1.compareTo(filePath2); + } + if (compareResult != 0) { + return compareResult; + } + return 0; + } + + @Override + public String getBlockPath() { + return getPath().getName(); + } + + @Override + 
public List<Long> getMatchedBlocklets() { + return null; + } + + @Override + public boolean fullScan() { + return true; + } + + /** + * returns map of blocklocation and storage id + * + * @return + */ + public Map<String, String> getBlockStorageIdMap() { + return blockStorageIdMap; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java new file mode 100644 index 0000000..ba29028 --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hive; + + +import java.io.IOException; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.*; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; + +public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable> + implements org.apache.hadoop.mapred.RecordReader<Void, ArrayWritable> { + + ArrayWritable valueObj = null; + private CarbonObjectInspector objInspector; + + public 
CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport, + InputSplit inputSplit, JobConf jobConf) throws IOException { + super(queryModel, readSupport); + initialize(inputSplit, jobConf); + } + + public void initialize(InputSplit inputSplit, Configuration conf) throws IOException { + // The input split can contain single HDFS block or multiple blocks, so firstly get all the + // blocks and then set them in the query model. + List<CarbonHiveInputSplit> splitList; + if (inputSplit instanceof CarbonHiveInputSplit) { + splitList = new ArrayList<>(1); + splitList.add((CarbonHiveInputSplit) inputSplit); + } else { + throw new RuntimeException("unsupported input split type: " + inputSplit); + } + List<TableBlockInfo> tableBlockInfoList = CarbonHiveInputSplit.createBlocks(splitList); + queryModel.setTableBlockInfos(tableBlockInfoList); + readSupport.initialize(queryModel.getProjectionColumns(), + queryModel.getAbsoluteTableIdentifier()); + try { + carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel)); + } catch (QueryExecutionException e) { + throw new IOException(e.getMessage(), e.getCause()); + } + if (valueObj == null) { + valueObj = new ArrayWritable(Writable.class, + new Writable[queryModel.getProjectionColumns().length]); + } + + final TypeInfo rowTypeInfo; + final List<String> columnNames; + List<TypeInfo> columnTypes; + // Get column names and sort order + final String columnNameProperty = conf.get("hive.io.file.readcolumn.names"); + final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); + + if (columnNameProperty.length() == 0) { + columnNames = new ArrayList<String>(); + } else { + columnNames = Arrays.asList(columnNameProperty.split(",")); + } + if (columnTypeProperty.length() == 0) { + columnTypes = new ArrayList<TypeInfo>(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + columnTypes = columnTypes.subList(0, columnNames.size()); + // 
Create row related objects + rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); + } + + @Override + public boolean next(Void aVoid, ArrayWritable value) throws IOException { + if (carbonIterator.hasNext()) { + Object obj = readSupport.readRow(carbonIterator.next()); + ArrayWritable tmpValue = null; + try { + tmpValue = createArrayWritable(obj); + } catch (SerDeException se) { + throw new IOException(se.getMessage(), se.getCause()); + } + + if (value != tmpValue) { + final Writable[] arrValue = value.get(); + final Writable[] arrCurrent = tmpValue.get(); + if (valueObj != null && arrValue.length == arrCurrent.length) { + System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length); + } else { + if (arrValue.length != arrCurrent.length) { + throw new IOException("CarbonHiveInput : size of object differs. Value" + + " size : " + arrValue.length + ", Current Object size : " + arrCurrent.length); + } else { + throw new IOException("CarbonHiveInput can not support RecordReaders that" + + " don't return same key & value & value is null"); + } + } + } + return true; + } else { + return false; + } + } + + public ArrayWritable createArrayWritable(Object obj) throws SerDeException { + return createStruct(obj, objInspector); + } + + @Override + public Void createKey() { + return null; + } + + @Override + public ArrayWritable createValue() { + return valueObj; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public float getProgress() throws IOException { + return 0; + } + + public ArrayWritable createStruct(Object obj, StructObjectInspector inspector) + throws SerDeException { + List fields = inspector.getAllStructFieldRefs(); + Writable[] arr = new Writable[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + StructField field = (StructField) fields.get(i); + Object subObj = inspector.getStructFieldData(obj, field); + 
ObjectInspector subInspector = field.getFieldObjectInspector(); + arr[i] = createObject(subObj, subInspector); + } + return new ArrayWritable(Writable.class, arr); + } + + private ArrayWritable createArray(Object obj, ListObjectInspector inspector) + throws SerDeException { + List sourceArray = inspector.getList(obj); + ObjectInspector subInspector = inspector.getListElementObjectInspector(); + List array = new ArrayList(); + Iterator iterator; + if (sourceArray != null) { + for (iterator = sourceArray.iterator(); iterator.hasNext(); ) { + Object curObj = iterator.next(); + Writable newObj = createObject(curObj, subInspector); + if (newObj != null) { + array.add(newObj); + } + } + } + if (array.size() > 0) { + ArrayWritable subArray = new ArrayWritable(((Writable) array.get(0)).getClass(), + (Writable[]) array.toArray(new Writable[array.size()])); + + return new ArrayWritable(Writable.class, new Writable[]{subArray}); + } + return null; + } + + private Writable createPrimitive(Object obj, PrimitiveObjectInspector inspector) + throws SerDeException { + if (obj == null) { + return null; + } + switch (inspector.getPrimitiveCategory()) { + case VOID: + return null; + case DOUBLE: + return new DoubleWritable((double) obj); + case INT: + return new IntWritable(((Long) obj).intValue()); + case LONG: + return new LongWritable((long) obj); + case SHORT: + return new ShortWritable(((Long) obj).shortValue()); + case DATE: + return new DateWritable(new Date(((long) obj))); + case TIMESTAMP: + return new TimestampWritable(new Timestamp((long) obj)); + case STRING: + return new Text(obj.toString()); + case DECIMAL: + return new HiveDecimalWritable(HiveDecimal.create( + ((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal())); + } + throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory()); + } + + private Writable createObject(Object obj, ObjectInspector inspector) throws SerDeException { + switch (inspector.getCategory()) { + case STRUCT: + 
return createStruct(obj, (StructObjectInspector) inspector); + case LIST: + return createArray(obj, (ListObjectInspector) inspector); + case PRIMITIVE: + return createPrimitive(obj, (PrimitiveObjectInspector) inspector); + } + throw new SerDeException("Unknown data type" + inspector.getCategory()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java new file mode 100644 index 0000000..cbc2514 --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hive; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeSpec; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ArrayWritable; +import 
org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + + +/** + * A serde class for Carbondata. + * It transparently passes the object to/from the Carbon file reader/writer. + */ +@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES}) +public class CarbonHiveSerDe extends AbstractSerDe { + private SerDeStats stats; + private ObjectInspector objInspector; + + private enum LAST_OPERATION { + SERIALIZE, + DESERIALIZE, + UNKNOWN + } + + private LAST_OPERATION status; + private long serializedSize; + private long deserializedSize; + + public CarbonHiveSerDe() { + stats = new SerDeStats(); + } + + @Override + public void initialize(@Nullable Configuration configuration, Properties tbl) + throws SerDeException { + + final TypeInfo rowTypeInfo; + final List<String> columnNames; + final List<TypeInfo> columnTypes; + // Get column names and sort order + final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); + final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); + + if (columnNameProperty.length() == 0) { + columnNames = new ArrayList<String>(); + } else { + columnNames = Arrays.asList(columnNameProperty.split(",")); + } + if (columnTypeProperty.length() == 0) { + columnTypes = new ArrayList<TypeInfo>(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + // Create row related objects + rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); + + // Stats part + serializedSize = 0; + deserializedSize = 0; + status = LAST_OPERATION.UNKNOWN; + } + + @Override + public Class<? 
extends Writable> getSerializedClass() { + return ArrayWritable.class; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objectInspector) throws SerDeException { + if (!objInspector.getCategory().equals(ObjectInspector.Category.STRUCT)) { + throw new SerDeException("Cannot serialize " + objInspector.getCategory() + + ". Can only serialize a struct"); + } + serializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size(); + status = LAST_OPERATION.SERIALIZE; + return createStruct(obj, (StructObjectInspector) objInspector); + } + + public ArrayWritable createStruct(Object obj, StructObjectInspector inspector) + throws SerDeException { + List fields = inspector.getAllStructFieldRefs(); + Writable[] arr = new Writable[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + StructField field = (StructField) fields.get(i); + Object subObj = inspector.getStructFieldData(obj, field); + ObjectInspector subInspector = field.getFieldObjectInspector(); + arr[i] = createObject(subObj, subInspector); + } + return new ArrayWritable(Writable.class, arr); + } + + private ArrayWritable createArray(Object obj, ListObjectInspector inspector) + throws SerDeException { + List sourceArray = inspector.getList(obj); + ObjectInspector subInspector = inspector.getListElementObjectInspector(); + List array = new ArrayList(); + Iterator iterator; + if (sourceArray != null) { + for (iterator = sourceArray.iterator(); iterator.hasNext(); ) { + Object curObj = iterator.next(); + Writable newObj = createObject(curObj, subInspector); + if (newObj != null) { + array.add(newObj); + } + } + } + if (array.size() > 0) { + ArrayWritable subArray = new ArrayWritable(((Writable) array.get(0)).getClass(), + (Writable[]) array.toArray(new Writable[array.size()])); + + return new ArrayWritable(Writable.class, new Writable[]{subArray}); + } + return null; + } + + private Writable createPrimitive(Object obj, PrimitiveObjectInspector inspector) + throws 
SerDeException { + if (obj == null) { + return null; + } + switch (inspector.getPrimitiveCategory()) { + case VOID: + return null; + case DOUBLE: + return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj)); + case INT: + return new IntWritable(((IntObjectInspector) inspector).get(obj)); + case LONG: + return new LongWritable(((LongObjectInspector) inspector).get(obj)); + case SHORT: + return new ShortWritable(((ShortObjectInspector) inspector).get(obj)); + case TIMESTAMP: + return ((TimestampObjectInspector) inspector).getPrimitiveWritableObject(obj); + case DATE: + return ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj); + case STRING: + return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj); + case DECIMAL: + return ((HiveDecimalObjectInspector) inspector).getPrimitiveWritableObject(obj); + } + throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory()); + } + + private Writable createObject(Object obj, ObjectInspector inspector) throws SerDeException { + switch (inspector.getCategory()) { + case STRUCT: + return createStruct(obj, (StructObjectInspector) inspector); + case LIST: + return createArray(obj, (ListObjectInspector) inspector); + case PRIMITIVE: + return createPrimitive(obj, (PrimitiveObjectInspector) inspector); + } + throw new SerDeException("Unknown data type" + inspector.getCategory()); + } + + @Override + public SerDeStats getSerDeStats() { + // must be different + assert (status != LAST_OPERATION.UNKNOWN); + if (status == LAST_OPERATION.SERIALIZE) { + stats.setRawDataSize(serializedSize); + } else { + stats.setRawDataSize(deserializedSize); + } + return stats; + } + + @Override + public Object deserialize(Writable writable) throws SerDeException { + status = LAST_OPERATION.DESERIALIZE; + if (writable instanceof ArrayWritable) { + deserializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size(); + return writable; + } else { + return null; + } + } + 
+ @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return objInspector; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java new file mode 100644 index 0000000..f6ab256 --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hive; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.ArrayWritable; + +public class CarbonObjectInspector extends SettableStructObjectInspector { + private final TypeInfo typeInfo; + private final List<TypeInfo> fieldInfos; + private final List<String> fieldNames; + private final List<StructField> fields; + private final HashMap<String, StructFieldImpl> fieldsByName; + + public CarbonObjectInspector(final StructTypeInfo rowTypeInfo) { + + typeInfo = rowTypeInfo; + fieldNames = rowTypeInfo.getAllStructFieldNames(); + fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos(); + fields = new ArrayList<StructField>(fieldNames.size()); + fieldsByName = new HashMap<String, StructFieldImpl>(); + + for (int i = 0; i < fieldNames.size(); ++i) { + final String name = fieldNames.get(i); + final TypeInfo fieldInfo = fieldInfos.get(i); + + final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i); + fields.add(field); + fieldsByName.put(name, field); + } + } + + public ObjectInspector getObjectInspector(final TypeInfo typeInfo) { + if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + } else if 
(typeInfo.equals(TypeInfoFactory.intTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableIntObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableStringObjectInspector; + } else if (typeInfo instanceof DecimalTypeInfo) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( + (DecimalTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.STRUCT)) { + return new CarbonObjectInspector((StructTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.LIST)) { + final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); + return new CarbonArrayInspector(getObjectInspector(subTypeInfo)); + } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableShortObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableDateObjectInspector; + } else { + throw new UnsupportedOperationException("Unknown field type: " + typeInfo); + } + } + + @Override + public Category getCategory() { + return Category.STRUCT; + } + + @Override + public String getTypeName() { + return typeInfo.getTypeName(); + } + + @Override + public List<? 
extends StructField> getAllStructFieldRefs() { + return fields; + } + + @Override + public Object getStructFieldData(final Object data, final StructField fieldRef) { + if (data == null) { + return null; + } + + if (data instanceof ArrayWritable) { + final ArrayWritable arr = (ArrayWritable) data; + return arr.get()[((StructFieldImpl) fieldRef).getIndex()]; + } + + boolean isArray = !(data instanceof List); + if (!isArray && !(data instanceof List)) { + return data; + } else { + int listSize = isArray ? ((Object[]) ((Object[]) data)).length : ((List) data).size(); + int fieldID = fieldRef.getFieldID(); + return fieldID >= listSize ? null : + (isArray ? ((Object[]) ((Object[]) data))[fieldID] : ((List) data).get(fieldID)); + } + } + + @Override + public StructField getStructFieldRef(final String name) { + return fieldsByName.get(name); + } + + @Override + public List<Object> getStructFieldsDataAsList(final Object data) { + if (data == null) { + return null; + } + + if (data instanceof ArrayWritable) { + final ArrayWritable arr = (ArrayWritable) data; + final Object[] arrWritable = arr.get(); + return new ArrayList<Object>(Arrays.asList(arrWritable)); + } + + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public Object create() { + final ArrayList<Object> list = new ArrayList<Object>(fields.size()); + for (int i = 0; i < fields.size(); ++i) { + list.add(null); + } + return list; + } + + @Override + public Object setStructFieldData(Object struct, StructField field, Object fieldValue) { + final ArrayList<Object> list = (ArrayList<Object>) struct; + list.set(((StructFieldImpl) field).getIndex(), fieldValue); + return list; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final CarbonObjectInspector other = (CarbonObjectInspector) obj; + if (this.typeInfo != other.typeInfo && + (this.typeInfo == 
null || !this.typeInfo.equals(other.typeInfo))) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int hash = 5; + hash = 29 * hash + (this.typeInfo != null ? this.typeInfo.hashCode() : 0); + return hash; + } + + class StructFieldImpl implements StructField { + + private final String name; + private final ObjectInspector inspector; + private final int index; + + public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) { + this.name = name; + this.inspector = inspector; + this.index = index; + } + + @Override + public String getFieldComment() { + return ""; + } + + @Override + public String getFieldName() { + return name; + } + + public int getIndex() { + return index; + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return inspector; + } + + @Override + public int getFieldID() { + return index; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java new file mode 100644 index 0000000..f25342d --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.hive; + +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.hive.ql.io.AbstractStorageFormatDescriptor; + +public class CarbonStorageFormatDescriptor extends AbstractStorageFormatDescriptor { + + @Override + public Set<String> getNames() { + return ImmutableSet.of("CARBONDATA"); + } + + @Override + public String getInputFormat() { + return MapredCarbonInputFormat.class.getName(); + } + + @Override + public String getOutputFormat() { + return MapredCarbonOutputFormat.class.getName(); + } + + @Override + public String getSerde() { + return CarbonHiveSerDe.class.getName(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java new file mode 100644 index 0000000..5caf5a8 --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.hive;

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.hadoop.CarbonInputFormat;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;

/**
 * Hive-facing input format for CarbonData tables.
 *
 * <p>Bridges the mapreduce-API {@link CarbonInputFormat} to the old {@code mapred} API that
 * Hive's execution engine uses, and opts out of Hive's split combining (Carbon manages its
 * own split/blocklet layout, so combining paths would break split resolution).
 */
public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
    implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {

  /**
   * Computes splits by delegating to the mapreduce-API superclass and wrapping each
   * {@link CarbonInputSplit} into a {@link CarbonHiveInputSplit} that the mapred API accepts.
   *
   * @param jobConf   job configuration carrying the table/query settings
   * @param numSplits hint from the caller; intentionally ignored — split count is fully
   *                  determined by Carbon's own segment/blocklet layout
   * @return mapred-compatible splits for the table
   * @throws IOException if the underlying split computation fails
   */
  @Override
  public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
    org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
    List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
    InputSplit[] splits = new InputSplit[splitList.size()];
    for (int i = 0; i < splitList.size(); i++) {
      // Scope the cast result to the loop body; it is not needed afterwards.
      CarbonInputSplit split = (CarbonInputSplit) splitList.get(i);
      splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(),
          split.getStart(), split.getLength(), split.getLocations(),
          split.getNumberOfBlocklets(), split.getVersion(), split.getBlockStorageIdMap());
    }
    return splits;
  }

  /**
   * Creates a record reader that materializes Carbon rows as {@link ArrayWritable}s for Hive.
   *
   * @throws IOException if the query model cannot be built from the configuration
   */
  @Override
  public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
      Reporter reporter) throws IOException {
    QueryModel queryModel = getQueryModel(jobConf);
    CarbonReadSupport<ArrayWritable> readSupport = getReadSupportClass(jobConf);
    return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
  }

  /**
   * Builds the Carbon {@link QueryModel} (projection + pushed-down filter) for this job.
   *
   * @param configuration job configuration; consulted for the projection and filter predicates
   * @return query model with projection and resolved filter tree set
   * @throws IOException if the Carbon table metadata cannot be loaded
   */
  public QueryModel getQueryModel(Configuration configuration) throws IOException {
    CarbonTable carbonTable = getCarbonTable(configuration);
    // Reuse the identifier already held by the table to avoid deserializing it again.
    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();

    // Projection: prefer Carbon's own projection setting; fall back to the column list
    // Hive publishes for column-pruned scans.
    String projection = getColumnProjection(configuration);
    if (projection == null) {
      projection = configuration.get("hive.io.file.readcolumn.names");
    }
    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);

    // Attach the filter so blocklets can be pruned before the scan starts.
    Expression filter = getFilterPredicates(configuration);
    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
    FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
    queryModel.setFilterExpressionResolverTree(filterIntf);

    return queryModel;
  }

  /** Always skip Hive's split combining — see class comment. */
  @Override
  public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
    return true;
  }
}
+ */ +package org.apache.carbondata.hive; + + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.util.Progressable; + + +public class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T> + implements HiveOutputFormat<Void, T> { + + @Override + public RecordWriter<Void, T> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String s, + Progressable progressable) throws IOException { + return null; + } + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties, + Progressable progress) throws IOException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor b/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor new file mode 100644 index 0000000..9a39d21 --- /dev/null +++ b/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor @@ -0,0 +1 @@ +org.apache.hadoop.hive.ql.io.CarbonStorageFormatDescriptor \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/96aeee85/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java 
---------------------------------------------------------------------- diff --git a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java new file mode 100644 index 0000000..3969914 --- /dev/null +++ b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.hive;

import java.util.Properties;

import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.junit.Test;

/**
 * Round-trip test for {@link CarbonHiveSerDe}: builds one row covering the primitive
 * types plus a nested array, then checks that deserialize/serialize preserves it.
 */
public class TestCarbonSerde extends TestCase {

  @Test
  public void testCarbonHiveSerDe() throws Throwable {
    final CarbonHiveSerDe serDe = new CarbonHiveSerDe();
    final Configuration conf = new Configuration();
    final Properties tbl = createProperties();
    SerDeUtils.initializeSerDe(serDe, conf, tbl, null);

    // One value per declared column: smallint, int, bigint, double, decimal, string, array<int>.
    final Writable[] arr = new Writable[7];
    arr[0] = new ShortWritable((short) 456);
    arr[1] = new IntWritable(789);
    // NOTE: uppercase 'L' suffix — the original '1000l' is easily misread as 10001.
    arr[2] = new LongWritable(1000L);
    arr[3] = new DoubleWritable(5.3);
    arr[4] = new HiveDecimalWritable(HiveDecimal.create(1));
    arr[5] = new Text("carbonSerde binary");

    // array<int> column: the element list is itself wrapped in a one-element container,
    // matching the nesting CarbonHiveSerDe produces for list types.
    final Writable[] array = new Writable[5];
    for (int i = 0; i < array.length; ++i) {
      array[i] = new IntWritable(i);
    }
    final Writable[] arrayContainer = new Writable[1];
    arrayContainer[0] = new ArrayWritable(Writable.class, array);
    arr[6] = new ArrayWritable(Writable.class, arrayContainer);

    deserializeAndSerializeLazySimple(serDe, new ArrayWritable(Writable.class, arr));
  }

  /**
   * Deserializes {@code t}, checks the result is the same ArrayWritable row, then
   * serializes it back and checks structural equality with the original.
   */
  private void deserializeAndSerializeLazySimple(final CarbonHiveSerDe serDe,
      final ArrayWritable t) throws SerDeException {
    final StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();

    final Object row = serDe.deserialize(t);
    assertEquals("deserialization gives the wrong object class", row.getClass(),
        ArrayWritable.class);
    assertEquals("size correct after deserialization",
        serDe.getSerDeStats().getRawDataSize(), t.get().length);
    assertEquals("deserialization gives the wrong object", t, row);

    final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
    assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(),
        serializedArr.get().length);
    assertTrue("serialized object should be equal to starting object",
        arrayWritableEquals(t, serializedArr));
  }

  /** Table properties declaring the seven test columns and the SerDe's null format. */
  private Properties createProperties() {
    final Properties tbl = new Properties();
    tbl.setProperty("columns", "ashort,aint,along,adouble,adecimal,astring,alist");
    tbl.setProperty("columns.types",
        "smallint:int:bigint:double:decimal:string:array<int>");
    tbl.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
    return tbl;
  }

  /**
   * Deep structural equality for ArrayWritables: same length, with nested
   * ArrayWritables compared recursively and leaves compared via equals().
   *
   * @return true if both trees have identical structure and equal leaf values
   */
  public static boolean arrayWritableEquals(final ArrayWritable a1, final ArrayWritable a2) {
    final Writable[] a1Arr = a1.get();
    final Writable[] a2Arr = a2.get();
    if (a1Arr.length != a2Arr.length) {
      return false;
    }
    for (int i = 0; i < a1Arr.length; ++i) {
      if (a1Arr[i] instanceof ArrayWritable) {
        if (!(a2Arr[i] instanceof ArrayWritable)
            || !arrayWritableEquals((ArrayWritable) a1Arr[i], (ArrayWritable) a2Arr[i])) {
          return false;
        }
      } else if (!a1Arr[i].equals(a2Arr[i])) {
        return false;
      }
    }
    return true;
  }
}
---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a132d6d..6005c0f 100644 --- a/pom.xml +++ b/pom.xml @@ -147,6 +147,12 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <version>${hadoop.version}</version> + <scope>${hadoop.deps.scope}</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> <scope>${hadoop.deps.scope}</scope> @@ -305,6 +311,7 @@ <scala.binary.version>2.10</scala.binary.version> <scala.version>2.10.4</scala.version> <maven.test.skip>true</maven.test.skip> + <hive.version>1.2.1</hive.version> <flink.version>1.1.4</flink.version> </properties> <modules> @@ -312,6 +319,7 @@ <module>integration/spark</module> <module>examples/spark</module> <module>integration/spark2</module> + <module>integration/hive</module> <module>examples/spark2</module> <module>examples/flink</module> </modules> @@ -379,6 +387,15 @@ </modules> </profile> <profile> + <id>hive-1.2</id> + <properties> + <hive.version>1.2.1</hive.version> + </properties> + <modules> + <module>integration/hive</module> + </modules> + </profile> + <profile> <id>findbugs</id> <build> <plugins>