[ https://issues.apache.org/jira/browse/FLINK-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler updated FLINK-1910: ------------------------------------ Description: {code:java} public static LinkedList values=new LinkedList<String>(); public static void main(String[] args) throws Exception { values.add("AUTOMOBILE"); values.add("XSTf4&&NCwDVaWNe6tEgvwfmRchLXak"); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Customer> customers = getCustomerDataSet(env); customers = customers.filter( new FilterFunction<Customer>() { @Override public boolean filter(Customer c) { return c.getField(4).equals(values.get(0).toString()) && c.getField(2).equals(values.get(1).toString()) ; } }); System.out.println(customers.print()); customers.writeAsCsv("/home/hadoop/Desktop/Dataset/output.csv", "\n", "|"); env.execute(); } public static class Customer extends Tuple5<Long,String,String,String,String> { } private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) { return env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv") .fieldDelimiter('|') .includeFields("11100110").ignoreFirstLine() .tupleType(Customer.class); } {code} Environment: (was: public static LinkedList values=new LinkedList<String>(); public static void main(String[] args) throws Exception { values.add("AUTOMOBILE"); values.add("XSTf4&&NCwDVaWNe6tEgvwfmRchLXak"); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Customer> customers = getCustomerDataSet(env); customers = customers.filter( new FilterFunction<Customer>() { @Override public boolean filter(Customer c) { return c.getField(4).equals(values.get(0).toString()) && c.getField(2).equals(values.get(1).toString()) ; } }); System.out.println(customers.print()); customers.writeAsCsv("/home/hadoop/Desktop/Dataset/output.csv", "\n", "|"); env.execute(); } public static class Customer extends Tuple5<Long,String,String,String,String> { } private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) { return env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv") .fieldDelimiter('|') .includeFields("11100110").ignoreFirstLine() .tupleType(Customer.class); }) > why this code flink not reurn value when use variable in filter > --------------------------------------------------------------- > > Key: FLINK-1910 > URL: https://issues.apache.org/jira/browse/FLINK-1910 > Project: Flink > Issue Type: Bug > Reporter: hagersaleh > > {code:java} > public static LinkedList values=new LinkedList<String>(); > public static void main(String[] args) throws Exception > { > values.add("AUTOMOBILE"); > values.add("XSTf4&&NCwDVaWNe6tEgvwfmRchLXak"); > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet<Customer> customers = getCustomerDataSet(env); > customers = customers.filter( > new FilterFunction<Customer>() { > @Override > public boolean filter(Customer c) { > return c.getField(4).equals(values.get(0).toString()) && > c.getField(2).equals(values.get(1).toString()) ; > } > }); > System.out.println(customers.print()); > customers.writeAsCsv("/home/hadoop/Desktop/Dataset/output.csv", "\n", > "|"); > env.execute(); > } > public static class Customer extends Tuple5<Long,String,String,String,String> > { > } > private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) > { > return env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv") > .fieldDelimiter('|') > .includeFields("11100110").ignoreFirstLine() > .tupleType(Customer.class); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)