[FLINK-1466] Add support for complex types in Flink tuples for HCatInputFormats.

This closes #411


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bed3da4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bed3da4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bed3da4a

Branch: refs/heads/master
Commit: bed3da4a61a8637c0faa9632b8a05ccef8c5a6dc
Parents: a6acd2e
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed Feb 18 16:52:07 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Feb 20 16:10:35 2015 +0100

----------------------------------------------------------------------
 .../flink/hcatalog/HCatInputFormatBase.java     | 29 +++++++++-----------
 .../flink/hcatalog/java/HCatInputFormat.java    | 24 ++++++++++++++--
 .../flink/hcatalog/scala/HCatInputFormat.scala  | 27 +++++++++++++++---
 3 files changed, 58 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bed3da4a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
index 59a6719..f23ac96 100644
--- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
 import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
@@ -50,14 +51,15 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A InputFormat to read from HCatalog tables.
  * The InputFormat supports projection (selection and order of fields) and partition filters.
  *
- * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
- * Flink Tuples are only supported for primitive type fields
- * (no STRUCT, ARRAY, or MAP data types) and have a size limitation.
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
+ *
+ * Note: Flink tuples might only support a limited number of fields (depending on the API).
  *
  * @param <T>
  */
@@ -82,7 +84,7 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp
        /**
         * Creates a HCatInputFormat for the given database and table.
         * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+        * The return type of the InputFormat can be changed to Flink-native tuples by calling
         * {@link HCatInputFormatBase#asFlinkTuples()}.
         *
         * @param database The name of the database to read from.
@@ -97,7 +99,7 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp
         * Creates a HCatInputFormat for the given database, table, and
         * {@link org.apache.hadoop.conf.Configuration}.
         * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        * The return type of the InputFormat can be changed to Flink {@link org.apache.flink.api.java.tuple.Tuple} by calling
+        * The return type of the InputFormat can be changed to Flink-native tuples by calling
         * {@link HCatInputFormatBase#asFlinkTuples()}.
         *
         * @param database The name of the database to read from.
@@ -159,15 +161,10 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp
        }
 
        /**
-        * Specifies that the InputFormat returns Flink {@link org.apache.flink.api.java.tuple.Tuple}
-        * instead of {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        * At the moment, the following restrictions apply for returning Flink tuples:
+        * Specifies that the InputFormat returns Flink tuples instead of
+        * {@link org.apache.hive.hcatalog.data.HCatRecord}.
         *
-        * <ul>
-        *     <li>Only primitive type fields can be returned in Flink Tuples
-        *          (no STRUCT, MAP, ARRAY data types).</li>
-        *     <li>Only a limited number of fields can be returned as Flink Tuple.</li>
-        * </ul>
+        * Note: Flink tuples might only support a limited number of fields (depending on the API).
         *
         * @return This InputFormat.
         * @throws org.apache.hive.hcatalog.common.HCatException
@@ -222,11 +219,11 @@ public abstract class HCatInputFormatBase<T> implements InputFormat<T, HadoopInp
                        case BINARY:
                                return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
                        case ARRAY:
-                               throw new UnsupportedOperationException("ARRAY type is not supported in Flink tuples, yet.");
+                               return new GenericTypeInfo(List.class);
                        case MAP:
-                               throw new UnsupportedOperationException("MAP type is not supported in Flink tuples, yet.");
+                               return new GenericTypeInfo(Map.class);
                        case STRUCT:
-                               throw new UnsupportedOperationException("STRUCT type not supported in Flink tuples, yet.");
+                               return new GenericTypeInfo(List.class);
                        default:
                                throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/bed3da4a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
index c3c3a1c..46f3cd5 100644
--- a/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
+++ b/flink-staging/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -30,8 +30,7 @@ import org.apache.hive.hcatalog.data.HCatRecord;
  * The InputFormat supports projection (selection and order of fields) and partition filters.
  *
  * Data can be returned as {@link HCatRecord} or Flink {@link org.apache.flink.api.java.tuple.Tuple}.
- * Flink Tuples are only supported for up to 25 fields of primitive types
- * (no STRUCT, ARRAY, or MAP data types).
+ * Flink tuples support only up to 25 fields.
  *
  * @param <T>
  */
@@ -128,6 +127,27 @@ public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
                                                tuple.setField(o, i);
                                        }
                                        break;
+                               case ARRAY:
+                                       if(o instanceof String) {
+                                               throw new RuntimeException("Cannot handle partition keys of type ARRAY.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case MAP:
+                                       if(o instanceof String) {
+                                               throw new RuntimeException("Cannot handle partition keys of type MAP.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case STRUCT:
+                                       if(o instanceof String) {
+                                               throw new RuntimeException("Cannot handle partition keys of type STRUCT.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
                                default:
                                        throw new RuntimeException("Invalid Type");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bed3da4a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
index 7cc18f0..d5a3cbf 100644
--- a/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ b/flink-staging/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -28,10 +28,8 @@ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
  * A InputFormat to read from HCatalog tables.
  * The InputFormat supports projection (selection and order of fields) and partition filters.
  *
- * Data can be returned as {@link HCatRecord} or
- * Flink {@link org.apache.flink.api.java.tuple.Tuple}.
- * Flink Tuples are only supported for up to 22 fields of primitive types
- * (no STRUCT, ARRAY, or MAP data types).
+ * Data can be returned as {@link HCatRecord} or Scala tuples.
+ * Scala tuples support only up to 22 fields.
  *
  */
 class HCatInputFormat[T](
@@ -122,6 +120,27 @@ class HCatInputFormat[T](
             else {
               vals(i) = o.asInstanceOf[Array[Byte]]
             }
+          case HCatFieldSchema.Type.ARRAY =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[List[Object]]
+            }
+          case HCatFieldSchema.Type.MAP =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type MAP.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[Map[Object, Object]]
+            }
+          case HCatFieldSchema.Type.STRUCT =>
+            if (o.isInstanceOf[String]) {
+              throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
+            }
+            else {
+              vals(i) = o.asInstanceOf[List[Object]]
+            }
           case _ =>
             throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
               " encountered.")

Reply via email to