This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/kinesis-v2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4a18cbe3675447043b939daa1dd7d6e2e73cc864
Author: Danny Mccormick <[email protected]>
AuthorDate: Wed Dec 18 15:14:40 2024 -0500

    [WIP] Update xlang kinesis to v2
---
 .github/trigger_files/beam_PostCommit_Python.json  |   2 +-
 .../expansion-service/build.gradle                 |  39 +++
 .../io/aws2/kinesis/KinesisTransformRegistrar.java | 267 +++++++++++++++++++++
 sdks/python/apache_beam/io/kinesis.py              |  46 +++-
 sdks/python/test-suites/portable/common.gradle     |   4 +-
 settings.gradle.kts                                |   1 +
 6 files changed, 344 insertions(+), 15 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
index 9c7a70ceed7..dd3d3e011a0 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "modification": 7
+  "modification": 8
 }
 
diff --git a/sdks/java/io/amazon-web-services2/expansion-service/build.gradle 
b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle
new file mode 100644
index 00000000000..fd712737f53
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+// Running the application starts the generic Beam expansion service.
+mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+
+applyJavaNature(
+    // Automatic-Module-Name must be a sequence of dot-separated Java identifiers, so the
+    // hyphenated project directory name ("amazon-web-services2") cannot be used here.
+    automaticModuleName: 'org.apache.beam.sdk.io.aws2.expansion.service',
+    exportJavadoc: false,
+    validateShadowJar: false,
+    shadowClosure: {},
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2 :: Expansion Service"
+ext.summary = "Expansion service serving AWS2"
+
+// This module has no sources of its own: it only bundles the expansion service with the AWS2 IO
+// (whose transform registrar is discovered at runtime), so neither project is referenced from
+// code here and both are marked as permitted-unused.
+dependencies {
+  implementation project(":sdks:java:expansion-service")
+  permitUnusedDeclared project(":sdks:java:expansion-service")
+  implementation project(":sdks:java:io:amazon-web-services2")
+  permitUnusedDeclared project(":sdks:java:io:amazon-web-services2")
+  runtimeOnly library.java.slf4j_jdk14
+}
\ No newline at end of file
diff --git 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java
 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java
new file mode 100644
index 00000000000..85f5c9f2db9
--- /dev/null
+++ 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.kinesis;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
+
+/**
+ * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as external transforms for
+ * cross-language usage.
+ *
+ * <p>Both transforms talk to Kinesis through a {@link ClientConfiguration} built from the static
+ * credentials, region and optional service endpoint supplied by the calling SDK.
+ */
+@AutoService(ExternalTransformRegistrar.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class KinesisTransformRegistrar implements ExternalTransformRegistrar {
+  public static final String WRITE_URN = "beam:transform:org.apache.beam:kinesis_write:v2";
+  public static final String READ_DATA_URN = "beam:transform:org.apache.beam:kinesis_read_data:v2";
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_DATA_URN, new ReadDataBuilder());
+  }
+
+  /** Connection settings shared by the read and write configurations. */
+  private abstract static class CrossLanguageConfiguration {
+    String streamName;
+    String awsAccessKey;
+    String awsSecretKey;
+    Region region;
+    @Nullable String serviceEndpoint;
+
+    public void setStreamName(String streamName) {
+      this.streamName = streamName;
+    }
+
+    public void setAwsAccessKey(String awsAccessKey) {
+      this.awsAccessKey = awsAccessKey;
+    }
+
+    public void setAwsSecretKey(String awsSecretKey) {
+      this.awsSecretKey = awsSecretKey;
+    }
+
+    /** Parses the region id (for example {@code "us-east-1"}) into an SDK {@link Region}. */
+    public void setRegion(String region) {
+      this.region = Region.of(region);
+    }
+
+    public void setServiceEndpoint(@Nullable String serviceEndpoint) {
+      this.serviceEndpoint = serviceEndpoint;
+    }
+
+    /**
+     * Builds the AWS client configuration used by both transforms.
+     *
+     * <p>{@code region} already holds a parsed {@link Region}, so it is passed through unchanged;
+     * the endpoint is only set when one was provided.
+     *
+     * @throws IllegalArgumentException if {@code serviceEndpoint} is not a valid URI
+     */
+    ClientConfiguration toClientConfiguration() {
+      StaticCredentialsProvider credentialsProvider =
+          StaticCredentialsProvider.create(AwsBasicCredentials.create(awsAccessKey, awsSecretKey));
+      ClientConfiguration.Builder builder =
+          ClientConfiguration.builder().credentialsProvider(credentialsProvider).region(region);
+      if (serviceEndpoint != null) {
+        builder = builder.endpoint(URI.create(serviceEndpoint));
+      }
+      return builder.build();
+    }
+  }
+
+  /** Builds a {@link KinesisIO.Write} for raw {@code byte[]} records. */
+  public static class WriteBuilder
+      implements ExternalTransformBuilder<WriteBuilder.Configuration, PCollection<byte[]>, PDone> {
+
+    /** Write-specific configuration. */
+    public static class Configuration extends CrossLanguageConfiguration {
+      private Properties producerProperties;
+      private String partitionKey;
+
+      public void setProducerProperties(Map<String, String> producerProperties) {
+        if (producerProperties != null) {
+          Properties properties = new Properties();
+          producerProperties.forEach(properties::setProperty);
+          this.producerProperties = properties;
+        }
+      }
+
+      public void setPartitionKey(String partitionKey) {
+        this.partitionKey = partitionKey;
+      }
+    }
+
+    /**
+     * Builds the write transform; every record is written with the configured partition key.
+     *
+     * @throws IllegalArgumentException if non-empty producer properties were supplied
+     */
+    @Override
+    public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration configuration) {
+      Properties producerProperties = configuration.producerProperties;
+      if (producerProperties != null && !producerProperties.isEmpty()) {
+        // These properties configured the Kinesis Producer Library used by the v1 sink; the v2
+        // KinesisIO.Write has no equivalent setter (NOTE(review): confirm), so fail loudly
+        // instead of silently dropping the caller's settings.
+        throw new IllegalArgumentException(
+            "producerProperties are not supported by the AWS SDK v2 KinesisIO.Write: "
+                + producerProperties.stringPropertyNames());
+      }
+      // Capture only the String: the partitioner must be serializable and Configuration is not.
+      String partitionKey = configuration.partitionKey;
+      // NOTE(review): confirm KinesisIO.Write<byte[]> outputs PDone; adjust the return type if
+      // the v2 transform exposes its own result type.
+      return KinesisIO.<byte[]>write()
+          .withStreamName(configuration.streamName)
+          .withClientConfiguration(configuration.toClientConfiguration())
+          .withSerializer(record -> record)
+          .withPartitioner(record -> partitionKey);
+    }
+  }
+
+  /** Builds a {@link KinesisIO.Read} producing raw {@code byte[]} record payloads. */
+  public static class ReadDataBuilder
+      implements ExternalTransformBuilder<
+          ReadDataBuilder.Configuration, PBegin, PCollection<byte[]>> {
+
+    /**
+     * Read-specific configuration. Durations arrive in milliseconds, the initial timestamp in
+     * epoch milliseconds, and enum-valued options as their constant names.
+     */
+    public static class Configuration extends CrossLanguageConfiguration {
+      private @Nullable Long maxNumRecords;
+      private @Nullable Duration maxReadTime;
+      private @Nullable InitialPositionInStream initialPositionInStream;
+      private @Nullable Instant initialTimestampInStream;
+      private @Nullable Integer requestRecordsLimit;
+      private @Nullable Duration upToDateThreshold;
+      private @Nullable Long maxCapacityPerShard;
+      private @Nullable WatermarkPolicy watermarkPolicy;
+      private @Nullable Duration watermarkIdleDurationThreshold;
+      private @Nullable Duration rateLimit;
+
+      public void setMaxNumRecords(@Nullable Long maxNumRecords) {
+        this.maxNumRecords = maxNumRecords;
+      }
+
+      public void setMaxReadTime(@Nullable Long maxReadTime) {
+        if (maxReadTime != null) {
+          this.maxReadTime = Duration.millis(maxReadTime);
+        }
+      }
+
+      public void setInitialPositionInStream(@Nullable String initialPositionInStream) {
+        if (initialPositionInStream != null) {
+          this.initialPositionInStream = InitialPositionInStream.valueOf(initialPositionInStream);
+        }
+      }
+
+      public void setInitialTimestampInStream(@Nullable Long initialTimestampInStream) {
+        if (initialTimestampInStream != null) {
+          this.initialTimestampInStream = Instant.ofEpochMilli(initialTimestampInStream);
+        }
+      }
+
+      public void setRequestRecordsLimit(@Nullable Long requestRecordsLimit) {
+        if (requestRecordsLimit != null) {
+          // toIntExact fails on out-of-range values instead of silently truncating them.
+          this.requestRecordsLimit = Math.toIntExact(requestRecordsLimit);
+        }
+      }
+
+      public void setUpToDateThreshold(@Nullable Long upToDateThreshold) {
+        if (upToDateThreshold != null) {
+          this.upToDateThreshold = Duration.millis(upToDateThreshold);
+        }
+      }
+
+      public void setMaxCapacityPerShard(@Nullable Long maxCapacityPerShard) {
+        this.maxCapacityPerShard = maxCapacityPerShard;
+      }
+
+      public void setWatermarkPolicy(@Nullable String watermarkPolicy) {
+        if (watermarkPolicy != null) {
+          this.watermarkPolicy = WatermarkPolicy.valueOf(watermarkPolicy);
+        }
+      }
+
+      public void setWatermarkIdleDurationThreshold(
+          @Nullable Long watermarkIdleDurationThreshold) {
+        if (watermarkIdleDurationThreshold != null) {
+          this.watermarkIdleDurationThreshold = Duration.millis(watermarkIdleDurationThreshold);
+        }
+      }
+
+      public void setRateLimit(@Nullable Long rateLimit) {
+        if (rateLimit != null) {
+          this.rateLimit = Duration.millis(rateLimit);
+        }
+      }
+    }
+
+    private enum WatermarkPolicy {
+      ARRIVAL_TIME,
+      PROCESSING_TIME
+    }
+
+    /**
+     * Builds the read transform, applying only the options that were set.
+     *
+     * @throws ArithmeticException if {@code maxCapacityPerShard} does not fit in an {@code int}
+     */
+    @Override
+    public PTransform<PBegin, PCollection<byte[]>> buildExternal(
+        ReadDataBuilder.Configuration configuration) {
+      // NOTE(review): confirm the AWS SDK v2 KinesisIO provides readData() returning a
+      // Read<byte[]>; if only read() (KinesisRecord output) exists, map records to their payload.
+      KinesisIO.Read<byte[]> readTransform =
+          KinesisIO.readData()
+              .withStreamName(configuration.streamName)
+              .withClientConfiguration(configuration.toClientConfiguration());
+
+      if (configuration.maxNumRecords != null) {
+        readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords);
+      }
+      if (configuration.upToDateThreshold != null) {
+        readTransform = readTransform.withUpToDateThreshold(configuration.upToDateThreshold);
+      }
+      if (configuration.maxCapacityPerShard != null) {
+        readTransform =
+            readTransform.withMaxCapacityPerShard(
+                Math.toIntExact(configuration.maxCapacityPerShard));
+      }
+      if (configuration.watermarkPolicy != null) {
+        switch (configuration.watermarkPolicy) {
+          case ARRIVAL_TIME:
+            // watermarkIdleDurationThreshold is only consulted for the arrival-time policy.
+            readTransform =
+                configuration.watermarkIdleDurationThreshold != null
+                    ? readTransform.withArrivalTimeWatermarkPolicy(
+                        configuration.watermarkIdleDurationThreshold)
+                    : readTransform.withArrivalTimeWatermarkPolicy();
+            break;
+          case PROCESSING_TIME:
+            readTransform = readTransform.withProcessingTimeWatermarkPolicy();
+            break;
+          default:
+            throw new IllegalStateException(
+                String.format(
+                    "Unsupported watermark policy type: %s", configuration.watermarkPolicy));
+        }
+      }
+      if (configuration.rateLimit != null) {
+        readTransform = readTransform.withFixedDelayRateLimitPolicy(configuration.rateLimit);
+      }
+      if (configuration.maxReadTime != null) {
+        readTransform = readTransform.withMaxReadTime(configuration.maxReadTime);
+      }
+      if (configuration.initialPositionInStream != null) {
+        readTransform =
+            readTransform.withInitialPositionInStream(configuration.initialPositionInStream);
+      }
+      if (configuration.requestRecordsLimit != null) {
+        readTransform = readTransform.withRequestRecordsLimit(configuration.requestRecordsLimit);
+      }
+      if (configuration.initialTimestampInStream != null) {
+        readTransform =
+            readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream);
+      }
+      return readTransform;
+    }
+  }
+}
\ No newline at end of file
diff --git a/sdks/python/apache_beam/io/kinesis.py 
b/sdks/python/apache_beam/io/kinesis.py
index bc5e1fa787b..0be98c122f6 100644
--- a/sdks/python/apache_beam/io/kinesis.py
+++ b/sdks/python/apache_beam/io/kinesis.py
@@ -49,7 +49,8 @@
   In this option, Python SDK will either download (for released Beam version) 
