Nil Madhab created FLINK-37443:
----------------------------------
Summary: Add returns() method to DataStream V2 API for specifying
output types with lambda expressions
Key: FLINK-37443
URL: https://issues.apache.org/jira/browse/FLINK-37443
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Affects Versions: 1.20.1
Environment: * Apache Flink version: 1.20.1
* Java version: OpenJDK 21
* API: DataStream V2 API
Reporter: Nil Madhab
While following the official
[tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/]
for the DataStream V2 API, I ran into the following issue.
When using the DataStream V2 API with lambda expressions for
{{OneInputStreamProcessFunction}}, Java's type erasure prevents Flink from
automatically determining the output type, resulting in the following exception:
{code:java}
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: The return type of
function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be determined
automatically, due to type erasure.{code}
The error message suggests implementing the {{ResultTypeQueryable}} interface or
using an anonymous class as workarounds. In the traditional DataStream API, users
can simply call {{.returns(TypeInformation)}} to specify the output type explicitly.
h3. Example that fails:
{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
        (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) ->
                output.collect(Integer.parseInt(record))
); {code}
h3. Example that works (but is more verbose):
{code:java}
NonKeyedPartitionStream<Integer> parsed = input.process(
        new OneInputStreamProcessFunction<String, Integer>() {
            @Override
            public void processRecord(String record, Collector<Integer> output,
                    PartitionedContext ctx) throws Exception {
                output.collect(Integer.parseInt(record));
            }
        }
); {code}
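The difference between the two examples comes down to what survives compilation. The sketch below uses only the JDK and a hypothetical stand-in interface {{ProcessFn<IN, OUT>}} (not Flink's {{OneInputStreamProcessFunction}}), and assumes a standard javac/OpenJDK toolchain. An anonymous class keeps {{ProcessFn<String, Integer>}} in its generic class signature, so reflection can recover {{Integer}}; the class generated for a lambda records only the raw interface, and the cast in the failing example exists only at compile time. Flink's own type extraction does more than this (as far as I can tell it also inspects the lambda's implementation method), so treat this as an illustration of the erasure problem, not of Flink's actual logic.

{code:java}
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

public class ErasureDemo {
    // Hypothetical stand-in for Flink's OneInputStreamProcessFunction<IN, OUT>.
    @FunctionalInterface
    public interface ProcessFn<IN, OUT> {
        void process(IN record, Consumer<OUT> output);
    }

    // Built from a lambda: the generated class records only the raw interface.
    public static ProcessFn<String, Integer> lambdaFn() {
        return (record, output) -> output.accept(Integer.parseInt(record));
    }

    // Built from an anonymous class: javac stores ProcessFn<String, Integer>
    // in the class's generic signature, so it is visible via reflection.
    public static ProcessFn<String, Integer> anonymousFn() {
        return new ProcessFn<String, Integer>() {
            @Override
            public void process(String record, Consumer<Integer> output) {
                output.accept(Integer.parseInt(record));
            }
        };
    }

    // Returns the OUT type argument if the runtime class declares it, else null.
    public static Type outputTypeOf(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == ProcessFn.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("lambda:    " + outputTypeOf(lambdaFn()));    // null
        System.out.println("anonymous: " + outputTypeOf(anonymousFn())); // class java.lang.Integer
    }
}
{code}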
h3. Requested Enhancement
Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to
{{NonKeyedPartitionStream}} in the DataStream V2 API so that the output type can
be specified explicitly when a process function is written as a lambda expression.
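A minimal sketch of how an explicit type hint could interact with inference, again in plain Java under stated assumptions: {{SketchStream}}, {{ProcessFn}}, {{process}} and {{returns}} here are hypothetical stand-ins, not the V2 API, and the stream only tracks its element type (no data is processed). When inference fails, {{process}} records the type as unknown instead of throwing, and the error surfaces only when the type is read, so a following {{returns(Class)}} call can fill it in. This loosely mirrors how the V1 {{.returns(...)}} hint overrides a type that could not be extracted.

{code:java}
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

public class ReturnsSketch {
    // Hypothetical stand-in for OneInputStreamProcessFunction<IN, OUT>.
    @FunctionalInterface
    public interface ProcessFn<IN, OUT> {
        void process(IN record, Consumer<OUT> output);
    }

    // Hypothetical stream that only tracks its element type (null = unknown).
    public static final class SketchStream<T> {
        private final Class<?> type;

        public SketchStream(Class<?> type) {
            this.type = type;
        }

        // Tries to infer OUT from the function; records null instead of throwing.
        public <OUT> SketchStream<OUT> process(ProcessFn<T, OUT> fn) {
            return new SketchStream<>(inferOutputType(fn));
        }

        // Explicit hint: replaces whatever inference produced.
        public SketchStream<T> returns(Class<T> hint) {
            return new SketchStream<>(hint);
        }

        // Fails only when the type is actually needed.
        public Class<?> getType() {
            if (type == null) {
                throw new IllegalStateException(
                        "Output type could not be determined; call returns(...)");
            }
            return type;
        }
    }

    // Reads OUT from the function's generic interface signature, if present.
    static Class<?> inferOutputType(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == ProcessFn.class) {
                Type out = ((ParameterizedType) t).getActualTypeArguments()[1];
                if (out instanceof Class) {
                    return (Class<?>) out;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SketchStream<String> input = new SketchStream<>(String.class);
        // Lambda: OUT is erased at runtime, so the explicit hint supplies it.
        SketchStream<Integer> parsed = input
                .process((String record, Consumer<Integer> out) -> out.accept(Integer.parseInt(record)))
                .returns(Integer.class);
        System.out.println(parsed.getType()); // class java.lang.Integer
    }
}
{code}

Deferring the failure (rather than throwing inside {{process}}) is the design choice that makes a chained hint possible; a V2 implementation might instead take the type as an extra {{process}} argument.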
Until this is fixed, the documentation could use the anonymous-class form to
avoid confusing people who are new to Flink.