[FLINK-1466] Add support for complex types in Flink tuples for HCatInputFormats.
This closes #411 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bed3da4a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bed3da4a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bed3da4a Branch: refs/heads/master Commit: bed3da4a61a8637c0faa9632b8a05ccef8c5a6dc Parents: a6acd2e Author: Fabian Hueske <fhue...@apache.org> Authored: Wed Feb 18 16:52:07 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Feb 20 16:10:35 2015 +0100 ---------------------------------------------------------------------- .../flink/hcatalog/HCatInputFormatBase.java | 29 +++++++++----------- .../flink/hcatalog/java/HCatInputFormat.java | 24 ++++++++++++++-- .../flink/hcatalog/scala/HCatInputFormat.scala | 27 +++++++++++++++--- 3 files changed, 58 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bed3da4a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java index 59a6719..f23ac96 100644 --- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java +++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.WritableTypeInfo; @@ -50,14 +51,15 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * A InputFormat to read from HCatalog tables. * The InputFormat supports projection (selection and order of fields) and partition filters. * - * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}. - * Flink Tuples are only supported for primitive type fields - * (no STRUCT, ARRAY, or MAP data types) and have a size limitation. + * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or as Flink-native tuples. + * + * Note: Flink tuples might only support a limited number of fields (depending on the API). * * @param <T> */ @@ -82,7 +84,7 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp /** * Creates a HCatInputFormat for the given database and table. * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. - * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling + * The return type of the InputFormat can be changed to Flink-native tuples by calling * {@link HCatInputFormatBase#asFlinkTuples()}. * * @param database The name of the database to read from. @@ -97,7 +99,7 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp * Creates a HCatInputFormat for the given database, table, and * {@link org.apache.hadoop.conf.Configuration}. * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}. 
- * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling + * The return type of the InputFormat can be changed to Flink-native tuples by calling * {@link HCatInputFormatBase#asFlinkTuples()}. * * @param database The name of the database to read from. @@ -159,15 +161,10 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp } /** - * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple} - * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}. - * At the moment, the following restrictions apply for returning Flink tuples: + * Specifies that the InputFormat returns Flink tuples instead of + * {@link org.apache.hive.hcatalog.data.HCatRecord}. * - * <ul> - * <li>Only primitive type fields can be returned in Flink Tuples - * (no STRUCT, MAP, ARRAY data types).</li> - * <li>Only a limited number of fields can be returned as Flink Tuple.</li> - * </ul> + * Note: Flink tuples might only support a limited number of fields (depending on the API). * * @return This InputFormat. 
* @throws org.apache.hive.hcatalog.common.HCatException @@ -222,11 +219,11 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp case BINARY: return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; case ARRAY: - throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples, yet."); + return new GenericTypeInfo(List.class); case MAP: - throw new UnsupportedOperationException("MAP type is not supported in Flink tuples, yet."); + return new GenericTypeInfo(Map.class); case STRUCT: - throw new UnsupportedOperationException("STRUCT type not supported in Flink tuples, yet."); + return new GenericTypeInfo(List.class); default: throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered."); } http://git-wip-us.apache.org/repos/asf/flink/blob/bed3da4a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java index c3c3a1c..46f3cd5 100644 --- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java +++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java @@ -30,8 +30,7 @@ import org.apache.hive.hcatalog.data.HCatRecord; * The InputFormat supports projection (selection and order of fields) and partition filters. * * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}. - * Flink Tuples are only supported for up to 25 fields of primitive types - * (no STRUCT, ARRAY, or MAP data types). + * Flink tuples support only up to 25 fields. 
* * @param <T> */ @@ -128,6 +127,27 @@ public class HCatInputFormat<T> extends HCatInputFormatBase<T> { tuple.setField(o, i); } break; + case ARRAY: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type ARRAY."); + } else { + tuple.setField(o, i); + } + break; + case MAP: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type MAP."); + } else { + tuple.setField(o, i); + } + break; + case STRUCT: + if(o instanceof String) { + throw new RuntimeException("Cannot handle partition keys of type STRUCT."); + } else { + tuple.setField(o, i); + } + break; default: throw new RuntimeException("Invalid Type"); } http://git-wip-us.apache.org/repos/asf/flink/blob/bed3da4a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala index 7cc18f0..d5a3cbf 100644 --- a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala +++ b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala @@ -28,10 +28,8 @@ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema * A InputFormat to read from HCatalog tables. * The InputFormat supports projection (selection and order of fields) and partition filters. * - * Data can be returned as {@link HCatRecord} or - * Flink {@link org.apache.flink.api.java.tuple.Tuple}. - * Flink Tuples are only supported for up to 22 fields of primitive types - * (no STRUCT, ARRAY, or MAP data types). + * Data can be returned as {@link HCatRecord} or Scala tuples. + * Scala tuples support only up to 22 fields. 
* */ class HCatInputFormat[T]( @@ -122,6 +120,27 @@ class HCatInputFormat[T]( else { vals(i) = o.asInstanceOf[Array[Byte]] } + case HCatFieldSchema.Type.ARRAY => + if (o.isInstanceOf[String]) { + throw new RuntimeException("Cannot handle partition keys of type ARRAY.") + } + else { + vals(i) = o.asInstanceOf[List[Object]] + } + case HCatFieldSchema.Type.MAP => + if (o.isInstanceOf[String]) { + throw new RuntimeException("Cannot handle partition keys of type MAP.") + } + else { + vals(i) = o.asInstanceOf[Map[Object, Object]] + } + case HCatFieldSchema.Type.STRUCT => + if (o.isInstanceOf[String]) { + throw new RuntimeException("Cannot handle partition keys of type STRUCT.") + } + else { + vals(i) = o.asInstanceOf[List[Object]] + } case _ => throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType + " encountered.")