or
   build (when running from a Beam Git clone) a expansion service jar and use
   that to expand transforms. Currently Kinesis transforms use the
-  'beam-sdks-java-io-kinesis-expansion-service' jar for this purpose.
+  'beam-sdks-java-io-amazon-web-services2-expansion-service' jar for this
+  purpose.
 
   *Option 2: specify a custom expansion service*
 
@@ -99,7 +100,7 @@ __all__ = [
 
 def default_io_expansion_service():
+  """Returns the default expansion service used by the Kinesis transforms."""
   return BeamJarExpansionService(
-      'sdks:java:io:kinesis:expansion-service:shadowJar')
+      'sdks:java:io:amazon-web-services2:expansion-service:shadowJar')
 
 
 WriteToKinesisSchema = NamedTuple(
@@ -111,7 +112,6 @@ WriteToKinesisSchema = NamedTuple(
         ('region', str),
         ('partition_key', str),
         ('service_endpoint', Optional[str]),
-        ('verify_certificate', Optional[bool]),
         ('producer_properties', Optional[Mapping[str, str]]),
     ],
 )
@@ -123,7 +123,7 @@ class WriteToKinesis(ExternalTransform):
 
     Experimental; no backwards compatibility guarantees.
   """
-  URN = 'beam:transform:org.apache.beam:kinesis_write:v1'
+  URN = 'beam:transform:org.apache.beam:kinesis_write:v2'
 
   def __init__(
       self,
@@ -145,14 +145,26 @@ class WriteToKinesis(ExternalTransform):
     :param aws_secret_key: Kinesis access key secret.
     :param region: AWS region. Example: 'us-east-1'.
     :param service_endpoint: Kinesis service endpoint
-    :param verify_certificate: Enable or disable certificate verification.
-        Never set to False on production. True by default.
+    :param verify_certificate: Deprecated - certificates will always be
+        verified.
     :param partition_key: Specify default partition key.
     :param producer_properties: Specify the configuration properties for 
Kinesis
         Producer Library (KPL) as dictionary.
         Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'}
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
+    if verify_certificate is False:
+      # Previously, we supported this via
+      # 
https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate--
+      # With the new AWS client, we no longer support it and it is always True
+      raise ValueError(
+        'verify_certificate set to False. This option is no longer ' +
+        'supported and certificate verification will still happen.')
+    if verify_certificate is True:
+      logging.warning(
+        'verify_certificate set to True. This option is no longer ' +
+        'supported and certificate verification will automatically happen. ' +
+        'This option may be removed in a future release')
     super().__init__(
         self.URN,
         NamedTupleBasedPayloadBuilder(
@@ -163,7 +175,6 @@ class WriteToKinesis(ExternalTransform):
                 region=region,
                 partition_key=partition_key,
                 service_endpoint=service_endpoint,
-                verify_certificate=verify_certificate,
                 producer_properties=producer_properties,
             )),
         expansion_service or default_io_expansion_service(),
@@ -178,7 +189,6 @@ ReadFromKinesisSchema = NamedTuple(
         ('aws_secret_key', str),
         ('region', str),
         ('service_endpoint', Optional[str]),
-        ('verify_certificate', Optional[bool]),
         ('max_num_records', Optional[int]),
         ('max_read_time', Optional[int]),
         ('initial_position_in_stream', Optional[str]),
@@ -199,7 +209,7 @@ class ReadDataFromKinesis(ExternalTransform):
 
     Experimental; no backwards compatibility guarantees.
   """
