Hello! I have a Scala 2.12 project that registers some tables (backed by
JSON data from Kafka) with the StreamTableEnvironment via executeSql
before calling execute on the StreamExecutionEnvironment.

Everything behaves as expected until I either set
/'format.ignore-parse-errors' = 'true'/ in the connector options, or add
the Kafka record timestamp as a table field via /`ts` TIMESTAMP(3)
METADATA FROM 'timestamp'/. In both of these cases I get:

*Exception in thread "main" org.apache.flink.table.api.TableException:
findAndCreateTableSource failed.*
*Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory supports all properties.
*

Additionally, for ignoring parsing errors:
*The matching candidates:
org.apache.flink.formats.json.JsonRowFormatFactory
Unsupported property keys:
format.ignore-parse-errors*

While, for the timestamp field:
*The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
schema.#.metadata
schema.#.virtual*

Here is the DDL code used for table creation:
/
    "CREATE TEMPORARY TABLE `" + name + "` (" + tableFields + ") " +
      "WITH (" +
      "'connector.type' = 'kafka', " +
      "'connector.version' = 'universal', " +
      "'connector.topic' = '" + name + "', " +
      "'connector.properties.bootstrap.servers' = '" + kafkaAddr + "', " +
      "'connector.startup-mode' = '" +
      (if (checkLatest) "latest-offset" else "earliest-offset") +
      "', " +
      "'connector.properties.default.api.timeout.ms' = '5000', " +
      "'format.type' = 'json', " +
      "'format.fail-on-missing-field' = 'false'" +
      ")"
/
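To make the two failing variations easier to reproduce, here is a minimal, self-contained sketch of how the statement could be assembled with each addition switched on or off. The object name DdlSketch, the helper buildDdl, its two flags, and the sample values in main are made up for illustration and are not from my actual project. The option keys and the metadata column are exactly the ones quoted above.

```scala
// Hypothetical helper for illustration only: builds the same DDL string as
// above, optionally with the two additions that trigger the exceptions.
object DdlSketch {
  def buildDdl(
      name: String,
      tableFields: String,
      kafkaAddr: String,
      checkLatest: Boolean,
      ignoreParseErrors: Boolean = false,  // adds 'format.ignore-parse-errors'
      withKafkaTimestamp: Boolean = false  // adds the `ts` metadata column
  ): String = {
    // Optionally append the Kafka record timestamp as a metadata column.
    val fields =
      if (withKafkaTimestamp)
        tableFields + ", `ts` TIMESTAMP(3) METADATA FROM 'timestamp'"
      else tableFields

    val startupMode = if (checkLatest) "latest-offset" else "earliest-offset"

    // Same options as in the original statement, in the same order.
    val baseOptions = Seq(
      "'connector.type' = 'kafka'",
      "'connector.version' = 'universal'",
      "'connector.topic' = '" + name + "'",
      "'connector.properties.bootstrap.servers' = '" + kafkaAddr + "'",
      "'connector.startup-mode' = '" + startupMode + "'",
      "'connector.properties.default.api.timeout.ms' = '5000'",
      "'format.type' = 'json'",
      "'format.fail-on-missing-field' = 'false'"
    )

    val options =
      if (ignoreParseErrors) baseOptions :+ "'format.ignore-parse-errors' = 'true'"
      else baseOptions

    "CREATE TEMPORARY TABLE `" + name + "` (" + fields + ") " +
      "WITH (" + options.mkString(", ") + ")"
  }

  def main(args: Array[String]): Unit = {
    // Placeholder values; prints the statement with both additions enabled.
    println(
      buildDdl(
        "events",
        "`id` STRING",
        "localhost:9092",
        checkLatest = true,
        ignoreParseErrors = true,
        withKafkaTimestamp = true
      )
    )
  }
}
```

With both flags left at false, the output has the same content as the statement above, which is the variant that works for me; turning on either flag gives one of the two failing variants.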

And here is the Flink-related config from build.sbt:
/
lazy val flinkVersion = "1.12.0"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                  % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka"        % flinkVersion,
  "org.apache.flink" %% "flink-clients"                % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-table-planner-blink"    % flinkVersion,
  "org.apache.flink"  % "flink-json"                   % flinkVersion,
  "org.apache.flink" %% "flink-test-utils"             % flinkVersion % Test,
  "org.apache.flink" %% "flink-runtime"                % flinkVersion % Test classifier "tests",
  "org.apache.flink" %% "flink-streaming-java"         % flinkVersion % Test classifier "tests",
)
/

I would appreciate any tips on getting both the timestamp field and the
ignore-parse-errors setting to work. Thank you in advance!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
