piotr-szuberski commented on a change in pull request #12023:
URL: https://github.com/apache/beam/pull/12023#discussion_r448098803



##########
File path: sdks/python/apache_beam/io/external/jdbc.py
##########
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  PTransforms for supporting Jdbc in Python pipelines. These transforms do not
+  run a Jdbc client in Python. Instead, they expand to ExternalTransforms
+  which the Expansion Service resolves to the Java SDK's JdbcIO.
+
+  Note: To use these transforms, you need to start a Java Expansion Service.
+  Please refer to the portability documentation on how to do that. Flink Users
+  can use the built-in Expansion Service of the Flink Runner's Job Server. The
+  expansion service address has to be provided when instantiating the
+  transforms.
+
+  If you start Flink's Job Server, the expansion service will be started on
+  port 8097. This is also the configured default for this transform. For a
+  different address, please set the expansion_service parameter.
+
+  For more information see:
+  - https://beam.apache.org/documentation/runners/flink/
+  - https://beam.apache.org/roadmap/portability/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+
+from past.builtins import unicode
+
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+# Public API of this module; ReadFromJdbc is not (yet) provided here.
+__all__ = ['WriteToJdbc']
+
+
+def default_io_expansion_service():
+  """Return the default expansion service for this transform.
+
+  Uses the Java IO expansion-service shadow jar shipped with the Beam
+  release (started automatically by BeamJarExpansionService).
+  """
+  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+
+
+# Configuration record for the external JDBC write transform. Presumably
+# serialized via NamedTupleBasedPayloadBuilder for the expansion service
+# (usage is outside this hunk — confirm against WriteToJdbc.__init__).
+# connection_init_sqls is the only optional field; the rest are required
+# connection/statement settings, all expected as unicode strings.
+WriteToJdbcSchema = typing.NamedTuple(
+    'WriteToJdbcSchema',
+    [
+        ('driver_class_name', unicode),
+        ('jdbc_url', unicode),
+        ('username', unicode),
+        ('password', unicode),
+        ('statement', unicode),
+        ('connection_properties', unicode),
+        ('connection_init_sqls', typing.Optional[typing.List[unicode]]),
+    ],
+)
+
+
+class WriteToJdbc(ExternalTransform):
+  """
+  An external PTransform which writes Rows to the specified database.
+
+  This transform receives Rows defined as NamedTuple type and registered in
+  the coders registry to use RowCoder, e.g.
+
+    import typing
+    from apache_beam import coders
+
+    ExampleRow = typing.NamedTuple(
+      "ExampleRow",
+      [
+        ("id", int),
+        ("name", unicode),
+        ("budget", float),
+      ],
+    )
+    coders.registry.register_coder(ExampleRow, coders.RowCoder)
+
+  Experimental; no backwards compatibility guarantees.  It requires special
+  preparation of the Java SDK.  See BEAM-7870.
+  """
+
+  URN = 'beam:external:java:jdbc:write:v1'
+
+  def __init__(
+      self,
+      driver_class_name,
+      jdbc_url,
+      username,
+      password,
+      statement,
+      connection_properties='',
+      connection_init_sqls=None,
+      expansion_service=None,
+  ):

Review comment:
       Oh, OK — I thought you meant using **kwargs. That confused me; since
I'm not familiar with Python tricks, I assumed there was some advantage to
using it. But if you only meant passing named parameters, then I'll switch
back to the default values, since that way the user gets IDE suggestions.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to