This is an automated email from the ASF dual-hosted git repository.

mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 48cac9b09a3 Remove deprecated AWS coders where replaced by schemas (closes #23315) (#26715)
48cac9b09a3 is described below

commit 48cac9b09a30bc32b9935b728af6ad3a79bb4435
Author: Moritz Mack <[email protected]>
AuthorDate: Tue May 16 20:01:17 2023 +0200

    Remove deprecated AWS coders where replaced by schemas (closes #23315) (#26715)
---
 CHANGES.md                                         |   1 +
 .../apache/beam/sdk/io/aws2/coders/AwsCoders.java  | 142 ---------------------
 .../beam/sdk/io/aws2/coders/package-info.java      |  19 ---
 .../sdk/io/aws2/sns/PublishResponseCoders.java     | 134 -------------------
 .../org/apache/beam/sdk/io/aws2/sns/SnsIO.java     |  53 +-------
 .../beam/sdk/io/aws2/coders/AwsCodersTest.java     |  69 ----------
 .../sdk/io/aws2/sns/PublishResponseCodersTest.java |  92 -------------
 .../org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java |  39 ------
 8 files changed, 2 insertions(+), 547 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 4c0cb3ce3c2..5774be7d646 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -79,6 +79,7 @@
 * CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)).
 * AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)).
 * AWS 2 SnsIO.writeAsync (deprecated in Beam v2.37.0 due to risk of data loss) was finally removed ([#26710](https://github.com/apache/beam/issues/26710)).
+* AWS 2 coders (deprecated in Beam v2.43.0 when adding Schema support for AWS Sdk Pojos) are finally removed ([#23315](https://github.com/apache/beam/issues/23315)).
 
 ## Deprecations
 
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java
deleted file mode 100644
index a15818875c2..00000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.coders;
-
-import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-/**
- * {@link Coder}s for common AWS SDK objects.
- *
- * @deprecated {@link org.apache.beam.sdk.schemas.SchemaCoder SchemaCoders} 
for {@link
- *     software.amazon.awssdk.core.SdkPojo AWS model classes} will be 
automatically inferred by
- *     means of {@link org.apache.beam.sdk.io.aws2.schemas.AwsSchemaProvider 
AwsSchemaProvider}.
- */
-@Deprecated
-public final class AwsCoders {
-
-  private AwsCoders() {}
-
-  /**
-   * Returns a new coder for {@link AwsResponseMetadata} (AWS request ID only).
-   *
-   * @return the {@link AwsResponseMetadata} coder
-   */
-  public static Coder<AwsResponseMetadata> awsResponseMetadata() {
-    return new AwsResponseMetadataCoder();
-  }
-
-  /**
-   * Returns a new coder for {@link SdkHttpResponse} (HTTP status code and 
headers).
-   *
-   * @return the SdkHttpResponse coder
-   */
-  public static Coder<SdkHttpResponse> sdkHttpResponse() {
-    return new SdkHttpResponseCoder(true);
-  }
-
-  /**
-   * Returns a new coder for {@link SdkHttpResponse} (HTTP status code only).
-   *
-   * @return the SdkHttpResponse coder
-   */
-  public static Coder<SdkHttpResponse> sdkHttpResponseWithoutHeaders() {
-    return new SdkHttpResponseCoder(false);
-  }
-
-  private static class AwsResponseMetadataCoder extends 
AtomicCoder<AwsResponseMetadata> {
-    private static final Coder<String> REQUEST_ID_CODER = StringUtf8Coder.of();
-
-    private static class DecodedAwsResponseMetadata extends 
AwsResponseMetadata {
-      protected DecodedAwsResponseMetadata(String requestId) {
-        super(ImmutableMap.of(AWS_REQUEST_ID, requestId));
-      }
-    }
-
-    @Override
-    public void encode(AwsResponseMetadata value, OutputStream outStream)
-        throws CoderException, IOException {
-      REQUEST_ID_CODER.encode(value.requestId(), outStream);
-    }
-
-    @Override
-    public AwsResponseMetadata decode(InputStream inStream) throws 
CoderException, IOException {
-      return new DecodedAwsResponseMetadata(REQUEST_ID_CODER.decode(inStream));
-    }
-  }
-
-  private static class SdkHttpResponseCoder extends 
CustomCoder<SdkHttpResponse> {
-    private static final Coder<Integer> STATUS_CODE_CODER = VarIntCoder.of();
-    private static final Coder<Map<String, List<String>>> HEADERS_ENCODER =
-        NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), 
ListCoder.of(StringUtf8Coder.of())));
-
-    private final boolean includeHeaders;
-
-    protected SdkHttpResponseCoder(boolean includeHeaders) {
-      this.includeHeaders = includeHeaders;
-    }
-
-    @Override
-    public void encode(SdkHttpResponse value, OutputStream outStream)
-        throws CoderException, IOException {
-      STATUS_CODE_CODER.encode(value.statusCode(), outStream);
-      if (includeHeaders) {
-        HEADERS_ENCODER.encode(value.headers(), outStream);
-      }
-    }
-
-    @Override
-    public SdkHttpResponse decode(InputStream inStream) throws CoderException, 
IOException {
-      SdkHttpResponse.Builder httpResponseBuilder =
-          
SdkHttpResponse.builder().statusCode(STATUS_CODE_CODER.decode(inStream));
-
-      if (includeHeaders) {
-        Map<String, List<String>> headers = HEADERS_ENCODER.decode(inStream);
-        if (headers != null) {
-          httpResponseBuilder.headers(headers);
-        }
-      }
-      return httpResponseBuilder.build();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      STATUS_CODE_CODER.verifyDeterministic();
-      if (includeHeaders) {
-        HEADERS_ENCODER.verifyDeterministic();
-      }
-    }
-  }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java
deleted file mode 100644
index e23a1a5e763..00000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/** Defines common coders for Amazon Web Services. */
-package org.apache.beam.sdk.io.aws2.coders;
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java
deleted file mode 100644
index 57acd46f615..00000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import static org.apache.beam.sdk.io.aws2.coders.AwsCoders.sdkHttpResponse;
-import static 
org.apache.beam.sdk.io.aws2.coders.AwsCoders.sdkHttpResponseWithoutHeaders;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.aws2.coders.AwsCoders;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.services.sns.model.PublishResponse;
-
-/**
- * Coders for SNS {@link PublishResponse}.
- *
- * @deprecated Schema based coder is inferred automatically.
- */
-@Deprecated
-public class PublishResponseCoders {
-  private static final Coder<String> MESSAGE_ID_CODER = StringUtf8Coder.of();
-  private static final NullableCoder<AwsResponseMetadata> METADATA_CODER =
-      NullableCoder.of(AwsCoders.awsResponseMetadata());
-
-  private PublishResponseCoders() {}
-
-  /**
-   * Returns a new SNS {@link PublishResponse} coder which by default 
serializes only the SNS
-   * messageId.
-   *
-   * @return the {@link PublishResponse} coder
-   */
-  public static Coder<PublishResponse> defaultPublishResponse() {
-    return new PublishResponseCoder(null, null);
-  }
-
-  /**
-   * Returns a new SNS {@link PublishResponse} coder which serializes {@link 
AwsResponseMetadata}
-   * and {@link SdkHttpResponse}, including the HTTP response headers.
-   *
-   * @return the {@link PublishResponse} coder
-   */
-  public static Coder<PublishResponse> fullPublishResponse() {
-    return new PublishResponseCoder(METADATA_CODER, 
NullableCoder.of(sdkHttpResponse()));
-  }
-
-  /**
-   * Returns a new SNS {@link PublishResponse} coder which serializes {@link 
AwsResponseMetadata}
-   * and {@link SdkHttpResponse}, but not including the HTTP response headers.
-   *
-   * @return the {@link PublishResponse} coder
-   */
-  public static Coder<PublishResponse> fullPublishResponseWithoutHeaders() {
-    return new PublishResponseCoder(
-        METADATA_CODER, NullableCoder.of(sdkHttpResponseWithoutHeaders()));
-  }
-
-  private static class PublishResponseCoder extends 
CustomCoder<PublishResponse> {
-    private final @Nullable NullableCoder<AwsResponseMetadata> metadataCoder;
-    private final @Nullable NullableCoder<SdkHttpResponse> httpResponseCoder;
-
-    private PublishResponseCoder(
-        @Nullable NullableCoder<AwsResponseMetadata> responseMetadataEncoder,
-        @Nullable NullableCoder<SdkHttpResponse> sdkHttpMetadataCoder) {
-      this.metadataCoder = responseMetadataEncoder;
-      this.httpResponseCoder = sdkHttpMetadataCoder;
-    }
-
-    @Override
-    public void encode(PublishResponse value, OutputStream outStream)
-        throws CoderException, IOException {
-      MESSAGE_ID_CODER.encode(value.messageId(), outStream);
-      if (metadataCoder != null) {
-        metadataCoder.encode(value.responseMetadata(), outStream);
-      }
-      if (httpResponseCoder != null) {
-        httpResponseCoder.encode(value.sdkHttpResponse(), outStream);
-      }
-    }
-
-    @Override
-    public PublishResponse decode(InputStream inStream) throws CoderException, 
IOException {
-      PublishResponse.Builder responseBuilder =
-          
PublishResponse.builder().messageId(MESSAGE_ID_CODER.decode(inStream));
-      if (metadataCoder != null) {
-        AwsResponseMetadata metadata = metadataCoder.decode(inStream);
-        if (metadata != null) {
-          responseBuilder.responseMetadata(metadata);
-        }
-      }
-      if (httpResponseCoder != null) {
-        SdkHttpResponse httpResponse = httpResponseCoder.decode(inStream);
-        if (httpResponse != null) {
-          responseBuilder.sdkHttpResponse(httpResponse);
-        }
-      }
-      return responseBuilder.build();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      MESSAGE_ID_CODER.verifyDeterministic();
-      if (metadataCoder != null) {
-        metadataCoder.verifyDeterministic();
-      }
-      if (httpResponseCoder != null) {
-        httpResponseCoder.verifyDeterministic();
-      }
-    }
-  }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
index 7b9982517b8..182b7dbb388 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
@@ -21,11 +21,9 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import com.google.auto.value.AutoValue;
 import java.util.function.Consumer;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
 import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
 import org.apache.beam.sdk.io.aws2.options.AwsOptions;
-import org.apache.beam.sdk.io.aws2.schemas.AwsSchemaProvider;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -38,9 +36,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
 import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.http.SdkHttpResponse;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.sns.SnsClient;
 import software.amazon.awssdk.services.sns.model.InvalidParameterException;
@@ -69,12 +65,6 @@ import 
software.amazon.awssdk.services.sns.model.PublishResponse;
  *   <li>Request builder function to create SNS publish requests from your 
input
  * </ul>
  *
- * <p>By default, the output {@link PublishResponse} contains only the SNS 
messageId, all other
- * fields are null. If you need to include the full {@link SdkHttpResponse} 
and {@link
- * AwsResponseMetadata}, you can call {@link Write#withFullPublishResponse()}. 
If you need the HTTP
- * status code only but no headers, you can use {@link
- * Write#withFullPublishResponseWithoutHeaders()}.
- *
  * <h3>Configuration of AWS clients</h3>
  *
  * <p>AWS clients for all AWS IOs can be configured using {@link AwsOptions}, 
e.g. {@code
@@ -121,8 +111,6 @@ public final class SnsIO {
 
     abstract @Nullable SerializableFunction<T, PublishRequest.Builder> 
getPublishRequestBuilder();
 
-    abstract @Nullable Coder<PublishResponse> getCoder();
-
     abstract Builder<T> builder();
 
     @AutoValue.Builder
@@ -135,8 +123,6 @@ public final class SnsIO {
       abstract Builder<T> setPublishRequestBuilder(
           SerializableFunction<T, PublishRequest.Builder> requestBuilder);
 
-      abstract Builder<T> setCoder(Coder<PublishResponse> coder);
-
       abstract Write<T> build();
     }
 
@@ -177,39 +163,6 @@ public final class SnsIO {
       return builder().setClientConfiguration(config).build();
     }
 
-    /**
-     * Encode the full {@link PublishResponse} object, including 
sdkResponseMetadata and
-     * sdkHttpMetadata with the HTTP response headers.
-     *
-     * @deprecated Writes fail exceptionally in case of errors, there is no 
need to check headers.
-     */
-    @Deprecated
-    public Write<T> withFullPublishResponse() {
-      return withCoder(PublishResponseCoders.fullPublishResponse());
-    }
-
-    /**
-     * Encode the full {@link PublishResponse} object, including 
sdkResponseMetadata and
-     * sdkHttpMetadata but excluding the HTTP response headers.
-     *
-     * @deprecated Writes fail exceptionally in case of errors, there is no 
need to check headers.
-     */
-    @Deprecated
-    public Write<T> withFullPublishResponseWithoutHeaders() {
-      return 
withCoder(PublishResponseCoders.fullPublishResponseWithoutHeaders());
-    }
-
-    /**
-     * Encode the {@link PublishResponse} with the given coder.
-     *
-     * @deprecated Explicit usage of coders is deprecated. Inferred schemas 
provided by {@link
-     *     AwsSchemaProvider} will be used instead.
-     */
-    @Deprecated
-    public Write<T> withCoder(Coder<PublishResponse> coder) {
-      return builder().setCoder(coder).build();
-    }
-
     @Override
     public PCollection<PublishResponse> expand(PCollection<T> input) {
       checkArgument(getPublishRequestBuilder() != null, 
"withPublishRequestBuilder() is required");
@@ -221,11 +174,7 @@ public final class SnsIO {
         checkArgument(checkTopicExists(awsOptions), "Topic arn %s does not 
exist", getTopicArn());
       }
 
-      PCollection<PublishResponse> result = input.apply(ParDo.of(new 
SnsWriterFn<>(this)));
-      if (getCoder() != null) {
-        result.setCoder(getCoder());
-      }
-      return result;
+      return input.apply(ParDo.of(new SnsWriterFn<>(this)));
     }
 
     private boolean checkTopicExists(AwsOptions options) {
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java
deleted file mode 100644
index dd5ea454c34..00000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.coders;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;
-
-import java.util.Map;
-import java.util.UUID;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.junit.Test;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-/** Tests for {@link AwsCoders}. */
-public class AwsCodersTest {
-
-  @Test
-  public void testAwsResponseMetadataDecodeEncodeEquals() throws Exception {
-    AwsResponseMetadata value = buildAwsResponseMetadata();
-    AwsResponseMetadata clone = 
CoderUtils.clone(AwsCoders.awsResponseMetadata(), value);
-    assertThat(clone.requestId(), equalTo(value.requestId()));
-  }
-
-  @Test
-  public void testSdkHttpMetadataDecodeEncodeEquals() throws Exception {
-    SdkHttpResponse value = buildSdkHttpMetadata();
-    SdkHttpResponse clone = CoderUtils.clone(AwsCoders.sdkHttpResponse(), 
value);
-    assertThat(clone.statusCode(), equalTo(value.statusCode()));
-    assertThat(clone.headers(), equalTo(value.headers()));
-  }
-
-  @Test
-  public void testSdkHttpMetadataWithoutHeadersDecodeEncodeEquals() throws 
Exception {
-    SdkHttpResponse value = buildSdkHttpMetadata();
-    SdkHttpResponse clone = 
CoderUtils.clone(AwsCoders.sdkHttpResponseWithoutHeaders(), value);
-    assertThat(clone.statusCode(), equalTo(value.statusCode()));
-    assertThat(clone.headers().isEmpty(), equalTo(true));
-  }
-
-  private AwsResponseMetadata buildAwsResponseMetadata() {
-    Map<String, String> metadata = ImmutableMap.of(AWS_REQUEST_ID, 
UUID.randomUUID().toString());
-    return new AwsResponseMetadata(metadata) {};
-  }
-
-  private SdkHttpResponse buildSdkHttpMetadata() {
-    return SdkHttpResponse.builder()
-        .statusCode(200)
-        .appendHeader("Content-Type", "application/json")
-        .build();
-  }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java
deleted file mode 100644
index 4e21f885b21..00000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import static 
org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.defaultPublishResponse;
-import static 
org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.fullPublishResponseWithoutHeaders;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;
-
-import java.util.Map;
-import java.util.UUID;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.junit.Test;
-import software.amazon.awssdk.awscore.AwsResponseMetadata;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.services.sns.model.PublishResponse;
-import software.amazon.awssdk.utils.ImmutableMap;
-
-/** Tests for {@link PublishResponseCoders}. */
-public class PublishResponseCodersTest {
-
-  @Test
-  public void testDefaultPublishResponseDecodeEncodeEquals() throws Exception {
-    CoderProperties.coderDecodeEncodeEqual(
-        defaultPublishResponse(),
-        
PublishResponse.builder().messageId(UUID.randomUUID().toString()).build());
-  }
-
-  @Test
-  public void testFullPublishResponseWithoutHeadersDecodeEncodeEquals() throws 
Exception {
-    CoderProperties.coderDecodeEncodeEqual(
-        fullPublishResponseWithoutHeaders(),
-        
PublishResponse.builder().messageId(UUID.randomUUID().toString()).build());
-
-    PublishResponse value = buildFullPublishResponse();
-    PublishResponse clone = 
CoderUtils.clone(fullPublishResponseWithoutHeaders(), value);
-    assertThat(clone.responseMetadata().requestId(), 
equalTo(value.responseMetadata().requestId()));
-    assertThat(clone.sdkHttpResponse().statusCode(), 
equalTo(value.sdkHttpResponse().statusCode()));
-    assertThat(clone.sdkHttpResponse().headers().isEmpty(), equalTo(true));
-  }
-
-  @Test
-  public void testFullPublishResponseIncludingHeadersDecodeEncodeEquals() 
throws Exception {
-    CoderProperties.coderDecodeEncodeEqual(
-        PublishResponseCoders.fullPublishResponse(),
-        
PublishResponse.builder().messageId(UUID.randomUUID().toString()).build());
-
-    PublishResponse value = buildFullPublishResponse();
-    PublishResponse clone = 
CoderUtils.clone(PublishResponseCoders.fullPublishResponse(), value);
-    assertThat(clone.responseMetadata().requestId(), 
equalTo(value.responseMetadata().requestId()));
-    assertThat(clone.sdkHttpResponse().statusCode(), 
equalTo(value.sdkHttpResponse().statusCode()));
-    assertThat(clone.sdkHttpResponse().headers(), 
equalTo(value.sdkHttpResponse().headers()));
-  }
-
-  private PublishResponse buildFullPublishResponse() {
-    return (PublishResponse)
-        PublishResponse.builder()
-            .messageId(UUID.randomUUID().toString())
-            .responseMetadata(buildAwsResponseMetadata())
-            .sdkHttpResponse(buildSdkHttpMetadata())
-            .build();
-  }
-
-  private AwsResponseMetadata buildAwsResponseMetadata() {
-    Map<String, String> metadata = ImmutableMap.of(AWS_REQUEST_ID, 
UUID.randomUUID().toString());
-    return new AwsResponseMetadata(metadata) {};
-  }
-
-  private SdkHttpResponse buildSdkHttpMetadata() {
-    return SdkHttpResponse.builder()
-        .statusCode(200)
-        .appendHeader("Content-Type", "application/json")
-        .build();
-  }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java
index 1eb40569577..59fde7f532b 100644
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.io.aws2.sns;
 
-import static 
org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.defaultPublishResponse;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -29,8 +27,6 @@ import static org.mockito.Mockito.when;
 import java.io.Serializable;
 import java.util.List;
 import java.util.function.Consumer;
-import org.apache.beam.sdk.coders.DelegateCoder;
-import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
 import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
 import org.apache.beam.sdk.io.aws2.sns.SnsIO.Write;
 import org.apache.beam.sdk.testing.PAssert;
@@ -137,41 +133,6 @@ public class SnsIOTest implements Serializable {
     }
   }
 
-  @Test
-  public void testWriteWithCustomCoder() {
-    List<String> input = ImmutableList.of("message1");
-
-    when(sns.publish(any(PublishRequest.class)))
-        .thenReturn(PublishResponse.builder().messageId("id").build());
-
-    // Mockito mocks cause NotSerializableException even with 
withSettings().serializable()
-    final CountingFn<PublishResponse> countingFn = new CountingFn<>();
-
-    Write<String> snsWrite =
-        SnsIO.<String>write()
-            .withPublishRequestBuilder(msg -> requestBuilder(msg, topicArn))
-            .withCoder(DelegateCoder.of(defaultPublishResponse(), countingFn, 
x -> x));
-
-    PCollection<PublishResponse> results = 
p.apply(Create.of(input)).apply(snsWrite);
-    PAssert.that(results.apply(Count.globally())).containsInAnyOrder(1L);
-    p.run();
-
-    assertThat(countingFn.count).isGreaterThan(0);
-    for (String msg : input) {
-      verify(sns).publish(requestBuilder(msg, topicArn).build());
-    }
-  }
-
-  private static class CountingFn<T> implements CodingFunction<T, T> {
-    int count;
-
-    @Override
-    public T apply(T input) throws Exception {
-      count++;
-      return input;
-    }
-  }
-
   private static PublishRequest.Builder requestBuilder(String msg, String 
topic) {
     return PublishRequest.builder().message(msg).topicArn(topic);
   }

Reply via email to