This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 48cac9b09a3 Remove deprecated AWS coders where replaced by schemas (closes #23315) (#26715)
48cac9b09a3 is described below
commit 48cac9b09a30bc32b9935b728af6ad3a79bb4435
Author: Moritz Mack <[email protected]>
AuthorDate: Tue May 16 20:01:17 2023 +0200
Remove deprecated AWS coders where replaced by schemas (closes #23315) (#26715)
---
CHANGES.md | 1 +
.../apache/beam/sdk/io/aws2/coders/AwsCoders.java | 142 ---------------------
.../beam/sdk/io/aws2/coders/package-info.java | 19 ---
.../sdk/io/aws2/sns/PublishResponseCoders.java | 134 -------------------
.../org/apache/beam/sdk/io/aws2/sns/SnsIO.java | 53 +-------
.../beam/sdk/io/aws2/coders/AwsCodersTest.java | 69 ----------
.../sdk/io/aws2/sns/PublishResponseCodersTest.java | 92 -------------
.../org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java | 39 ------
8 files changed, 2 insertions(+), 547 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 4c0cb3ce3c2..5774be7d646 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -79,6 +79,7 @@
* CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)).
* AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)).
* AWS 2 SnsIO.writeAsync (deprecated in Beam v2.37.0 due to risk of data loss) was finally removed ([#26710](https://github.com/apache/beam/issues/26710)).
+* AWS 2 coders (deprecated in Beam v2.43.0 when adding Schema support for AWS Sdk Pojos) are finally removed ([#23315](https://github.com/apache/beam/issues/23315)).
## Deprecations
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java
deleted file mode 100644
index a15818875c2..00000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.coders;
-
-import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-/**
- * {@link Coder}s for common AWS SDK objects.
- *
- * @deprecated {@link org.apache.beam.sdk.schemas.SchemaCoder SchemaCoders}
for {@link
- * software.amazon.awssdk.core.SdkPojo AWS model classes} will be
automatically inferred by
- * means of {@link org.apache.beam.sdk.io.aws2.schemas.AwsSchemaProvider
AwsSchemaProvider}.
- */
-@Deprecated
-public final class AwsCoders {
-
- private AwsCoders() {}
-
- /**
- * Returns a new coder for {@link AwsResponseMetadata} (AWS request ID only).
- *
- * @return the {@link AwsResponseMetadata} coder
- */
- public static Coder<AwsResponseMetadata> awsResponseMetadata() {
- return new AwsResponseMetadataCoder();
- }
-
- /**
- * Returns a new coder for {@link SdkHttpResponse} (HTTP status code and
headers).
- *
- * @return the SdkHttpResponse coder
- */
- public static Coder<SdkHttpResponse> sdkHttpResponse() {
- return new SdkHttpResponseCoder(true);
- }
-
- /**
- * Returns a new coder for {@link SdkHttpResponse} (HTTP status code only).
- *
- * @return the SdkHttpResponse coder
- */
- public static Coder<SdkHttpResponse> sdkHttpResponseWithoutHeaders() {
- return new SdkHttpResponseCoder(false);
- }
-
- private static class AwsResponseMetadataCoder extends
AtomicCoder<AwsResponseMetadata> {
- private static final Coder<String> REQUEST_ID_CODER = StringUtf8Coder.of();
-
- private static class DecodedAwsResponseMetadata extends
AwsResponseMetadata {
- protected DecodedAwsResponseMetadata(String requestId) {
- super(ImmutableMap.of(AWS_REQUEST_ID, requestId));
- }
- }
-
- @Override
- public void encode(AwsResponseMetadata value, OutputStream outStream)
- throws CoderException, IOException {
- REQUEST_ID_CODER.encode(value.requestId(), outStream);
- }
-
- @Override
- public AwsResponseMetadata decode(InputStream inStream) throws
CoderException, IOException {
- return new DecodedAwsResponseMetadata(REQUEST_ID_CODER.decode(inStream));
- }
- }
-
- private static class SdkHttpResponseCoder extends
CustomCoder<SdkHttpResponse> {
- private static final Coder<Integer> STATUS_CODE_CODER = VarIntCoder.of();
- private static final Coder<Map<String, List<String>>> HEADERS_ENCODER =
- NullableCoder.of(MapCoder.of(StringUtf8Coder.of(),
ListCoder.of(StringUtf8Coder.of())));
-
- private final boolean includeHeaders;
-
- protected SdkHttpResponseCoder(boolean includeHeaders) {
- this.includeHeaders = includeHeaders;
- }
-
- @Override
- public void encode(SdkHttpResponse value, OutputStream outStream)
- throws CoderException, IOException {
- STATUS_CODE_CODER.encode(value.statusCode(), outStream);
- if (includeHeaders) {
- HEADERS_ENCODER.encode(value.headers(), outStream);
- }
- }
-
- @Override
- public SdkHttpResponse decode(InputStream inStream) throws CoderException,
IOException {
- SdkHttpResponse.Builder httpResponseBuilder =
-
SdkHttpResponse.builder().statusCode(STATUS_CODE_CODER.decode(inStream));
-
- if (includeHeaders) {
- Map<String, List<String>> headers = HEADERS_ENCODER.decode(inStream);
- if (headers != null) {
- httpResponseBuilder.headers(headers);
- }
- }
- return httpResponseBuilder.build();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- STATUS_CODE_CODER.verifyDeterministic();
- if (includeHeaders) {
- HEADERS_ENCODER.verifyDeterministic();
- }
- }
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java
deleted file mode 100644
index e23a1a5e763..00000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/** Defines common coders for Amazon Web Services. */
-package org.apache.beam.sdk.io.aws2.coders;
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java
deleted file mode 100644
index 57acd46f615..00000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import static org.apache.beam.sdk.io.aws2.coders.AwsCoders.sdkHttpResponse;
-import static
org.apache.beam.sdk.io.aws2.coders.AwsCoders.sdkHttpResponseWithoutHeaders;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.aws2.coders.AwsCoders;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.services.sns.model.PublishResponse;
-
-/**
- * Coders for SNS {@link PublishResponse}.
- *
- * @deprecated Schema based coder is inferred automatically.
- */
-@Deprecated
-public class PublishResponseCoders {
- private static final Coder<String> MESSAGE_ID_CODER = StringUtf8Coder.of();
- private static final NullableCoder<AwsResponseMetadata> METADATA_CODER =
- NullableCoder.of(AwsCoders.awsResponseMetadata());
-
- private PublishResponseCoders() {}
-
- /**
- * Returns a new SNS {@link PublishResponse} coder which by default
serializes only the SNS
- * messageId.
- *
- * @return the {@link PublishResponse} coder
- */
- public static Coder<PublishResponse> defaultPublishResponse() {
- return new PublishResponseCoder(null, null);
- }
-
- /**
- * Returns a new SNS {@link PublishResponse} coder which serializes {@link
AwsResponseMetadata}
- * and {@link SdkHttpResponse}, including the HTTP response headers.
- *
- * @return the {@link PublishResponse} coder
- */
- public static Coder<PublishResponse> fullPublishResponse() {
- return new PublishResponseCoder(METADATA_CODER,
NullableCoder.of(sdkHttpResponse()));
- }
-
- /**
- * Returns a new SNS {@link PublishResponse} coder which serializes {@link
AwsResponseMetadata}
- * and {@link SdkHttpResponse}, but not including the HTTP response headers.
- *
- * @return the {@link PublishResponse} coder
- */
- public static Coder<PublishResponse> fullPublishResponseWithoutHeaders() {
- return new PublishResponseCoder(
- METADATA_CODER, NullableCoder.of(sdkHttpResponseWithoutHeaders()));
- }
-
- private static class PublishResponseCoder extends
CustomCoder<PublishResponse> {
- private final @Nullable NullableCoder<AwsResponseMetadata> metadataCoder;
- private final @Nullable NullableCoder<SdkHttpResponse> httpResponseCoder;
-
- private PublishResponseCoder(
- @Nullable NullableCoder<AwsResponseMetadata> responseMetadataEncoder,
- @Nullable NullableCoder<SdkHttpResponse> sdkHttpMetadataCoder) {
- this.metadataCoder = responseMetadataEncoder;
- this.httpResponseCoder = sdkHttpMetadataCoder;
- }
-
- @Override
- public void encode(PublishResponse value, OutputStream outStream)
- throws CoderException, IOException {
- MESSAGE_ID_CODER.encode(value.messageId(), outStream);
- if (metadataCoder != null) {
- metadataCoder.encode(value.responseMetadata(), outStream);
- }
- if (httpResponseCoder != null) {
- httpResponseCoder.encode(value.sdkHttpResponse(), outStream);
- }
- }
-
- @Override
- public PublishResponse decode(InputStream inStream) throws CoderException,
IOException {
- PublishResponse.Builder responseBuilder =
-
PublishResponse.builder().messageId(MESSAGE_ID_CODER.decode(inStream));
- if (metadataCoder != null) {
- AwsResponseMetadata metadata = metadataCoder.decode(inStream);
- if (metadata != null) {
- responseBuilder.responseMetadata(metadata);
- }
- }
- if (httpResponseCoder != null) {
- SdkHttpResponse httpResponse = httpResponseCoder.decode(inStream);
- if (httpResponse != null) {
- responseBuilder.sdkHttpResponse(httpResponse);
- }
- }
- return responseBuilder.build();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- MESSAGE_ID_CODER.verifyDeterministic();
- if (metadataCoder != null) {
- metadataCoder.verifyDeterministic();
- }
- if (httpResponseCoder != null) {
- httpResponseCoder.verifyDeterministic();
- }
- }
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
index 7b9982517b8..182b7dbb388 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
@@ -21,11 +21,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.auto.value.AutoValue;
import java.util.function.Consumer;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
-import org.apache.beam.sdk.io.aws2.schemas.AwsSchemaProvider;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -38,9 +36,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.InvalidParameterException;
@@ -69,12 +65,6 @@ import software.amazon.awssdk.services.sns.model.PublishResponse;
* <li>Request builder function to create SNS publish requests from your
input
* </ul>
*
- * <p>By default, the output {@link PublishResponse} contains only the SNS
messageId, all other
- * fields are null. If you need to include the full {@link SdkHttpResponse}
and {@link
- * AwsResponseMetadata}, you can call {@link Write#withFullPublishResponse()}.
If you need the HTTP
- * status code only but no headers, you can use {@link
- * Write#withFullPublishResponseWithoutHeaders()}.
- *
* <h3>Configuration of AWS clients</h3>
*
* <p>AWS clients for all AWS IOs can be configured using {@link AwsOptions},
e.g. {@code
@@ -121,8 +111,6 @@ public final class SnsIO {
abstract @Nullable SerializableFunction<T, PublishRequest.Builder>
getPublishRequestBuilder();
- abstract @Nullable Coder<PublishResponse> getCoder();
-
abstract Builder<T> builder();
@AutoValue.Builder
@@ -135,8 +123,6 @@ public final class SnsIO {
abstract Builder<T> setPublishRequestBuilder(
SerializableFunction<T, PublishRequest.Builder> requestBuilder);
- abstract Builder<T> setCoder(Coder<PublishResponse> coder);
-
abstract Write<T> build();
}
@@ -177,39 +163,6 @@ public final class SnsIO {
return builder().setClientConfiguration(config).build();
}
- /**
- * Encode the full {@link PublishResponse} object, including
sdkResponseMetadata and
- * sdkHttpMetadata with the HTTP response headers.
- *
- * @deprecated Writes fail exceptionally in case of errors, there is no
need to check headers.
- */
- @Deprecated
- public Write<T> withFullPublishResponse() {
- return withCoder(PublishResponseCoders.fullPublishResponse());
- }
-
- /**
- * Encode the full {@link PublishResponse} object, including
sdkResponseMetadata and
- * sdkHttpMetadata but excluding the HTTP response headers.
- *
- * @deprecated Writes fail exceptionally in case of errors, there is no
need to check headers.
- */
- @Deprecated
- public Write<T> withFullPublishResponseWithoutHeaders() {
- return
withCoder(PublishResponseCoders.fullPublishResponseWithoutHeaders());
- }
-
- /**
- * Encode the {@link PublishResponse} with the given coder.
- *
- * @deprecated Explicit usage of coders is deprecated. Inferred schemas
provided by {@link
- * AwsSchemaProvider} will be used instead.
- */
- @Deprecated
- public Write<T> withCoder(Coder<PublishResponse> coder) {
- return builder().setCoder(coder).build();
- }
-
@Override
public PCollection<PublishResponse> expand(PCollection<T> input) {
checkArgument(getPublishRequestBuilder() != null,
"withPublishRequestBuilder() is required");
@@ -221,11 +174,7 @@ public final class SnsIO {
checkArgument(checkTopicExists(awsOptions), "Topic arn %s does not
exist", getTopicArn());
}
- PCollection<PublishResponse> result = input.apply(ParDo.of(new
SnsWriterFn<>(this)));
- if (getCoder() != null) {
- result.setCoder(getCoder());
- }
- return result;
+ return input.apply(ParDo.of(new SnsWriterFn<>(this)));
}
private boolean checkTopicExists(AwsOptions options) {
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java
deleted file mode 100644
index dd5ea454c34..00000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.coders;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;
-
-import java.util.Map;
-import java.util.UUID;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.junit.Test;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-/** Tests for {@link AwsCoders}. */
-public class AwsCodersTest {
-
- @Test
- public void testAwsResponseMetadataDecodeEncodeEquals() throws Exception {
- AwsResponseMetadata value = buildAwsResponseMetadata();
- AwsResponseMetadata clone =
CoderUtils.clone(AwsCoders.awsResponseMetadata(), value);
- assertThat(clone.requestId(), equalTo(value.requestId()));
- }
-
- @Test
- public void testSdkHttpMetadataDecodeEncodeEquals() throws Exception {
- SdkHttpResponse value = buildSdkHttpMetadata();
- SdkHttpResponse clone = CoderUtils.clone(AwsCoders.sdkHttpResponse(),
value);
- assertThat(clone.statusCode(), equalTo(value.statusCode()));
- assertThat(clone.headers(), equalTo(value.headers()));
- }
-
- @Test
- public void testSdkHttpMetadataWithoutHeadersDecodeEncodeEquals() throws
Exception {
- SdkHttpResponse value = buildSdkHttpMetadata();
- SdkHttpResponse clone =
CoderUtils.clone(AwsCoders.sdkHttpResponseWithoutHeaders(), value);
- assertThat(clone.statusCode(), equalTo(value.statusCode()));
- assertThat(clone.headers().isEmpty(), equalTo(true));
- }
-
- private AwsResponseMetadata buildAwsResponseMetadata() {
- Map<String, String> metadata = ImmutableMap.of(AWS_REQUEST_ID,
UUID.randomUUID().toString());
- return new AwsResponseMetadata(metadata) {};
- }
-
- private SdkHttpResponse buildSdkHttpMetadata() {
- return SdkHttpResponse.builder()
- .statusCode(200)
- .appendHeader("Content-Type", "application/json")
- .build();
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java
deleted file mode 100644
index 4e21f885b21..00000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import static
org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.defaultPublishResponse;
-import static
org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.fullPublishResponseWithoutHeaders;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;
-
-import java.util.Map;
-import java.util.UUID;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.junit.Test;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.services.sns.model.PublishResponse;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-/** Tests for {@link PublishResponseCoders}. */
-public class PublishResponseCodersTest {
-
- @Test
- public void testDefaultPublishResponseDecodeEncodeEquals() throws Exception {
- CoderProperties.coderDecodeEncodeEqual(
- defaultPublishResponse(),
-
PublishResponse.builder().messageId(UUID.randomUUID().toString()).build());
- }
-
- @Test
- public void testFullPublishResponseWithoutHeadersDecodeEncodeEquals() throws
Exception {
- CoderProperties.coderDecodeEncodeEqual(
- fullPublishResponseWithoutHeaders(),
-
PublishResponse.builder().messageId(UUID.randomUUID().toString()).build());
-
- PublishResponse value = buildFullPublishResponse();
- PublishResponse clone =
CoderUtils.clone(fullPublishResponseWithoutHeaders(), value);
- assertThat(clone.responseMetadata().requestId(),
equalTo(value.responseMetadata().requestId()));
- assertThat(clone.sdkHttpResponse().statusCode(),
equalTo(value.sdkHttpResponse().statusCode()));
- assertThat(clone.sdkHttpResponse().headers().isEmpty(), equalTo(true));
- }
-
- @Test
- public void testFullPublishResponseIncludingHeadersDecodeEncodeEquals()
throws Exception {
- CoderProperties.coderDecodeEncodeEqual(
- PublishResponseCoders.fullPublishResponse(),
-
PublishResponse.builder().messageId(UUID.randomUUID().toString()).build());
-
- PublishResponse value = buildFullPublishResponse();
- PublishResponse clone =
CoderUtils.clone(PublishResponseCoders.fullPublishResponse(), value);
- assertThat(clone.responseMetadata().requestId(),
equalTo(value.responseMetadata().requestId()));
- assertThat(clone.sdkHttpResponse().statusCode(),
equalTo(value.sdkHttpResponse().statusCode()));
- assertThat(clone.sdkHttpResponse().headers(),
equalTo(value.sdkHttpResponse().headers()));
- }
-
- private PublishResponse buildFullPublishResponse() {
- return (PublishResponse)
- PublishResponse.builder()
- .messageId(UUID.randomUUID().toString())
- .responseMetadata(buildAwsResponseMetadata())
- .sdkHttpResponse(buildSdkHttpMetadata())
- .build();
- }
-
- private AwsResponseMetadata buildAwsResponseMetadata() {
- Map<String, String> metadata = ImmutableMap.of(AWS_REQUEST_ID,
UUID.randomUUID().toString());
- return new AwsResponseMetadata(metadata) {};
- }
-
- private SdkHttpResponse buildSdkHttpMetadata() {
- return SdkHttpResponse.builder()
- .statusCode(200)
- .appendHeader("Content-Type", "application/json")
- .build();
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java
index 1eb40569577..59fde7f532b 100644
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.io.aws2.sns;
-import static
org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.defaultPublishResponse;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -29,8 +27,6 @@ import static org.mockito.Mockito.when;
import java.io.Serializable;
import java.util.List;
import java.util.function.Consumer;
-import org.apache.beam.sdk.coders.DelegateCoder;
-import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.sns.SnsIO.Write;
import org.apache.beam.sdk.testing.PAssert;
@@ -137,41 +133,6 @@ public class SnsIOTest implements Serializable {
}
}
- @Test
- public void testWriteWithCustomCoder() {
- List<String> input = ImmutableList.of("message1");
-
- when(sns.publish(any(PublishRequest.class)))
- .thenReturn(PublishResponse.builder().messageId("id").build());
-
- // Mockito mocks cause NotSerializableException even with
withSettings().serializable()
- final CountingFn<PublishResponse> countingFn = new CountingFn<>();
-
- Write<String> snsWrite =
- SnsIO.<String>write()
- .withPublishRequestBuilder(msg -> requestBuilder(msg, topicArn))
- .withCoder(DelegateCoder.of(defaultPublishResponse(), countingFn,
x -> x));
-
- PCollection<PublishResponse> results =
p.apply(Create.of(input)).apply(snsWrite);
- PAssert.that(results.apply(Count.globally())).containsInAnyOrder(1L);
- p.run();
-
- assertThat(countingFn.count).isGreaterThan(0);
- for (String msg : input) {
- verify(sns).publish(requestBuilder(msg, topicArn).build());
- }
- }
-
- private static class CountingFn<T> implements CodingFunction<T, T> {
- int count;
-
- @Override
- public T apply(T input) throws Exception {
- count++;
- return input;
- }
- }
-
private static PublishRequest.Builder requestBuilder(String msg, String
topic) {
return PublishRequest.builder().message(msg).topicArn(topic);
}