Please try copying the connector jar into the Flink distribution's lib/ or opt/ directory.
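
For example, a minimal sketch of that copy step, assuming FLINK_HOME points at the flink-1.12.0 install seen in the traceback below (substitute the real path):

```python
# Hedged sketch: put the Hive connector jar on Flink's classpath at startup.
# FLINK_HOME is an assumption here; adjust to the actual install location.
import os
import shutil

flink_home = os.environ["FLINK_HOME"]
jar = "deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
shutil.copy(jar, os.path.join(flink_home, "lib"))
```

Restart the cluster afterwards so the new jar is picked up.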

On 3/26/2021 11:59 AM, Yik San Chan wrote:
This question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/66815572/could-not-find-a-suitable-table-factory-for-org-apache-flink-table-factories-ca

I am running a PyFlink program that reads from the Hive table `mysource`, does some processing, then writes to the Hive table `mysink`.

```
hive (aiinfra)> describe mysource;
OK
a                   bigint
b                   bigint

hive (aiinfra)> describe mysink;
OK
c                   bigint
```

This is my directory tree:

```
.
├── deps
│   └── flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar
└── hive.py
```

This is `hive.py`:

```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

# ship the Hive connector jar with the job
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)

# set the HiveCatalog as the current catalog of the session
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

# run the insert under the default SQL dialect
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```
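
(As a sanity check, printing the current catalog and its tables right after `use_catalog` confirms the registration; these two lines are my addition, not part of the program above:)

```python
print(t_env.get_current_catalog())  # expect "myhive"
print(t_env.list_tables())          # should include mysource and mysink
```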

The first program works fine. However, it fails as soon as I turn the catalog registration logic into SQL DDL:

```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

# ship the Hive connector jar with the job, as before
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

CREATE_CATALOG_DDL = """
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'aiinfra',
    'hive-conf-dir' = '/data/apache/hive/apache-hive-2.1.0-bin/conf'
)
"""

USE_CATALOG_DDL = """
USE CATALOG myhive
"""

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

# create and activate the Hive catalog via DDL under the Hive dialect
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql(CREATE_CATALOG_DDL)
t_env.execute_sql(USE_CATALOG_DDL)

# switch back to the default dialect for the insert
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

Running the latter version with `python hive.py` throws an exception:

```
Traceback (most recent call last):
  File "/data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py", line 42, in <module>
    t_env.execute_sql(SOURCE_DDL_1)
  File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 766, in execute_sql
  File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
default-database=aiinfra
hive-conf-dir=/data/apache/hive/apache-hive-2.1.0-bin/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1078)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:991)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```

However, `HiveCatalogFactory` is clearly present in the connector jar that I load via `pipeline.jars`:

```
$ jar tf deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar | grep CatalogFactory
org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.class
```
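
(A further check, assuming the factory is meant to be discovered through Flink's legacy `TableFactory` SPI, is to read the jar's services entry; the entry name below is my assumption:)

```python
import zipfile

# print whatever the connector jar registers for Flink's service discovery
with zipfile.ZipFile("deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar") as jar:
    entry = "META-INF/services/org.apache.flink.table.factories.TableFactory"
    print(jar.read(entry).decode())
```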

How can I fix this? Thanks.

Best,
Yik San

