The exception is thrown by this check:

if (!type.isTupleType()) {
        throw new InvalidProgramException("Specifying keys via field positions is only valid " +
                        "for tuple data types. Type: " + type);
}


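So, as Saikat says, you may need a stream of tuples, since keyBy(0) selects a tuple field by position. The ReduceFunction then has to use that same tuple type, because it takes and returns the stream's element type T: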
public interface ReduceFunction<T> extends Function, Serializable {

        /**
         * The core method of ReduceFunction, combining two values into one value of the same type.
         * The reduce function is consecutively applied to all values of a group until only a single value remains.
         *
         * @param value1 The first value to combine.
         * @param value2 The second value to combine.
         * @return The combined value of both input values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        T reduce(T value1, T value2) throws Exception;
}
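
To make that contract concrete, here is a minimal plain-Java sketch of "reduce per key group" with no Flink dependency. It is only a model of the semantics described in the Javadoc above, not how Flink implements it. The class name KeyedReduceSketch and the helper reduceByKey are hypothetical names invented for this illustration, and the key selector (even/odd) is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class KeyedReduceSketch {

    // Hypothetical helper, not a Flink API: groups values by the key returned
    // from keySelector and combines the values of each group pairwise.
    public static <T, K> Map<K, T> reduceByKey(List<T> values,
                                               Function<T, K> keySelector,
                                               BinaryOperator<T> reducer) {
        Map<K, T> result = new LinkedHashMap<>();
        for (T value : values) {
            // merge() stores the first value seen for a key, and afterwards
            // replaces it with reducer(previousResult, value).
            result.merge(keySelector.apply(value), value, reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        // Key each integer by parity and sum the values within each group.
        Map<Integer, Integer> sums =
                reduceByKey(Arrays.asList(1, 2, 3, 4), v -> v % 2, Integer::sum);
        System.out.println(sums); // prints {1=4, 0=6}
    }
}
```

The point of the sketch is that the reducer only ever sees two values of the same type T and returns a T, while the key comes from a separate selector. With plain Integer elements there is no field at position 0 to select, which is why a positional key needs a tuple type.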
 



Thanks

Chenguang He
Graduate Student
Department of Computer Science
Iowa State University

On April 18, 2016 at 1:30:19 PM, Saikat Maitra (saikat.mai...@gmail.com) wrote:

Hello Jitendra,  

I am new to the Flink community, but I may have seen this issue before. Can you
try using DataStream<Tuple1<Integer>> instead of KeyedStream integers4?

Regards Saikat  

On Mon, Apr 18, 2016 at 7:37 PM, Jitendra Agrawal <  
jitendra.agra...@northgateps.com> wrote:  

> Hi Team,  
>  
> Problem Description: When I call the *reduce()* method on a KeyedStream
> object, I get the following exception:
>
> "*org.apache.flink.api.common.InvalidProgramException: Specifying keys via
> field positions is only valid for tuple data types. Type: Integer*".
>  
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);
> DataStream doubleIntegers4 = integers4.reduce(new ReduceFunction<Integer>() {
>
>     @Override
>     public Integer reduce(Integer arg0, Integer arg1) throws Exception {
>         return arg0 + arg1;
>     }
> });
>  
>  
> Can you please tell me how to resolve this exception, or point me to a
> link or example that shows a complete solution?
>
> Your help will be appreciated.
>  
> Thanks & Regards  
>  
> *Jitendra Agrawal *  
>  
> Mumbai  
>  
> 9167539602  
>  
