[ 
https://issues.apache.org/jira/browse/FLINK-3138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15046828#comment-15046828
 ] 

Aljoscha Krettek commented on FLINK-3138:
-----------------------------------------

How does this manifest?

I changed the WordCount (streaming) example in {{flink-java8}} to this and it 
still works:
{code}
public class WordCount {
        
        // 
*************************************************************************
        //     PROGRAM
        // 
*************************************************************************

        public static String keyIt(Tuple2<String, Integer> e) {
                return e.f0;
        }

        public static void main(String[] args) throws Exception {
                
                if(!parseParameters(args)) {
                        return;
                }
                
                // set up the execution environment
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
                // get input data
                DataStream<String> text = getTextDataStream(env);
                
                DataStream<Tuple2<String, Integer>> counts = 
                                // normalize and split each line
                                text.map(line -> 
line.toLowerCase().split("\\W+")).returns(String[].class)
                                // convert splitted line in pairs (2-tuples) 
containing: (word,1)
                                .flatMap((String[] tokens, 
Collector<Tuple2<String, Integer>> out) -> {
                                        // emit the pairs with non-zero-length 
words
                                        Arrays.stream(tokens)
                                        .filter(t -> t.length() > 0)
                                        .forEach(t -> out.collect(new 
Tuple2<>(t, 1)));
                                }).returns("Tuple2<String, Integer>")
                                // group by the tuple field "0" and sum up 
tuple field "1"
                                .keyBy(WordCount::keyIt)
                                .sum(1);

                // emit result
                if(fileOutput) {
                        counts.writeAsCsv(outputPath, 1);
                } else {
                        counts.print();
                }
                
                // execute program
                env.execute("Streaming WordCount Example");
        }
        
        // 
*************************************************************************
        //     UTIL METHODS
        // 
*************************************************************************
        
        private static boolean fileOutput = false;
        private static String textPath;
        private static String outputPath;
        
        private static boolean parseParameters(String[] args) {
                
                if(args.length > 0) {
                        // parse input arguments
                        fileOutput = true;
                        if(args.length == 2) {
                                textPath = args[0];
                                outputPath = args[1];
                        } else {
                                System.err.println("Usage: WordCount <text 
path> <result path>");
                                return false;
                        }
                } else {
                        System.out.println("Executing WordCount example with 
built-in default data.");
                        System.out.println("  Provide parameters to read input 
data from a file.");
                        System.out.println("  Usage: WordCount <text path> 
<result path>");
                }
                return true;
        }
        
        private static DataStream<String> 
getTextDataStream(StreamExecutionEnvironment env) {
                if (fileOutput) {
                        // read the text file from given input path
                        return env.readTextFile(textPath);
                } else {
                        // get default test text data
                        return env.fromElements(WordCountData.WORDS);
                }
        }
}
{code}

> Method References are not supported as lambda expressions
> ---------------------------------------------------------
>
>                 Key: FLINK-3138
>                 URL: https://issues.apache.org/jira/browse/FLINK-3138
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 0.10.2
>            Reporter: Stephan Ewen
>             Fix For: 1.0.0
>
>
> For many functions (here for example KeySelectors), one can use lambda 
> expressions:
> {code}
> DataStream<MyType> stream = ...;
> stream.keyBy( v -> v.getId() )
> {code}
> Java's other syntax for this, Method References, does not work:
> {code}
> DataStream<MyType> stream = ...;
> stream.keyBy( MyType::getId )
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to