chamikaramj commented on a change in pull request #15727: URL: https://github.com/apache/beam/pull/15727#discussion_r730195448
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ExternalTransformRegistrarImpl.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite.internal; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.gcp.pubsublite.internal.ExternalTransformConfig.ReadExternalBuilder; +import org.apache.beam.sdk.io.gcp.pubsublite.internal.ExternalTransformConfig.WriteExternalBuilder; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +@AutoService(ExternalTransformRegistrar.class) +public class ExternalTransformRegistrarImpl implements ExternalTransformRegistrar { + public static final String WRITE_URN = "beam:external:java:pubsublite:write:v1"; Review comment: Let's use: beam:transform:org.apache.beam:pubsublite_write:v1 beam:transform:org.apache.beam:pubsublite_read:v1 To comply with the approved proposal here: 
https://docs.google.com/document/d/1JOHPBNv6x6ziMdwr_96EPSP-Bx7C4IrLU1j267MSCts/edit?usp=sharing (I'll be updating existing URNs and updating Beam documentation) ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py ########## @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Google Pub/Sub Lite sources and sinks. + +This API is currently under development and is subject to change. +""" + +# pytype: skip-file + +import typing + +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +_ReadSchema = typing.NamedTuple( + '_ReadSchema', + [('subscription_path', str), + ('min_bundle_timeout', int), + ('deduplicate', bool)]) + + +def _default_io_expansion_service(): + return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') + + +class ReadExternal(ExternalTransform): + """ + An external PTransform which reads from Pub/Sub Lite and returns a + SequencedMessage as serialized bytes. + + Experimental; no backwards compatibility guarantees. 
+ """ + + def __init__( + self, + subscription_path, + min_bundle_timeout=None, + deduplicate=None + ): + """ + Initializes a read operation from Pub/Sub Lite. + + Args: + subscription_path: A Pub/Sub Lite Subscription path. + min_bundle_timeout: The minimum wall time to pass before allowing + bundle closure. Setting this to too small of a value will result in + increased compute costs and lower throughput per byte. Immediate + timeouts (0) may be useful for testing. + deduplicate: Whether to deduplicate messages based on the value of + the 'x-goog-pubsublite-dataflow-uuid' attribute. + """ + if min_bundle_timeout is None: + min_bundle_timeout = 60 * 1000 + if deduplicate is None: + deduplicate = False + super().__init__( + 'beam:external:java:pubsublite:read:v1', + NamedTupleBasedPayloadBuilder( + _ReadSchema( + subscription_path=subscription_path, + min_bundle_timeout=min_bundle_timeout, + deduplicate=deduplicate)), + _default_io_expansion_service()) Review comment: I think we should allow users to specify an expansion service as well, similar to other external transforms. If one is not specified, we'll automatically start the default one. ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py ########## @@ -0,0 +1,79 @@ +from apache_beam.transforms import Map, PTransform +from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal + +try: + from google.cloud import pubsublite +except ImportError: + pubsublite = None + + +class ReadFromPubSubLite(PTransform): + """A ``PTransform`` for reading from Pub/Sub Lite.""" + + def __init__( + self, + subscription_path, + min_bundle_timeout=None, + deduplicate=None + ): + """Initializes ``ReadFromPubSubLite``. + + Args: + subscription_path: Pub/Sub Lite Subscription in the form + "projects/<project>/locations/<location>/subscriptions/<subscription>". + min_bundle_timeout: The minimum wall time to pass before allowing + bundle closure.
Setting this to too small of a value will result in + increased compute costs and lower throughput per byte. Immediate + timeouts (0) may be useful for testing. + deduplicate: Whether to deduplicate messages based on the value of + the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False. + """ + super().__init__() + self._source = ReadExternal( Review comment: Just initialize this in the expand method. ########## File path: sdks/python/setup.py ########## @@ -188,6 +188,7 @@ def get_version(): 'google-auth>=1.18.0,<3', 'google-cloud-datastore>=1.8.0,<2', 'google-cloud-pubsub>=0.39.0,<2', + 'google-cloud-pubsublite>=1.2.0,<2', Review comment: You should also update https://github.com/apache/beam/blob/master/sdks/python/container/base_image_requirements.txt ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py ########## @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Google Pub/Sub Lite sources and sinks. + +This API is currently under development and is subject to change.
+""" + +# pytype: skip-file + +import typing + +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +_ReadSchema = typing.NamedTuple( + '_ReadSchema', + [('subscription_path', str), + ('min_bundle_timeout', int), + ('deduplicate', bool)]) + + +def _default_io_expansion_service(): + return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') + + +class ReadExternal(ExternalTransform): + """ + An external PTransform which reads from Pub/Sub Lite and returns a + SequencedMessage as serialized bytes. + + Experimental; no backwards compatibility guarantees. + """ + + def __init__( + self, + subscription_path, + min_bundle_timeout=None, + deduplicate=None + ): + """ + Initializes a read operation from Pub/Sub Lite. + + Args: + subscription_path: A Pub/Sub Lite Subscription path. + min_bundle_timeout: The minimum wall time to pass before allowing + bundle closure. Setting this to too small of a value will result in + increased compute costs and lower throughput per byte. Immediate + timeouts (0) may be useful for testing. + deduplicate: Whether to deduplicate messages based on the value of + the 'x-goog-pubsublite-dataflow-uuid' attribute. + """ + if min_bundle_timeout is None: + min_bundle_timeout = 60 * 1000 + if deduplicate is None: + deduplicate = False + super().__init__( + 'beam:external:java:pubsublite:read:v1', + NamedTupleBasedPayloadBuilder( + _ReadSchema( + subscription_path=subscription_path, + min_bundle_timeout=min_bundle_timeout, + deduplicate=deduplicate)), + _default_io_expansion_service()) + + +_WriteSchema = typing.NamedTuple( + '_WriteSchema', + [ + ('topic_path', str), + ('add_uuids', bool) + ]) + + +class WriteExternal(ExternalTransform): + """ + An external PTransform which writes serialized PubSubMessage protos to + Pub/Sub Lite. 
+ + Experimental; no backwards compatibility guarantees. + """ + def __init__( + self, + topic_path, + add_uuids=None + ): + """ + Initializes a write operation to Pub/Sub Lite. + + Args: + topic_path: A Pub/Sub Lite Topic path. + add_uuids: Whether to add uuids to the 'x-goog-pubsublite-dataflow-uuid' + uuid attribute. + """ + if add_uuids is None: + add_uuids = False + super().__init__( + 'beam:external:java:pubsublite:write:v1', + NamedTupleBasedPayloadBuilder( + _WriteSchema( + topic_path=topic_path, + add_uuids=add_uuids, + )), + _default_io_expansion_service()) Review comment: Ditto regarding allowing users to specify an expansion service if needed. ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py ########## @@ -0,0 +1,79 @@ +from apache_beam.transforms import Map, PTransform +from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal + +try: + from google.cloud import pubsublite +except ImportError: + pubsublite = None + + +class ReadFromPubSubLite(PTransform): Review comment: Seems like these transforms are the main API? If so, please move them to the above module and mark ReadExternal and WriteExternal private (start with '_'). Also please update the interface of the transforms here to allow users to specify an expansion service address. ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py ########## @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with Review comment: Please rename to: sdks/python/apache_beam/io/gcp/pubsublite.py ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py ########## @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Google Pub/Sub Lite sources and sinks. + +This API is currently under development and is subject to change. +""" + +# pytype: skip-file + +import typing + +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +_ReadSchema = typing.NamedTuple( + '_ReadSchema', + [('subscription_path', str), + ('min_bundle_timeout', int), + ('deduplicate', bool)]) + + +def _default_io_expansion_service(): + return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') Review comment: Let's use "sdks:java:io:google-cloud-platform:expansion-service" instead. ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py ########## @@ -0,0 +1,79 @@ +from apache_beam.transforms import Map, PTransform +from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal + +try: + from google.cloud import pubsublite +except ImportError: + pubsublite = None + + +class ReadFromPubSubLite(PTransform): + """A ``PTransform`` for reading from Pub/Sub Lite.""" + + def __init__( + self, + subscription_path, + min_bundle_timeout=None, + deduplicate=None + ): + """Initializes ``ReadFromPubSubLite``. 
+ + Args: + subscription_path: Pub/Sub Lite Subscription in the form + "projects/<project>/locations/<location>/subscriptions/<subscription>". + min_bundle_timeout: The minimum wall time to pass before allowing + bundle closure. Setting this to too small of a value will result in + increased compute costs and lower throughput per byte. Immediate + timeouts (0) may be useful for testing. + deduplicate: Whether to deduplicate messages based on the value of + the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False. + """ + super().__init__() + self._source = ReadExternal( + subscription_path=subscription_path, + min_bundle_timeout=min_bundle_timeout, + deduplicate=deduplicate + ) + + def expand(self, pvalue): + pcoll = pvalue.pipeline | self._source + pcoll.element_type = bytes + pcoll = pcoll | Map(pubsublite.SequencedMessage.deserialize) + pcoll.element_type = pubsublite.SequencedMessage + return pcoll + + +class WriteToPubSubLite(PTransform): + """A ``PTransform`` for writing to Pub/Sub Lite.""" Review comment: Please document the input PCollection type. ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py ########## @@ -0,0 +1,79 @@ +from apache_beam.transforms import Map, PTransform +from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal + +try: + from google.cloud import pubsublite +except ImportError: + pubsublite = None + + +class ReadFromPubSubLite(PTransform): + """A ``PTransform`` for reading from Pub/Sub Lite.""" + + def __init__( + self, + subscription_path, + min_bundle_timeout=None, + deduplicate=None + ): + """Initializes ``ReadFromPubSubLite``. + + Args: + subscription_path: Pub/Sub Lite Subscription in the form + "projects/<project>/locations/<location>/subscriptions/<subscription>". + min_bundle_timeout: The minimum wall time to pass before allowing + bundle closure. Setting this to too small of a value will result in + increased compute costs and lower throughput per byte. 
Immediate + timeouts (0) may be useful for testing. + deduplicate: Whether to deduplicate messages based on the value of + the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False. + """ + super().__init__() + self._source = ReadExternal( + subscription_path=subscription_path, + min_bundle_timeout=min_bundle_timeout, + deduplicate=deduplicate + ) + + def expand(self, pvalue): + pcoll = pvalue.pipeline | self._source + pcoll.element_type = bytes + pcoll = pcoll | Map(pubsublite.SequencedMessage.deserialize) + pcoll.element_type = pubsublite.SequencedMessage + return pcoll + + +class WriteToPubSubLite(PTransform): + """A ``PTransform`` for writing to Pub/Sub Lite.""" + + def __init__( + self, + topic_path, + add_uuids=None + ): + """Initializes ``WriteToPubSubLite``. + + Args: + topic_path: A Pub/Sub Lite Topic path. + add_uuids: Whether to add uuids to the 'x-goog-pubsublite-dataflow-uuid' + uuid attribute. Defaults to False. + """ + super().__init__() + self._source = WriteExternal( Review comment: Just initialize this in the expand method. ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py ########## @@ -0,0 +1,79 @@ +from apache_beam.transforms import Map, PTransform +from apache_beam.io.gcp.pubsublite.external import ReadExternal, WriteExternal + +try: + from google.cloud import pubsublite +except ImportError: + pubsublite = None + + +class ReadFromPubSubLite(PTransform): + """A ``PTransform`` for reading from Pub/Sub Lite.""" Review comment: Please document the output PCollection type. ########## File path: sdks/python/apache_beam/io/gcp/pubsublite/external.py ########## @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Google Pub/Sub Lite sources and sinks. + +This API is currently under development and is subject to change. Review comment: Standard text is "Experimental; no backwards-compatibility guarantees." Can we also add documentation regarding the transforms defined here and their functionality? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
