[ 
https://issues.apache.org/jira/browse/FLINK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546532#comment-16546532
 ] 

ASF GitHub Bot commented on FLINK-7251:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6120#discussion_r203003680
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
    @@ -237,19 +248,81 @@ private boolean 
validateKeyTypeIsHashable(TypeInformation<?> type) {
        }
     
        /**
    -    * Gets the type of the key by which the stream is partitioned.
    -    * @return The type of the key by which the stream is partitioned.
    +    * Tries to fill in the type information. Type information can be 
filled in
    +    * later when the program uses a type hint. This method checks whether 
the
    +    * type information has ever been accessed before and does not allow
    +    * modifications if the type was accessed already. This ensures 
consistency
    +    * by making sure different parts of the operation do not assume 
different
    +    * type information.
    +    *
    +    * @param keyType The type information to fill in.
    +    *
    +    * @throws IllegalStateException Thrown, if the type information has 
been accessed before.
    +    */
    +   private void setKeyType(TypeInformation<KEY> keyType) {
    +           if (typeUsed) {
    +                   throw new IllegalStateException(
    +                                   "TypeInformation cannot be filled in 
for the type after it has been used. "
    +                                                   + "Please make sure 
that the type info hints are the first call after "
    +                                                   + "the keyBy() function 
before any other access.");
    +           }
    +           this.keyType = keyType;
    +   }
    +
    +   /**
    +    * Returns the key type of this {@code KeyedStream} as a {@link 
TypeInformation}. Once
    +    * this is used once the key type cannot be changed anymore using 
{@link #returns(TypeInformation)}.
    +    *
    +    * @return The output type of this {@code KeyedStream}
         */
        @Internal
        public TypeInformation<KEY> getKeyType() {
    -           return keyType;
    +           if (keyType instanceof MissingTypeInfo) {
    +                   MissingTypeInfo typeInfo = (MissingTypeInfo) 
this.keyType;
    +                   throw new InvalidTypesException(
    +                                   "The key type of key selector '"
    +                                                   + 
typeInfo.getFunctionName()
    +                                                   + "' could not be 
determined automatically, due to type erasure. "
    +                                                   + "You can give type 
information hints by using the returns(...) "
    +                                                   + "method on the result 
of the transformation call, or by letting "
    +                                                   + "your selector 
implement the 'ResultTypeQueryable' "
    +                                                   + "interface.", 
typeInfo.getTypeException());
    +           }
    +
    +           // perform the validation when the type is used for the first 
time
    +           if (!typeUsed) {
    +                   typeUsed = true;
    +                   validateKeyType(keyType);
    +           }
    +
    +           return this.keyType;
        }
     
        @Override
        protected DataStream<T> setConnectionType(StreamPartitioner<T> 
partitioner) {
                throw new UnsupportedOperationException("Cannot override 
partitioning for KeyedStream.");
        }
     
    +   // 
------------------------------------------------------------------------
    +   //  Type hinting
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Adds a type information hint about the key type of a key selector. 
This method
    +    * can be used in cases where Flink cannot determine automatically what 
the produced
    +    * type of a key selector is. That can be the case if the selector uses 
generic type variables
    +    * in the return type that cannot be inferred from the input type.
    +    *
    +    * @param typeInfo type information as a key type hint
    +    * @return This operator with a given key type hint.
    +    */
    +   public KeyedStream<T, KEY> returns(TypeInformation<KEY> typeInfo) {
    --- End diff --
    
    Should probably make this `@PublicEvolving` for now.


> Merge the flink-java8 project into flink-core
> ---------------------------------------------
>
>                 Key: FLINK-7251
>                 URL: https://issues.apache.org/jira/browse/FLINK-7251
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: Stephan Ewen
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to