Hi all

According to the official documentation (Table API / JDBC SQL Connector 
v.1.14.0) "the JDBC connector allows reading data from and writing data into 
any relational databases with a JDBC driver".
At the moment we are using SQL Server in conjunction with Flink and Java, which 
works perfectly fine. Now we are trying to fetch data from a Kafka topic and 
write it to a SQL Server sink using PyFlink.
We succeeded in fetching the data from the Kafka topic, but were not able to 
establish a connection to SQL Server.

Our code looks as follows:

import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, 
EnvironmentSettings, CsvTableSink, WriteMode
from pyflink.table.descriptors import Schema, Kafka, Json

def main():
    """Copy rows from a Kafka topic into a SQL Server table via the Table API.

    Builds a streaming table environment, registers the Kafka and JDBC
    connector jars plus the SQL Server driver jar (all looked up in the
    directory of this script), declares the Kafka source table and the JDBC
    sink table, and submits an INSERT job, waiting on its result.

    ``[..]`` marks values that were redacted before posting; they have to be
    filled in before the script can run.
    """
    # Create streaming environment
    env = StreamExecutionEnvironment.get_execution_environment()

    settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build()
    )

    # create table environment
    tbl_env = StreamTableEnvironment.create(
        stream_execution_environment=env,
        environment_settings=settings,
    )

    # All jars are expected to sit next to this script.
    script_dir = os.path.abspath(os.path.dirname(__file__))

    # add kafka connector dependency
    kafka_jar = os.path.join(script_dir, 'flink-sql-connector-kafka_2.11-1.13.0.jar')

    # add jdbc connector dependency
    jdbc_jar = os.path.join(script_dir, 'flink-connector-jdbc_2.11-1.13.2.jar')
    mssql_jar = os.path.join(script_dir, 'mssql-jdbc-8.2.2.jre8.jar')

    # Connector jars go into pipeline.jars, the JDBC driver into
    # pipeline.classpaths; set one option per statement instead of chaining.
    configuration = tbl_env.get_config().get_configuration()
    configuration.set_string("parallelism.default", "1")
    configuration.set_string(
        "pipeline.jars",
        "file:///{};file:///{}".format(kafka_jar, jdbc_jar),
    )
    configuration.set_string(
        "pipeline.classpaths",
        "file:///{}".format(mssql_jar),
    )

    # Option values in a WITH clause must be quoted string literals, so the
    # redacted passwords are quoted as well.
    kafka_table_sql = """
        CREATE TABLE kafka (
        [..] VARCHAR,
        data ROW(
                [..] ROW(
                        [..] VARCHAR,
                        [..] VARCHAR
                        ))
        ) WITH (
        'connector' = 'kafka',
        'property-version' = 'universal',
        'properties.bootstrap.servers' = '[..]',
        'topic' = '[..]',
        'scan.startup.mode' = 'earliest-offset',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.endpoint.identification.algorithm' = '',
        'properties.ssl.truststore.location' = '[..]',
        'properties.ssl.truststore.password' = '[..]',
        'properties.ssl.keystore.type' = 'JKS',
        'properties.ssl.keystore.location' = '[..]',
        'properties.ssl.keystore.password' = '[..]',
        'properties.ssl.key.password' = '[..]',
        'properties.group.id' = '[..]',
        'format' = 'json'
        )
        """

    # The Microsoft JDBC driver takes the database as a ';databaseName='
    # property rather than as a '/database' path segment.
    # NOTE(review): the reported "Cannot handle such jdbc url" is raised while
    # Flink validates this table's options. Presumably the JDBC connector
    # version in use ships no SQL Server dialect, in which case no change to
    # this script will help - confirm against the connector documentation
    # for the Flink version being used.
    sqlserver_table_sql = """
        CREATE TABLE sqltest (
                [..] VARCHAR,
                [..] VARCHAR
        ) WITH (
                'connector' = 'jdbc',
                'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
                'url' = 'jdbc:sqlserver://db-server;databaseName=database-name',
                'username' = '[..]',
                'password' = '[..]',
                'table-name' = 'dbo.tablename'
        )
        """

    # create source table (kafka)
    tbl_env.execute_sql(kafka_table_sql)

    # create sink table (sql server)
    tbl_env.execute_sql(sqlserver_table_sql)

    # copy data from source to sink; wait() keeps the script alive until the
    # submitted job finishes
    tbl_env.execute_sql("INSERT INTO sqltest SELECT [..], [..] FROM kafka").wait()

# Run the pipeline only when this file is executed directly as a script.
if __name__ == "__main__":
    main()


This leads to an exception (java.lang.IllegalStateException: Cannot handle such 
jdbc url ..):

Traceback (most recent call last):
  File "c:/projects/flink/kafka_csv_jdbc.py", line 122, in <module>
    main()
  File "c:/projects/flink/kafka_csv_jdbc.py", line 119, in main
    tbl_env.execute_sql("[..]").wait()
  File 
"C:\projects\flink\flink-evn\lib\site-packages\pyflink\table\table_environment.py",
 line 804, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\projects\flink\flink-evn\lib\site-packages\py4j\java_gateway.py", 
line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\projects\flink\flink-evn\lib\site-packages\pyflink\util\exceptions.py", 
line 146, in deco
    return f(*a, **kw)
  File "C:\projects\flink\flink-evn\lib\site-packages\py4j\protocol.py", line 
328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for 
writing table 'default_catalog.default_database.sqltest'.

Table options are:

'connector'='jdbc'
'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver'
'password'='[..]'
'table-name'='[..]'
'url'='jdbc:sqlserver:// [..]'
'username'='[..]'
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url: 
jdbc:sqlserver:// [..]
        at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.validateConfigOptions(JdbcDynamicTableFactory.java:304)
        at 
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSink(JdbcDynamicTableFactory.java:172)
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
        ... 29 more

How can we correctly establish a connection to SQL Server using PyFlink and the 
Table API?

Any suggestions are highly appreciated.

Thanks


Diese E-Mail ist ausschliesslich für den Adressaten bestimmt. Sollten Sie diese 
E-Mail irrtümlich erhalten haben oder wünschen Sie künftig keine Kontakte mehr 
per E-Mail, bitten wir Sie, die Bank Cler hierüber sofort zu orientieren. Die 
irrtümlich erhaltene E-Mail ist mit allen Anhängen unwiderruflich zu löschen, 
allfällige Ausdrucke sind zu vernichten und auf die Verwendung des Inhalts ist 
zu verzichten. Der Versand unverschlüsselter E-Mail birgt erhebliche Risiken in 
sich (mangelnde Vertraulichkeit, Manipulation von Inhalt/Absender, Fehlleitung, 
Viren etc.). Bank Cler lehnt jede Haftung für Schäden hieraus ab. Bank Cler 
akzeptiert grundsätzlich keine per E-Mail übermittelten Aufträge, Widerrufe von 
Aufträgen oder sonstige Weisungen etc., ohne verpflichtet zu sein, diese 
ausdrücklich zurück zu weisen. Kündigungen von Verträgen per E-Mail sind nicht 
rechtswirksam.

Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to