-  URN = 'beam:transform:org.apache.beam:kinesis_read_data:v1'
+  URN = 'beam:transform:org.apache.beam:kinesis_read_data:v2'
 
   def __init__(
       self,
@@ -229,8 +239,8 @@ class ReadDataFromKinesis(ExternalTransform):
     :param aws_secret_key: Kinesis access key secret.
     :param region: AWS region. Example: 'us-east-1'.
     :param service_endpoint: Kinesis service endpoint
-    :param verify_certificate: Enable or disable certificate verification.
-        Never set to False on production. True by default.
+    :param verify_certificate: Deprecated - certificates will always be
+        verified.
     :param max_num_records: Specifies to read at most a given number of 
records.
         Must be greater than 0.
     :param max_read_time: Specifies to read records during x milliseconds.
@@ -277,6 +287,19 @@ class ReadDataFromKinesis(ExternalTransform):
     ):
       logging.warning('Provided timestamp emplaced not in the past.')
 
+    if verify_certificate is False:
+      # Previously, we supported this via
+      # 
https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate--
+      # With the new AWS client, we no longer support it and it is always True
+      raise ValueError(
+        'verify_certificate set to False. This option is no longer ' +
+        'supported and certificate verification will still happen.')
+    if verify_certificate is True:
+      logging.warning(
+        'verify_certificate set to True. This option is no longer ' +
+        'supported and certificate verification will automatically happen. ' +
+        'This option may be removed in a future release')
+
     super().__init__(
         self.URN,
         NamedTupleBasedPayloadBuilder(
@@ -286,7 +309,6 @@ class ReadDataFromKinesis(ExternalTransform):
                 aws_secret_key=aws_secret_key,
                 region=region,
                 service_endpoint=service_endpoint,
-                verify_certificate=verify_certificate,
                 max_num_records=max_num_records,
                 max_read_time=max_read_time,
                 initial_position_in_stream=initial_position_in_stream,
diff --git a/sdks/python/test-suites/portable/common.gradle 
b/sdks/python/test-suites/portable/common.gradle
index be87be74986..99b477b2c7d 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -376,7 +376,7 @@ 
project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
           ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar',
           ':sdks:java:io:expansion-service:shadowJar',
           ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
-          ':sdks:java:io:kinesis:expansion-service:shadowJar',
+          ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar',
           ':sdks:java:extensions:schemaio-expansion-service:shadowJar',
           ':sdks:java:io:debezium:expansion-service:shadowJar'
   ]
@@ -426,7 +426,7 @@ project.tasks.register("xlangSpannerIOIT") {
           ":sdks:java:container:${currentJavaVersion}:docker",
           ':sdks:java:io:expansion-service:shadowJar',
           ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
-          ':sdks:java:io:kinesis:expansion-service:shadowJar',
+          ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar',
           ':sdks:java:extensions:schemaio-expansion-service:shadowJar',
           ':sdks:java:io:debezium:expansion-service:shadowJar'
   ]
diff --git a/settings.gradle.kts b/settings.gradle.kts
index a8bee45a05a..624e9f970d9 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -206,6 +206,7 @@ include(":sdks:java:harness")
 include(":sdks:java:harness:jmh")
 include(":sdks:java:io:amazon-web-services")
 include(":sdks:java:io:amazon-web-services2")
+include(":sdks:java:io:amazon-web-services2:expansion-service")
 include(":sdks:java:io:amqp")
 include(":sdks:java:io:azure")
 include(":sdks:java:io:azure-cosmos")

Reply via email to