[ https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324499#comment-14324499 ]
ASF GitHub Bot commented on FLINK-1466:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/411#discussion_r24833839

--- Diff: flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java ---
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
+ * Flink Tuples are only supported for primitive type fields
+ * (no STRUCT, ARRAY, or MAP data types) and have a size limitation.
+ *
+ * @param <T>
+ */
+public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInputSplit>, ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private Configuration configuration;
+
+    private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
+    private RecordReader<WritableComparable, HCatRecord> recordReader;
+    private boolean fetched = false;
+    private boolean hasNext;
+
+    protected String[] fieldNames = new String[0];
+    protected HCatSchema outputSchema;
+
+    private TypeInformation<T> resultType;
+
+    public HCatInputFormatBase() { }
+
+    /**
+     * Creates a HCatInputFormat for the given database and table.
+     * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+     * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+     * {@link HCatInputFormatBase#asFlinkTuples()}.
+     *
+     * @param database The name of the database to read from.
+     * @param table The name of the table to read.
+     * @throws java.io.IOException
+     */
+    public HCatInputFormatBase(String database, String table) throws IOException {
+        this(database, table, new Configuration());
+    }
+
+    /**
+     * Creates a HCatInputFormat for the given database, table, and
+     * {@link org.apache.hadoop.conf.Configuration}.
+     * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+     * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+     * {@link HCatInputFormatBase#asFlinkTuples()}.
+     *
+     * @param database The name of the database to read from.
+     * @param table The name of the table to read.
+     * @param config The Configuration for the InputFormat.
+     * @throws java.io.IOException
+     */
+    public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
+        super();
+        this.configuration = config;
+        HadoopUtils.mergeHadoopConf(this.configuration);
+
+        this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
+        this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
+
+        // configure output schema of HCatFormat
+        configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+        // set type information
+        this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
+    }
+
+    /**
+     * Specifies the fields which are returned by the InputFormat and their order.
+     *
+     * @param fields The fields and their order which are returned by the InputFormat.
+     * @return This InputFormat with specified return fields.
+     * @throws java.io.IOException
+     */
+    public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
+
+        // build output schema
+        ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
+        for(String field : fields) {
+            fieldSchemas.add(this.outputSchema.get(field));
+        }
+        this.outputSchema = new HCatSchema(fieldSchemas);
+
+        // update output schema configuration
+        configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
+
+        return this;
+    }
+
+    /**
+     * Specifies a SQL-like filter condition on the table's partition columns.
+     * Filter conditions on non-partition columns are invalid.
+     * A partition filter can significantly reduce the amount of data to be read.
+     *
+     * @param filter A SQL-like filter condition on the table's partition columns.
+     * @return This InputFormat with specified partition filter.
+     * @throws java.io.IOException
+     */
+    public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
+
+        // set filter
+        this.hCatInputFormat.setFilter(filter);
+
+        return this;
+    }
+
+    /**
+     * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple}
+     * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}.
+     * At the moment, the following restrictions apply for returning Flink tuples:
+     *
+     * <ul>
+     *     <li>Only primitive type fields can be returned in Flink Tuples
+     *         (no STRUCT, MAP, ARRAY data types).</li>
+     *     <li>Only a limited number of fields can be returned as Flink Tuple.</li>
+     * </ul>
+     *
+     * @return This InputFormat.
+     * @throws org.apache.hive.hcatalog.common.HCatException
+     */
+    public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
+
+        // build type information
+        int numFields = outputSchema.getFields().size();
+        if(numFields > this.getMaxFlinkTupleSize()) {
+            throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
+                    " fields can be returned as Flink tuples.");
+        }
+
+        TypeInformation[] fieldTypes = new TypeInformation[numFields];
+        fieldNames = new String[numFields];
+        for (String fieldName : outputSchema.getFieldNames()) {
+            HCatFieldSchema field = outputSchema.get(fieldName);
+
+            int fieldPos = outputSchema.getPosition(fieldName);
+            TypeInformation fieldType = getFieldType(field);
+
+            fieldTypes[fieldPos] = fieldType;
+            fieldNames[fieldPos] = fieldName;
+
+        }
+        this.resultType = new TupleTypeInfo(fieldTypes);
+
+        return this;
+    }
+
+    protected abstract int getMaxFlinkTupleSize();
+
+    private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
+
+        switch(fieldSchema.getType()) {
+            case INT:
+                return BasicTypeInfo.INT_TYPE_INFO;
+            case TINYINT:
+                return BasicTypeInfo.BYTE_TYPE_INFO;
+            case SMALLINT:
+                return BasicTypeInfo.SHORT_TYPE_INFO;
+            case BIGINT:
+                return BasicTypeInfo.LONG_TYPE_INFO;
+            case BOOLEAN:
+                return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+            case FLOAT:
+                return BasicTypeInfo.FLOAT_TYPE_INFO;
+            case DOUBLE:
+                return BasicTypeInfo.DOUBLE_TYPE_INFO;
+            case STRING:
+                return BasicTypeInfo.STRING_TYPE_INFO;
+            case BINARY:
+                return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+            case ARRAY:
+                throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples, yet.");
--- End diff --

Why can't we use generic types here for arrays, hashmaps, and lists? (See https://cwiki.apache.org/confluence/display/Hive/HCatalog+InputOutput)


> Add InputFormat to read HCatalog tables
> ---------------------------------------
>
>                 Key: FLINK-1466
>                 URL: https://issues.apache.org/jira/browse/FLINK-1466
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> HCatalog is a metadata repository and InputFormat that makes Hive tables
> accessible to other frameworks such as Pig.
> Adding support for HCatalog would give access to Hive managed data.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
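
For context on the review question above, a rough, untested sketch of what falling back to Flink's generic type information for non-primitive HCatalog fields might look like. The helper class and method names are hypothetical and not part of the pull request; it assumes HCatalog hands ARRAY/MAP/STRUCT values to the client as java.util.List / java.util.Map objects, as described in the linked HCatalog InputOutput documentation.

    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;

    // Hypothetical helper: maps complex HCatalog field types to Flink's generic
    // type information (handled by Flink's fallback serializer) instead of
    // throwing UnsupportedOperationException.
    class ComplexHCatFieldTypes {

        @SuppressWarnings({"rawtypes", "unchecked"})
        static TypeInformation<?> getComplexFieldType(HCatFieldSchema fieldSchema) {
            switch (fieldSchema.getType()) {
                case ARRAY:
                    // assumption: HCatalog represents ARRAY values as java.util.List
                    return new GenericTypeInfo(List.class);
                case MAP:
                    // assumption: HCatalog represents MAP values as java.util.Map
                    return new GenericTypeInfo(Map.class);
                case STRUCT:
                    // assumption: HCatalog represents STRUCT values as a java.util.List of field values
                    return new GenericTypeInfo(List.class);
                default:
                    throw new IllegalArgumentException(fieldSchema.getType() + " is not a complex type.");
            }
        }
    }

With such a fallback, tuple positions holding ARRAY, MAP, or STRUCT fields would carry regular Java collection objects rather than primitives; whether that is acceptable for the first version of the InputFormat is exactly the question raised in the review comment.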