[
https://issues.apache.org/jira/browse/FLINK-2678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15133932#comment-15133932
]
ASF GitHub Bot commented on FLINK-2678:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1566#discussion_r51997240
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
---
@@ -72,15 +77,59 @@ public int getTotalFields() {
@Override
public boolean isKeyType() {
- return false;
+ return true;
}
@SuppressWarnings("unchecked")
@Override
public TypeSerializer<T> createSerializer(ExecutionConfig
executionConfig) {
return (TypeSerializer<T>) new GenericArraySerializer<C>(
- componentInfo.getTypeClass(),
- componentInfo.createSerializer(executionConfig));
+ componentInfo.getTypeClass(),
+
componentInfo.createSerializer(executionConfig));
+ }
+
+ @SuppressWarnings("unchecked")
+ private TypeComparator<? super Object>
getBaseComparatorInfo(TypeInformation<? extends Object> componentInfo, boolean
sortOrderAscending, ExecutionConfig executionConfig) {
+ /**
+ * method tries to find out the Comparator to be used to
compare each element (of primitive type or composite type) of the provided
Object arrays.
+ */
+ if (componentInfo instanceof ObjectArrayTypeInfo) {
+ return getBaseComparatorInfo(((ObjectArrayTypeInfo)
componentInfo).getComponentInfo(), sortOrderAscending, executionConfig);
+ }
+ else if (componentInfo instanceof PrimitiveArrayTypeInfo) {
+ return getBaseComparatorInfo(((PrimitiveArrayTypeInfo<?
extends Object>) componentInfo).getComponentType(), sortOrderAscending,
executionConfig);
+ }
+ else {
+ if (componentInfo instanceof AtomicType) {
+ return ((AtomicType<? super Object>)
componentInfo).createComparator(sortOrderAscending, executionConfig);
+ }
+ else if (componentInfo instanceof CompositeType) {
+ int componentArity = ((CompositeType<? extends
Object>) componentInfo).getArity();
+ int [] logicalKeyFields = new
int[componentArity];
+ boolean[] orders = new boolean[componentArity];
+
+ for (int i=0;i < componentArity;i++) {
+ logicalKeyFields[i] = i;
+ orders[i] = sortOrderAscending;
+ }
+
+ return ((CompositeType<? super Object>)
componentInfo).createComparator(logicalKeyFields, orders, 0, executionConfig);
+ }
+ else {
+ throw new IllegalArgumentException("Could not
add a comparator for the component type " + componentInfo.getClass().getName());
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TypeComparator<T> createComparator(boolean sortOrderAscending,
ExecutionConfig executionConfig) {
+
+ return (TypeComparator<T>) new ObjectArrayComparator<T,C>(
+ sortOrderAscending,
+ (GenericArraySerializer<T>)
createSerializer(executionConfig),
--- End diff --
Why this cast here?
> DataSet API does not support multi-dimensional arrays as keys
> -------------------------------------------------------------
>
> Key: FLINK-2678
> URL: https://issues.apache.org/jira/browse/FLINK-2678
> Project: Flink
> Issue Type: Wish
> Components: DataSet API
> Reporter: Till Rohrmann
> Assignee: Subhobrata Dey
> Priority: Minor
>
> The DataSet API does not support grouping/sorting on field which are
> multi-dimensional arrays. It could be helpful to also support these types.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)