[ 
https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324499#comment-14324499
 ] 

ASF GitHub Bot commented on FLINK-1466:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/411#discussion_r24833839
  
    --- Diff: 
flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
 ---
    @@ -0,0 +1,413 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.hcatalog;
    +
    +import org.apache.flink.api.common.io.InputFormat;
    +import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
    +import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.WritableComparable;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.JobContext;
    +import org.apache.hadoop.mapreduce.JobID;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.TaskAttemptID;
    +import org.apache.hive.hcatalog.common.HCatException;
    +import org.apache.hive.hcatalog.common.HCatUtil;
    +import org.apache.hive.hcatalog.data.DefaultHCatRecord;
    +import org.apache.hive.hcatalog.data.HCatRecord;
    +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
    +import org.apache.hive.hcatalog.data.schema.HCatSchema;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * A InputFormat to read from HCatalog tables.
    + * The InputFormat supports projection (selection and order of fields) and 
partition filters.
    + *
    + * Data can be returned as {@link 
org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link 
org.apache.flink.api.java.tuple.Tuple}.
    + * Flink Tuples are only supported for primitive type fields
    + * (no STRUCT, ARRAY, or MAP data types) and have a size limitation.
    + *
    + * @param <T>
    + */
    +public abstract class HCatInputFormatBase<T> implements InputFormat<T, 
HadoopInputSplit>, ResultTypeQueryable<T> {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   private Configuration configuration;
    +
    +   private org.apache.hive.hcatalog.mapreduce.HCatInputFormat 
hCatInputFormat;
    +   private RecordReader<WritableComparable, HCatRecord> recordReader;
    +   private boolean fetched = false;
    +   private boolean hasNext;
    +
    +   protected String[] fieldNames = new String[0];
    +   protected HCatSchema outputSchema;
    +
    +   private TypeInformation<T> resultType;
    +
    +   public HCatInputFormatBase() { }
    +
    +   /**
    +    * Creates a HCatInputFormat for the given database and table.
    +    * By default, the InputFormat returns {@link 
org.apache.hive.hcatalog.data.HCatRecord}.
    +    * The return type of the InputFormat can be changed to Flink {@link 
org.apache.flink.api.java.tuple.Tuple} by calling
    +    * {@link HCatInputFormatBase#asFlinkTuples()}.
    +    *
    +    * @param database The name of the database to read from.
    +    * @param table The name of the table to read.
    +    * @throws java.io.IOException
    +    */
    +   public HCatInputFormatBase(String database, String table) throws 
IOException {
    +           this(database, table, new Configuration());
    +   }
    +
    +   /**
    +    * Creates a HCatInputFormat for the given database, table, and
    +    * {@link org.apache.hadoop.conf.Configuration}.
    +    * By default, the InputFormat returns {@link 
org.apache.hive.hcatalog.data.HCatRecord}.
    +    * The return type of the InputFormat can be changed to Flink {@link 
org.apache.flink.api.java.tuple.Tuple} by calling
    +    * {@link HCatInputFormatBase#asFlinkTuples()}.
    +    *
    +    * @param database The name of the database to read from.
    +    * @param table The name of the table to read.
    +    * @param config The Configuration for the InputFormat.
    +    * @throws java.io.IOException
    +    */
    +   public HCatInputFormatBase(String database, String table, Configuration 
config) throws IOException {
    +           super();
    +           this.configuration = config;
    +           HadoopUtils.mergeHadoopConf(this.configuration);
    +
    +           this.hCatInputFormat = 
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, 
database, table);
    +           this.outputSchema = 
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
    +
    +           // configure output schema of HCatFormat
    +           configuration.set("mapreduce.lib.hcat.output.schema", 
HCatUtil.serialize(outputSchema));
    +           // set type information
    +           this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
    +   }
    +
    +   /**
    +    * Specifies the fields which are returned by the InputFormat and their 
order.
    +    *
    +    * @param fields The fields and their order which are returned by the 
InputFormat.
    +    * @return This InputFormat with specified return fields.
    +    * @throws java.io.IOException
    +    */
    +   public HCatInputFormatBase<T> getFields(String... fields) throws 
