Hi,
I am trying to connect to Kafka through Flink, but I am having some difficulty
getting the right table source factory.
I currently get the error: NoMatchingTableFactoryException: Could not find
a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in the classpath. My
sbt file looks like this:
name := "writeToSQL"
version := "0.1"
scalaVersion := "2.11.12"

val flinkVersion = "1.9.1"
val hadoopVersion = "3.0.0"

libraryDependencies ++= Seq(
  "org.slf4j" % "slf4j-api" % "1.7.15" % "runtime",
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion % "compile",
  "org.apache.flink" %% "flink-sql-connector-kafka" % flinkVersion % "compile",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" % "flink-table" % flinkVersion % "compile",
  "org.apache.flink" % "flink-json" % flinkVersion % "compile",
  "org.slf4j" % "slf4j-log4j12" % "1.7.25" % "runtime"
)

assemblyMergeStrategy in assembly := {
  case path if path.contains("META-INF/services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
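
One way to check whether the merge strategy above keeps the factory entries is to look inside the assembled JAR. The following is only a minimal sketch: the object and method names are made up for illustration, and the default JAR path is an assumption based on the usual sbt-assembly naming, so it may need to be replaced with the real path.

```scala
import java.util.jar.JarFile
import scala.io.Source

object InspectServiceFile {
  // Service file that the SPI-based table factory lookup is expected to read.
  val ServiceEntry =
    "META-INF/services/org.apache.flink.table.factories.TableFactory"

  /** Returns the class names listed in the JAR's service file,
    * or Nil if the JAR has no such entry. */
  def listedFactories(jarPath: String): List[String] = {
    val jar = new JarFile(jarPath)
    try {
      Option(jar.getEntry(ServiceEntry)) match {
        case Some(entry) =>
          Source.fromInputStream(jar.getInputStream(entry), "UTF-8")
            .getLines()
            .map(_.trim)
            // Skip blank lines and comment lines in the service file.
            .filter(line => line.nonEmpty && !line.startsWith("#"))
            .toList
        case None => Nil
      }
    } finally jar.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed default path; pass the actual assembly JAR as the first argument.
    val jarPath =
      args.headOption.getOrElse("target/scala-2.11/writeToSQL-assembly-0.1.jar")
    val factories = listedFactories(jarPath)
    if (factories.isEmpty) println(s"No table factories listed in $jarPath")
    else factories.foreach(println)
  }
}
```

If the Kafka and JSON factories do not show up in the output, the service files were probably lost or overwritten during assembly rather than concatenated.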
From the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#define-a-tablefactory
I can see what is missing, but I do not know how to solve it.
The documentation says the following:
Define a TableFactory
A TableFactory allows to create different table-related instances from
string-based properties. All available factories are called for matching to
the given set of properties and a corresponding factory class.
Factories leverage Java’s Service Provider Interfaces (SPI)
<https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html> for
discovering. This means that every dependency and JAR file should contain a
file org.apache.flink.table.factories.TableFactory in the
META-INF/services resource directory that lists all available table
factories that it provides.
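
To make the quoted paragraph concrete, here is a rough sketch of how the service files visible on the classpath could be listed at runtime. It only uses the JDK and the Scala standard library. The object and method names are hypothetical, and it is only my assumption that this mirrors what the factory lookup sees.

```scala
import scala.collection.JavaConverters._
import scala.io.Source

object ListTableFactories {
  // Resource name taken from the documentation quoted above.
  val ServiceFile =
    "META-INF/services/org.apache.flink.table.factories.TableFactory"

  /** Collects the class names from every copy of the service file
    * that the given class loader can see. */
  def visibleFactories(loader: ClassLoader): List[String] =
    loader.getResources(ServiceFile).asScala.toList.flatMap { url =>
      val source = Source.fromURL(url, "UTF-8")
      try {
        source.getLines()
          .map(_.trim)
          // Skip blank lines and comment lines in the service file.
          .filter(line => line.nonEmpty && !line.startsWith("#"))
          .toList
      } finally source.close()
    }

  def main(args: Array[String]): Unit = {
    val found = visibleFactories(Thread.currentThread().getContextClassLoader)
    if (found.isEmpty) println("No table factory service files on the classpath")
    else found.foreach(println)
  }
}
```

Running this with the same classpath as the job should show whether any Kafka or JSON factory is registered at all.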
But how do I do that? I thought the sbt file would take care of this.
Any help is highly appreciated!
Best regards
Martin Frank Hansen