Two potential bugs in Flink ML

2024-04-04 Thread Komal M
Hi Flink Dev Team,
I have two possible bugs to report for Flink ML Iteration.
Flink v1.17.2
Flink ML v2.3.0
Java 11

Bug # 1
Implementing a UDF as a KeyedCoProcessFunction or CoFlatMapFunction inside the
IterationBody yields a “java.lang.ClassCastException:
org.apache.flink.iteration.IterationRecord cannot be cast to class
org.apache.flink.api.java.tuple.Tuple” error. For reference, I do not get any
error when applying .keyBy().flatMap() on the streams individually inside the
iteration body.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    ….
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    …
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 5 more
Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
    at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
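Reading the trace bottom-up, the key selector for the first input of the two-input operator is reached through AbstractAllRoundWrapperOperator.setKeyContextElement1, and TupleComparator.extractKeys then fails to cast an IterationRecord to Tuple. That suggests the key selector is being applied to the iteration wrapper rather than to the Tuple2 it carries. The plain-Java sketch below models only that failure mode; Pair, Wrapped, keyOf and keyOfUnwrapped are hypothetical stand-ins, not Flink classes, and it is not a description of how Flink ML actually fixes the issue.

```java
/**
 * Simplified, Flink-free model of the ClassCastException above.
 * Pair and Wrapped are hypothetical stand-ins for Tuple2 and IterationRecord.
 */
public class KeyExtractionModel {

    // Hypothetical stand-in for Tuple2<String, Double>.
    static final class Pair {
        final String f0;
        final double f1;

        Pair(String f0, double f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Hypothetical stand-in for IterationRecord: the user value plus the epoch it belongs to.
    static final class Wrapped {
        final Object value;
        final int epoch;

        Wrapped(Object value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    // Positional key selector that assumes a Pair and casts unconditionally,
    // similar to how TupleComparator.extractKeys casts its input to Tuple.
    static String keyOf(Object element) {
        return ((Pair) element).f0;
    }

    // Wrapper-aware variant: unwrap first, then apply the user's key selector.
    static String keyOfUnwrapped(Object element) {
        Object value = (element instanceof Wrapped) ? ((Wrapped) element).value : element;
        return keyOf(value);
    }

    // Returns true if applying keyOf directly to the element fails the cast.
    static boolean keyOfThrows(Object element) {
        try {
            keyOf(element);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Wrapped record = new Wrapped(new Pair("A", 2.0), 0);
        System.out.println("key after unwrapping: " + keyOfUnwrapped(record));
        System.out.println("cast fails on the wrapper: " + keyOfThrows(record));
    }
}
```

In this model the plain tuple and the unwrapped record both yield the key "A", while handing the wrapper straight to keyOf reproduces the cast failure.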



Potential Bug # 2

The onEpochWatermarkIncremented method is never invoked when the
IterationListener interface is implemented by a UDF inside the IterationBody.



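import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// method is invoked from within IterationBody
// (type parameters restored after the archive stripped them; the Integer key type is an assumption)
public class ComputeML2
        extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, Tuple2<Integer, String>>
        implements IterationListener<Tuple2<Integer, String>> {

    // this method is never invoked, getting no output
    @Override
    public void onEpochWatermarkIncremented(
            int epochWaterMark,
            IterationListener.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
        collector.collect(Tuple2.of(epochWaterMark, "epoch")); // Bug: no output
    }

    @Override
    public void onIterationTerminated(
            IterationListener.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
    }

    @Override
    public void processElement(
            Tuple2<Integer, String> integerStringTuple2,
            KeyedProcessFunction<Integer, Tuple2<Integer, String>, Tuple2<Integer, String>>.Context context,
            Collector<Tuple2<Integer, String>> collector) throws Exception {
        // some processing here
    }
}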
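Whether onEpochWatermarkIncremented fires depends on the operator's epoch watermark actually advancing. The toy model below is plain Java, not Flink ML code, and rests on an assumption: an operator's epoch watermark advances only once every one of its inputs has reached the new epoch, and the callback fires on each such advance. Under that assumption, an input whose epoch watermark never advances keeps the callback silent. EpochWatermarkModel, Listener and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy model (not Flink ML code) of epoch-watermark alignment on a multi-input operator.
 * ASSUMPTION: the listener fires only when the minimum epoch watermark over all inputs grows.
 */
public class EpochWatermarkModel {

    // Hypothetical, simplified counterpart of IterationListener's two callbacks.
    interface Listener {
        void onEpochWatermarkIncremented(int epochWatermark);

        void onIterationTerminated();
    }

    private final int[] inputWatermarks; // latest epoch watermark seen on each input
    private final Listener listener;
    private int alignedWatermark = -1;   // -1 means "no epoch completed yet" in this model

    EpochWatermarkModel(int numInputs, Listener listener) {
        this.inputWatermarks = new int[numInputs];
        Arrays.fill(this.inputWatermarks, -1);
        this.listener = listener;
    }

    // Input `input` reports that it has finished epoch `epoch`.
    void onInputWatermark(int input, int epoch) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], epoch);
        int min = Arrays.stream(inputWatermarks).min().getAsInt();
        if (min > alignedWatermark) {
            alignedWatermark = min;
            listener.onEpochWatermarkIncremented(min);
        }
    }

    void terminate() {
        listener.onIterationTerminated();
    }

    // Two inputs; input 1 lags behind input 0, so only epoch 0 is ever aligned.
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        EpochWatermarkModel op = new EpochWatermarkModel(2, new Listener() {
            @Override
            public void onEpochWatermarkIncremented(int epochWatermark) {
                calls.add("epoch " + epochWatermark);
            }

            @Override
            public void onIterationTerminated() {
                calls.add("terminated");
            }
        });
        op.onInputWatermark(0, 0); // input 1 has not finished epoch 0: no callback
        op.onInputWatermark(1, 0); // both inputs finished epoch 0: callback fires
        op.onInputWatermark(0, 1); // input 1 is still at epoch 0: no callback
        op.terminate();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [epoch 0, terminated]
    }
}
```

The design choice being modeled is "fire on the minimum", which is how watermarks generally behave for multi-input operators; whether Flink ML's iteration runtime aligns epoch watermarks in exactly this way should be confirmed against its sources.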




Let me know if I should submit 

Re: Two potential bugs in Flink ML

2024-04-07 Thread Komal M
                        collector.collect(Tuple2.of(modelUpdate.f0, weight));
                        context.output(finalOutputTag, state.get(weight));
                    }
                }

                @Override
                public void open(Configuration config) {
                    MapStateDescriptor<Double, String> stateDescriptor =
                            new MapStateDescriptor<>(
                                    "statedescriptor", // the state name
                                    BasicTypeInfo.DOUBLE_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO);
                    this.state = getRuntimeContext().getMapState(stateDescriptor);
                }
            });

            DataStream finalOutput = newModelUpdate.getSideOutput(finalOutputTag);

            return new IterationBodyResult(
                    DataStreamList.of(newModelUpdate),
                    DataStreamList.of(finalOutput));
        });

        result.get(0).print();

        // Execute program
        env.execute("Flink Java API Skeleton");
    }
}

Best,
Komal
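The reproduction program's logic, as Komal describes it in the reply quoted further down ("All it does is subtract 1 from the feedback stream until the tuples reach 0.0"), can be modeled without Flink to make the expected finalOutput concrete. In this sketch one pass of the while loop stands in for one iteration round, a per-key Map<Double, String> stands in for the keyed MapState populated by initialStates, and a value is fed back only while it is still above 0.0. The message strings, the stop condition, and the names CountdownModel, run and demo are assumptions; only the feedback values A=2.0, B=3.0, C=1.0 come from the program.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Flink-free model of the reproduction program's logic. One pass of the while loop stands
 * in for one iteration round; a per-key Map<Double, String> stands in for the keyed MapState.
 * ASSUMPTIONS: message contents and the "feed back while > 0.0" stop condition are illustrative.
 */
public class CountdownModel {

    static List<String> run(Map<String, Double> feedback, Map<String, Map<Double, String>> messages) {
        List<String> finalOutput = new ArrayList<>();
        Map<String, Double> current = new LinkedHashMap<>(feedback);
        while (!current.isEmpty()) {
            Map<String, Double> next = new LinkedHashMap<>();
            for (Map.Entry<String, Double> e : current.entrySet()) {
                double weight = e.getValue() - 1.0;                    // subtract 1 per round
                finalOutput.add(messages.get(e.getKey()).get(weight)); // "side output" looked up in per-key state
                if (weight > 0.0) {
                    next.put(e.getKey(), weight);                      // fed back to the next round
                }
            }
            current = next;
        }
        return finalOutput;
    }

    static List<String> demo() {
        // Feedback values from the reproduction program: A=2.0, B=3.0, C=1.0.
        Map<String, Double> feedback = new LinkedHashMap<>();
        feedback.put("A", 2.0);
        feedback.put("B", 3.0);
        feedback.put("C", 1.0);
        // Hypothetical messages standing in for the initialStates stream: "<key>=<weight>".
        Map<String, Map<Double, String>> messages = new HashMap<>();
        for (Map.Entry<String, Double> e : feedback.entrySet()) {
            Map<Double, String> perKey = new HashMap<>();
            for (double w = e.getValue() - 1.0; w >= 0.0; w -= 1.0) {
                perKey.put(w, e.getKey() + "=" + w);
            }
            messages.put(e.getKey(), perKey);
        }
        return run(feedback, messages);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A=1.0, B=2.0, C=0.0, A=0.0, B=1.0, B=0.0]
    }
}
```

The model emits one message per subtraction (six in total for 2 + 3 + 1 steps). In a real Flink job the relative order of records from different keys within a round is not guaranteed; the model simply uses insertion order.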

From: Yunfeng Zhou 
Date: Sunday, April 7, 2024 11:36
To: dev@flink.apache.org 
Subject: Re: Two potential bugs in Flink ML
Hi Komal,

For the first question, could you please provide a simple program that
could help reproduce this exception? That could help us better find
out the bugs (if any) in Flink ML.

For the second question, there are already functions in Flink ML that
implement the IterationListener interface [1], and I just manually
verified that their onEpochWatermarkIncremented method is invoked
in the test cases. You may check whether there is any difference
between your implementation and the one in the Flink ML repo, and please
also feel free to provide a program that we can check together.

[1] 
https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/iteration/ForwardInputsOfLastRound.java#L45

Best,
Yunfeng

On Fri, Apr 5, 2024 at 2:01 PM Komal M  wrote:

Re: Two potential bugs in Flink ML

2024-04-11 Thread Komal M
                        context.output(finalOutputTag, state.get(weight));
                    }
                }

                @Override
                public void open(Configuration config) {
                    MapStateDescriptor<Double, String> stateDescriptor =
                            new MapStateDescriptor<>(
                                    "statedescriptor", // the state name
                                    BasicTypeInfo.DOUBLE_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO);
                    this.state = getRuntimeContext().getMapState(stateDescriptor);
                }
            });

            DataStream finalOutput = newModelUpdate.getSideOutput(finalOutputTag);

            return new IterationBodyResult(
                    DataStreamList.of(newModelUpdate),
                    DataStreamList.of(finalOutput));
        });

        //result.get(0).print();

        // Execute program
        env.execute("Flink Java API Skeleton");
    }

    // Type parameters restored after the archive stripped them; Tuple2<String, Double>
    // as the input type is an assumption based on the program's feedback stream.
    public static final class testMap
            implements MapFunction<Tuple2<String, Double>, String>, IterationListener<String> {

        @Override
        public String map(Tuple2<String, Double> tuple) throws Exception {
            System.out.println(tuple);
            return null;
        }

        @Override
        public void onEpochWatermarkIncremented(int i, Context context, Collector<String> collector)
                throws Exception {
            System.out.println("i");
        }

        @Override
        public void onIterationTerminated(Context context, Collector<String> collector)
                throws Exception {
            System.out.println("Iteration Terminated");
        }
    }
}

Best,
Komal


From: Yunfeng Zhou 
Date: Tuesday, April 9, 2024 19:14
To: dev@flink.apache.org 
Subject: Re: Two potential bugs in Flink ML
Hi Komal,

Thanks for your example code! I found that Flink ML has a bug when it
comes to keyed two-input operators. I have submitted a PR to fix this
bug; once it is approved, you can build the Flink ML library for your
program by following the project's build documentation.

The bugfix PR: https://github.com/apache/flink-ml/pull/260
The document to build Flink ML:
https://github.com/apache/flink-ml?tab=readme-ov-file#building-the-project

Best,
Yunfeng

On Mon, Apr 8, 2024 at 11:02 AM Komal M  wrote:
>
> Hi Yunfeng,
>
> Thank you so much for getting back to me!
>
> For the first bug, here is sample code that should reproduce it. All it
> does is subtract 1 from the feedback stream until the tuples reach 0.0. For
> each subtraction, it outputs a relevant message in the ‘finalOutput’ stream.
> These messages are stored in the keyed state of the KeyedCoProcessFunction and
> are populated by a dataset stream called initialStates. Each key has
> different messages associated with it, hence the need for MapState.
>
> For the second bug, let me compare my implementation to the references you
> have provided and get back to you on that.
>
>
> import java.util.*;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.iteration.DataStreamList;
> import org.apache.flink.iteration.IterationBodyResult;
> import org.apache.flink.iteration.Iterations;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
>
> public class Test {
> public static void main(String[] args) throws Exception {
> // Sets up the execution environment, which is the main entry point
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment();
>
> // sample datastreams (they are assumed to be unbounded streams outside of
> // this test environment)
> List<Tuple2<String, Double>> feedbackinitializer = Arrays.asList(
> new Tuple2<>("A", 2.0),
> new Tuple2<>("B", 3.0),
> new Tuple2<>("C", 1.0),
>