Hi,

I can't read from a file source using the sql-client tool.

I set up a simple test scenario with the configuration file in [1].

I get the error in [2] when starting the SQL client with
bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
in the standard Flink 1.9.1 download.

From the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors I expected the file system connector to be built in.

I get a similar error when following the sql-training tutorial at https://github.com/ververica/sql-training/wiki. There I changed the sql-client-conf.yaml file used by the Docker container for the SQL client.

Thanks for any hints.

Günter




[1]

################################################################################
# Define table sources here. See the Table API & SQL documentation for details.

tables:

  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt


    format:
      type: csv
      # required: define the schema either by using type information
      schema: "ROW(test STRING)"

      # or use the table's schema
      derive-schema: true

      field-delimiter: ";"         # optional: field delimiter character (',' by default)
      line-delimiter: "\r\n"       # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
      quote-character: "'"         # optional: quote character for enclosing field values ('"' by default)
      allow-comments: true         # optional: ignores comment lines that start with "#" (disabled by default)
      #   if enabled, make sure to also ignore parse errors to allow empty rows
      ignore-parse-errors: true    # optional: skip fields and rows with parse errors instead of failing;
      #   fields are set to null in case of errors
      array-element-delimiter: "|" # optional: the array element delimiter string for separating
      #   array and row element values (";" by default)
      escape-character: "\\"       # optional: escape character for escaping values (disabled by default)
      null-literal: "n/a"          # optional: null literal string that is interpreted as a
      #   null value (disabled by default)



#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  #planner: blink
  type: streaming              # 'batch' or 'streaming' execution
  result-mode: table           # 'changelog' or 'table' presentation of results
  parallelism: 1               # parallelism of the program
  max-parallelism: 128         # maximum parallelism
  min-idle-state-retention: 0  # minimum idle state retention in ms
  max-idle-state-retention: 0  # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  type: standalone             # only the 'standalone' deployment is supported
  response-timeout: 5000       # general cluster communication timeout in ms
  gateway-address: ""          # (optional) address from cluster to gateway
  gateway-port: 0              # (optional) port from cluster to gateway

[2]

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The following properties are requested:
connector.path=file:///home/swissbib/temp/trash/hello.txt
connector.type=filesystem
format.allow-comments=true
format.array-element-delimiter=|
format.derive-schema=true
format.escape-character=\\
format.field-delimiter=;
format.ignore-parse-errors=true
format.line-delimiter=\r\n
format.null-literal=n/a
format.quote-character='
format.schema=ROW(test STRING)
format.type=csv
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
    at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:370)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:197)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:265)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$1(ExecutionContext.java:144)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:142)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:558)
    ... 4 more
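
For anyone comparing [1] and [2]: the "requested properties" in the error look like the table entry from the YAML file flattened into dotted keys (connector.type, format.type, and so on). Below is a minimal Python sketch of that mapping. It is only an illustration, not Flink code: the function name flatten is hypothetical, a plain dict stands in for the parsed YAML, only a few of the format keys from [1] are included, and the table's name and type entries are left out because they do not appear in the property list in [2].

```python
def flatten(node, prefix=""):
    """Flatten a nested dict into dotted keys, similar to the property list in [2].

    Hypothetical helper for illustration only; not part of Flink.
    """
    props = {}
    for key, value in node.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Descend into nested sections such as 'connector' and 'format'.
            props.update(flatten(value, dotted))
        elif isinstance(value, bool):
            # Render booleans in lowercase, as they appear in the error output.
            props[dotted] = str(value).lower()
        else:
            props[dotted] = str(value)
    return props


# Stand-in for part of the table entry in [1] (assumption: parsed YAML as a dict).
table = {
    "update-mode": "append",
    "connector": {
        "type": "filesystem",
        "path": "file:///home/swissbib/temp/trash/hello.txt",
    },
    "format": {
        "type": "csv",
        "schema": "ROW(test STRING)",
        "derive-schema": True,
        "field-delimiter": ";",
    },
}

# Print the flattened properties in sorted order, one key=value per line.
for key, value in sorted(flatten(table).items()):
    print(f"{key}={value}")
```

Printing the result this way makes it easier to check each key in the configuration against the list the exception reports.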


