Since I think this may help others, I wanted to share what ended up solving
the problem.
The key thing is that the JVM needs to be forked, so this needed to be
added:
ThisBuild / fork := true
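For what it's worth, my understanding (an assumption on my part, not
something I've confirmed in the Flink docs) is that sbt's in-process run uses
layered classloaders, which seems to confuse Flink's classloading; forking
runs the job in a plain JVM with a flat classpath. If you'd rather not fork
everything, scoping the setting to just the run task in the project's
settings should also work, though I've only tested the ThisBuild-wide form
above:

// sketch: fork only the `run` task, placed inside the project's .settings(...)
Compile / run / fork := true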
Final build.sbt file:
ThisBuild / scalaVersion := "3.7.1"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalacOptions += "-release:17"
ThisBuild / javacOptions ++= Seq("--release", "17")
ThisBuild / fork := true
val flinkVersion = "2.0.0"
val flinkConnectorVersion = "4.0.0-2.0"
lazy val flink_simple_testing = project
  .in(file("flink_simple_testing"))
  .settings(
    name := "flink-testing",
    libraryDependencies ++= Seq(
      "org.apache.flink" % "flink-core" % flinkVersion,
      "org.apache.flink" % "flink-streaming-java" % flinkVersion,
      "org.apache.flink" % "flink-clients" % flinkVersion,
      "org.apache.flink" % "flink-runtime" % flinkVersion,
    )
  )
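With the project id above, this should then be runnable straight from the
command line with:

sbt flink_simple_testing/run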
Final script file:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.datastream.DataStreamSource
@main def main(): Unit = {
  val env = StreamExecutionEnvironment
    .getExecutionEnvironment
    .setParallelism(1)

  // A small in-memory source, just to prove the pipeline runs.
  val data: DataStreamSource[String] = env.fromData(
    "this", "is a", "test"
  )

  data.print()

  // Print the execution plan (as JSON) before actually running the job.
  val p = env.getExecutionPlan
  println(p)

  env.execute()
}
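For reference, when it works I'd expect the run to print the JSON execution
plan and then the three elements, roughly like this (surrounding sbt/Flink
log lines will vary):

this
is a
test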
On Thu, Jul 3, 2025 at 3:27 PM Jeremie Doehla <[email protected]>
wrote:
> Hello --
> I was hoping to get some help getting things set up, as something isn't
> working quite right and I'm not sure what's missing.
> I thought I would try to just copy something cited as an example, in case I
> missed something in my own code, so I pulled the example I see here:
> https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/datastream/overview/
>
> Updated slightly to this:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.time.Duration;
> import org.apache.flink.util.Collector;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>
>
> public class main {
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<Tuple2<String, Integer>> dataStream = env
>                 .socketTextStream("localhost", 9999)
>                 .flatMap(new Splitter())
>                 .keyBy(value -> value.f0)
>                 .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
>                 .sum(1);
>
>         dataStream.print();
>
>         env.execute("Window WordCount");
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
> }
>
>
> As I'm ultimately intending to use Scala, I'm leveraging sbt to try to
> run the Java code. With that, I've got the following setup in my build.sbt
> file:
>
> ThisBuild / scalaVersion := "3.7.1"
> ThisBuild / version := "0.1.0-SNAPSHOT"
> ThisBuild / scalacOptions += "-release:17"
> ThisBuild / javacOptions ++= Seq("--release", "17")
>
>
> val flinkVersion = "2.0.0"
> val flinkConnectorVersion = "4.0.0-2.0"
>
> lazy val websiteExample = project
>   .in(file("website_example"))
>   .settings(
>     name := "flink-testing",
>     libraryDependencies ++= Seq(
>       "org.apache.flink" % "flink-core" % flinkVersion,
>       "org.apache.flink" % "flink-streaming-java" % flinkVersion,
>       "org.apache.flink" % "flink-clients" % flinkVersion,
>       "org.apache.flink" % "flink-runtime" % flinkVersion,
>       "org.apache.flink" % "flink-datastream" % flinkVersion
>     )
>   )
>
>
> Everything compiles fine, but when running I get this error:
>
> [error] org.apache.flink.util.FlinkException: Failed to execute job 'Window WordCount'.
>
> Which traces back to this error:
>
> [error] Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory
> [error]         at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>
> Which makes me think I'm missing some package to get
> SimpleUdfStreamOperatorFactory to be in scope somewhere, but as best I can
> tell that should have been covered by the flink-runtime package, right?
>
> So, my question is: what might I be missing here to get this to work?
> Perhaps we can update the website documentation to help out others who
> might run into this issue too.
>
>
> In case this helps, I've also got this code:
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.datastream.DataStreamSource
>
>
> @main def main(): Unit = {
>   val env = StreamExecutionEnvironment
>     .getExecutionEnvironment
>     .setParallelism(1)
>
>   val data: DataStreamSource[String] = env.fromData(
>     "this", "is a", "test"
>   )
>
>   data.print()
>
>   val p = env.getExecutionPlan
>   println(p)
>
>   env.execute()
> }
>
> which, if I execute it, returns the same error as the example mentioned
> above but might be a bit simpler.
>
> Thanks in advance,
> --Jeremie
>