[
https://issues.apache.org/jira/browse/FLINK-37443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933781#comment-17933781
]
Nil Madhab commented on FLINK-37443:
------------------------------------
There is definitely an issue with lambda expressions. I was following
FLIP-500: Support Join Extension in DataStream V2 API and tried to create a
project with 2.1-snapshot. It works fine with an anonymous class, but does not
work with a lambda.
Here is the
[file|https://raw.githubusercontent.com/nilmadhab/flink-tutorials/refs/heads/main/src/main/java/com/java/flinktutorials/datastreamv2/EnrichOrderExample.java?token=GHSAT0AAAAAAC44RVLL7QBUMIXT56TYJRC4Z6OU65A]
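The difference between the two forms can be reproduced without Flink. The following is a minimal sketch, not Flink's actual type extraction logic: java.util.function.Function stands in for OneInputStreamProcessFunction, and the class and method names (LambdaErasureDemo, keepsTypeArguments, anonymousParser, lambdaParser) are made up for illustration. It only shows that an anonymous class records its type arguments in a form reflection can read, while a lambda's runtime class implements the raw interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Stand-in demo: java.util.function.Function plays the role of
// OneInputStreamProcessFunction so the example runs without Flink.
class LambdaErasureDemo {

    /** True if the runtime class of fn records type arguments for an interface it implements. */
    static boolean keepsTypeArguments(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    /** Anonymous class: compiled to a real class file that carries a generic signature. */
    static Function<String, Integer> anonymousParser() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String record) {
                return Integer.parseInt(record);
            }
        };
    }

    /** Lambda: its class is generated at runtime and implements the raw interface. */
    static Function<String, Integer> lambdaParser() {
        return record -> Integer.parseInt(record);
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: "
                + keepsTypeArguments(anonymousParser()));
        System.out.println("lambda keeps type arguments: "
                + keepsTypeArguments(lambdaParser()));
    }
}
```

Both functions parse their input identically. Only the reflective view differs: the check reports true for the anonymous class and false for the lambda. This is consistent with the anonymous class workaround that the exception message recommends.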
> Add returns() method to DataStream V2 API for specifying output types with
> lambda expressions
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-37443
> URL: https://issues.apache.org/jira/browse/FLINK-37443
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.20.1
> Environment: * Apache Flink version: 1.20.1
> * Java version: OpenJDK 21
> * API: DataStream V2 API
> Reporter: Nil Madhab
> Priority: Major
>
> While following the official
> [tutorial|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream-v2/overview/]
> for the DataStream V2 API, I encountered an issue.
> When using the DataStream V2 API with lambda expressions for
> {{OneInputStreamProcessFunction}}, Java's type erasure prevents Flink
> from automatically determining the output type, resulting in the following
> exception:
>
> {code:java}
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: The return type
> of function 'process(NonKeyedPartitionStreamImpl.java:74)' could not be
> determined automatically, due to type erasure.{code}
> {code:java}
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The
> generic type parameters of 'Collector' are missing. In many cases lambda
> methods don't provide enough information for automatic type extraction when
> Java generics are involved. An easy workaround is to use an (anonymous) class
> instead that implements the
> 'org.apache.flink.datastream.api.function.OneInputStreamProcessFunction'
> interface. Otherwise the type has to be specified explicitly using type
> information.
> at
> org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371){code}
>
>
> The error suggests implementing the {{ResultTypeQueryable}} interface or using an
> anonymous class as workarounds. In the traditional DataStream API, users
> could simply call {{.returns(TypeInformation)}} to explicitly specify the
> output type.
> h3. Example that fails:
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     (OneInputStreamProcessFunction<String, Integer>) (record, output, ctx) ->
>         output.collect(Integer.parseInt(record))
> );
> {code}
> h3. Example that works (but is more verbose):
> {code:java}
> NonKeyedPartitionStream<Integer> parsed = input.process(
>     new OneInputStreamProcessFunction<String, Integer>() {
>         @Override
>         public void processRecord(String record, Collector<Integer> output,
>                 PartitionedContext ctx) throws Exception {
>             output.collect(Integer.parseInt(record));
>         }
>     }
> );
> {code}
> h3. Requested Enhancement
> Add a {{.returns(TypeInformation)}} or {{.returns(Class)}} method to the
> {{NonKeyedPartitionStream}} class in the DataStream V2 API to allow for the
> specification of output types when using lambda expressions with process
> functions.
> Until the issue is fixed, the documentation could use the anonymous class
> form to avoid confusing people new to Flink.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)