piotr-szuberski commented on a change in pull request #12422: URL: https://github.com/apache/beam/pull/12422#discussion_r463559333
##########
File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java
##########
@@ -34,34 +35,65 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.localstack.LocalStackContainer;
 /**
  * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
  * KinesisTestOptions} in order to run this.
  */
 @RunWith(JUnit4.class)
 public class KinesisIOIT implements Serializable {
-  private static int numberOfShards;
-  private static int numberOfRows;
+  private static final String STREAM_NAME = "beam_kinesis";
+  private static final int NUM_RECORDS = 1000;
   @Rule public TestPipeline pipelineWrite = TestPipeline.create();
   @Rule public TestPipeline pipelineRead = TestPipeline.create();
-  private static KinesisTestOptions options;
-  private static final Instant now = Instant.now();
+  static {
+    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
+    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
+  }
+
+  private final LocalStackContainer localstackContainer =
+      new LocalStackContainer("0.11.3")
+          .withServices(LocalStackContainer.Service.KINESIS)
+          .withEnv("USE_SSL", "true")
+          .withStartupAttempts(3);
+
+  private String endpoint;
+  private String region;
+  private String accessKey;
+  private String secretKey;
+
+  @Before
+  public void setup() throws Exception {
+    localstackContainer.start();
+    endpoint =
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getServiceEndpoint()
+            .replace("http", "https");
+    region =
+        localstackContainer
+            .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
+            .getSigningRegion();
+    accessKey =
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId();
+    secretKey =
+        localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey();
+
+    createStream();
+  }
-  @BeforeClass
-  public static void setup() {
-    PipelineOptionsFactory.register(KinesisTestOptions.class);
-    options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-    numberOfShards = options.getNumberOfShards();
-    numberOfRows = options.getNumberOfRecords();

Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org