IOException {
    +
    +           // build output schema
    +           ArrayList<HCatFieldSchema> fieldSchemas = new 
ArrayList<HCatFieldSchema>(fields.length);
    +           for(String field : fields) {
    +                   fieldSchemas.add(this.outputSchema.get(field));
    +           }
    +           this.outputSchema = new HCatSchema(fieldSchemas);
    +
    +           // update output schema configuration
    +           configuration.set("mapreduce.lib.hcat.output.schema", 
HCatUtil.serialize(outputSchema));
    +
    +           return this;
    +   }
    +
    +   /**
    +    * Specifies a SQL-like filter condition on the table's partition 
columns.
    +    * Filter conditions on non-partition columns are invalid.
    +    * A partition filter can significantly reduce the amount of data to be 
read.
    +    *
    +    * @param filter A SQL-like filter condition on the table's partition 
columns.
    +    * @return This InputFormat with specified partition filter.
    +    * @throws java.io.IOException
    +    */
    +   public HCatInputFormatBase<T> withFilter(String filter) throws 
IOException {
    +
    +           // set filter
    +           this.hCatInputFormat.setFilter(filter);
    +
    +           return this;
    +   }
    +
    +   /**
    +    * Specifies that the InputFormat returns Flink {@link 
org.apache.flink.api.java.tuple.Tuple}
    +    * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}.
    +    * At the moment, the following restrictions apply for returning Flink 
tuples:
    +    *
    +    * <ul>
    +    *     <li>Only primitive type fields can be returned in Flink Tuples
    +    *          (no STRUCT, MAP, ARRAY data types).</li>
    +    *     <li>Only a limited number of fields can be returned as Flink 
Tuple.</li>
    +    * </ul>
    +    *
    +    * @return This InputFormat.
    +    * @throws org.apache.hive.hcatalog.common.HCatException
    +    */
    +   public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
    +
    +           // build type information
    +           int numFields = outputSchema.getFields().size();
    +           if(numFields > this.getMaxFlinkTupleSize()) {
    +                   throw new IllegalArgumentException("Only up to 
"+this.getMaxFlinkTupleSize()+
    +                                   " fields can be returned as Flink 
tuples.");
    +           }
    +
    +           TypeInformation[] fieldTypes = new TypeInformation[numFields];
    +           fieldNames = new String[numFields];
    +           for (String fieldName : outputSchema.getFieldNames()) {
    +                   HCatFieldSchema field = outputSchema.get(fieldName);
    +
    +                   int fieldPos = outputSchema.getPosition(fieldName);
    +                   TypeInformation fieldType = getFieldType(field);
    +
    +                   fieldTypes[fieldPos] = fieldType;
    +                   fieldNames[fieldPos] = fieldName;
    +
    +           }
    +           this.resultType = new TupleTypeInfo(fieldTypes);
    +
    +           return this;
    +   }
    +
    +   protected abstract int getMaxFlinkTupleSize();
    +
    +   private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
    +
    +           switch(fieldSchema.getType()) {
    +                   case INT:
    +                           return BasicTypeInfo.INT_TYPE_INFO;
    +                   case TINYINT:
    +                           return BasicTypeInfo.BYTE_TYPE_INFO;
    +                   case SMALLINT:
    +                           return BasicTypeInfo.SHORT_TYPE_INFO;
    +                   case BIGINT:
    +                           return BasicTypeInfo.LONG_TYPE_INFO;
    +                   case BOOLEAN:
    +                           return BasicTypeInfo.BOOLEAN_TYPE_INFO;
    +                   case FLOAT:
    +                           return BasicTypeInfo.FLOAT_TYPE_INFO;
    +                   case DOUBLE:
    +                           return BasicTypeInfo.DOUBLE_TYPE_INFO;
    +                   case STRING:
    +                           return BasicTypeInfo.STRING_TYPE_INFO;
    +                   case BINARY:
    +                           return 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    +                   case ARRAY:
    +                           throw new UnsupportedOperationException("ARRAY 
type is not supported in Flink tuples, yet.");
    --- End diff --
    
    Why can't we use generic types here for arrays, hashmaps, and lists (see 
https://cwiki.apache.org/confluence/display/Hive/HCatalog+InputOutput)?


> Add InputFormat to read HCatalog tables
> ---------------------------------------
>
>                 Key: FLINK-1466
>                 URL: https://issues.apache.org/jira/browse/FLINK-1466
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> HCatalog is a metadata repository and InputFormat to make Hive tables 
> accessible to other frameworks such as Pig.
> Adding support for HCatalog would give access to Hive managed data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to