GitHub user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3438#discussion_r104383800
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
}
/**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ * in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+ TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+ processFunction,
+ ProcessFunction.class,
+ false,
+ true,
+ getType(),
+ Utils.getCallLocationName(),
+ true);
+
+ return process(processFunction, outType);
+ }
+
+ /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ * in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> process(
--- End diff --
Yes, it's exposed for that. The pattern so far is for each API method to also
have a public variant that takes a `TypeInformation`, because in the Scala API
we get the `TypeInformation` from the context bound.
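For context on why the explicit-type variant matters: the overload without it has to recover `R` reflectively (in the diff, via `TypeExtractor.getUnaryOperatorReturnType`). The following is a heavily simplified, hypothetical sketch of that idea in plain Java, not Flink's `TypeExtractor`. `MyFunction` stands in for `ProcessFunction` and `Class<?>` for `TypeInformation`. It resolves the output type when an anonymous subclass binds it to a concrete class, and gives up when the type is only a type variable, which is the case where the caller has to supply it.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class OutputTypeSketch {

    // Hypothetical stand-in for ProcessFunction<I, O>; not Flink's class.
    public abstract static class MyFunction<I, O> {
        public abstract O apply(I value);
    }

    // Simplified reflective extraction of O from the function's direct superclass.
    // Succeeds only if the subclass binds O to a concrete (non-generic) class.
    public static Class<?> extractOutputType(MyFunction<?, ?> fn) {
        Type superType = fn.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            Type out = ((ParameterizedType) superType).getActualTypeArguments()[1];
            if (out instanceof Class) {
                return (Class<?>) out;
            }
        }
        // O is a type variable (or a parameterized type, which this sketch ignores).
        throw new IllegalStateException("Cannot determine output type; pass it explicitly");
    }

    // Generic factory: the anonymous subclass's generic superclass records O
    // only as a type variable, so reflection cannot resolve it to a class.
    public static <O> MyFunction<String, O> constant(O value) {
        return new MyFunction<String, O>() {
            @Override
            public O apply(String ignored) {
                return value;
            }
        };
    }

    public static void main(String[] args) {
        MyFunction<String, Integer> length = new MyFunction<String, Integer>() {
            @Override
            public Integer apply(String value) {
                return value.length();
            }
        };
        System.out.println(extractOutputType(length)); // class java.lang.Integer

        try {
            extractOutputType(constant(42));
        } catch (IllegalStateException e) {
            System.out.println("explicit type needed");
        }
    }
}
```

A caller that already knows the type, like the Scala API with its context bound, can skip this step entirely by calling the overload that takes the type explicitly.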
Calling `transform()` manually is an option, but then the Scala API would no
longer be built on top of the Java API, and the code that instantiates the
stream operators would exist in both the Java and the Scala API. For example,
right now the code for instantiating a flat map operator lives in
`(Java)DataStream`, and `(Scala)DataStream.flatMap()` simply calls that method.
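To make the trade-off concrete, here is a hypothetical plain-Java sketch (none of these classes are Flink's). `JavaStream.flatMap(fn, outType)` is the only place that instantiates the operator, and the Scala-side wrapper, which already holds the output type, just delegates to it. `flatMapViaTransform` shows the alternative of calling a `transform()`-like method directly, which forces the wrapper to build the operator itself. `Class<R>` stands in for `TypeInformation<R>`, and a list-backed stream stands in for a real `DataStream`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class DelegationSketch {

    // Hypothetical stand-in for a stream operator; not Flink's StreamOperator.
    public interface Operator<T, R> {
        List<R> run(List<T> input);
    }

    // Hypothetical flat-map operator: applies fn to each element and concatenates the results.
    public static final class FlatMapOperator<T, R> implements Operator<T, R> {
        private final Function<T, List<R>> fn;

        public FlatMapOperator(Function<T, List<R>> fn) {
            this.fn = fn;
        }

        @Override
        public List<R> run(List<T> input) {
            List<R> out = new ArrayList<>();
            for (T element : input) {
                out.addAll(fn.apply(element));
            }
            return out;
        }
    }

    // Hypothetical Java-side stream backed by a list; Class<?> stands in for TypeInformation.
    public static final class JavaStream<T> {
        public final List<T> elements;
        public final Class<?> type;

        public JavaStream(List<T> elements, Class<?> type) {
            this.elements = elements;
            this.type = type;
        }

        // Low-level entry point, loosely analogous to transform(); name is unused here.
        public <R> JavaStream<R> transform(String name, Class<R> outType, Operator<T, R> op) {
            return new JavaStream<>(op.run(elements), outType);
        }

        // Public variant taking the output type explicitly: the single place
        // where the flat-map operator is instantiated.
        public <R> JavaStream<R> flatMap(Function<T, List<R>> fn, Class<R> outType) {
            return transform("Flat Map", outType, new FlatMapOperator<>(fn));
        }
    }

    // Hypothetical Scala-side wrapper: it already has the output type (in Scala,
    // from a context bound), so it only needs to delegate.
    public static final class WrapperStream<T> {
        public final JavaStream<T> javaStream;

        public WrapperStream(JavaStream<T> javaStream) {
            this.javaStream = javaStream;
        }

        // Delegating version: reuses the Java-side operator instantiation.
        public <R> WrapperStream<R> flatMap(Function<T, List<R>> fn, Class<R> outType) {
            return new WrapperStream<>(javaStream.flatMap(fn, outType));
        }

        // Alternative: calling transform() directly duplicates the operator instantiation.
        public <R> WrapperStream<R> flatMapViaTransform(Function<T, List<R>> fn, Class<R> outType) {
            return new WrapperStream<>(
                    javaStream.transform("Flat Map", outType, new FlatMapOperator<>(fn)));
        }
    }

    public static void main(String[] args) {
        WrapperStream<String> lines =
                new WrapperStream<>(new JavaStream<>(Arrays.asList("a b", "c"), String.class));
        WrapperStream<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")), String.class);
        System.out.println(words.javaStream.elements); // [a, b, c]
    }
}
```

Both paths produce the same result here; the difference is that with `flatMapViaTransform` any change to how the operator is constructed has to be made in two places.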
What do you think?