This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c7117a3 Inject AmazonS3 into S3ManagedLedgerOffloader (#1755) c7117a3 is described below commit c7117a3f045d8f33beca20de926d6641e99424e8 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Thu May 10 08:14:52 2018 +0200 Inject AmazonS3 into S3ManagedLedgerOffloader (#1755) Rather than creating it directly in the offloader, inject the AmazonS3 object so that it can be mocked for testing. Master Issue: #1511 --- .../org/apache/pulsar/broker/PulsarService.java | 2 +- .../broker/s3offload/S3ManagedLedgerOffloader.java | 14 ++++++++---- .../s3offload/S3ManagedLedgerOffloaderTest.java | 26 +++++++++------------- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 7b975f3..5f6dbca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -653,7 +653,7 @@ public class PulsarService implements AutoCloseable { throws PulsarServerException { if (conf.getManagedLedgerOffloadDriver() != null && conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME)) { - return new S3ManagedLedgerOffloader(conf, getOffloaderScheduler(conf)); + return S3ManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf)); } else { return NullLedgerOffloader.INSTANCE; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index c11fb3d..163b79b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -40,8 +40,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { private final AmazonS3 s3client; private final String bucket; - public S3ManagedLedgerOffloader(ServiceConfiguration conf, - ScheduledExecutorService scheduler) + public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, + ScheduledExecutorService scheduler) throws PulsarServerException { String region = conf.getS3ManagedLedgerOffloadRegion(); String bucket = conf.getS3ManagedLedgerOffloadBucket(); @@ -53,12 +53,18 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty is s3 offload enabled"); } - AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard().withRegion(region); + AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); if (!Strings.isNullOrEmpty(endpoint)) { builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region)); builder.setPathStyleAccessEnabled(true); + } else { + builder.setRegion(region); } - s3client = builder.build(); + return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler); + } + + S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, ScheduledExecutorService scheduler) { + this.s3client = s3client; this.bucket = bucket; this.scheduler = scheduler; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java index 4291a2c..deeacd6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java @@ -51,7 +51,8 @@ class S3ManagedLedgerOffloaderTest { final ScheduledExecutorService scheduler; final MockBookKeeper bk; S3Mock s3mock = null; - String endpoint = null; + AmazonS3 s3client = null; + String s3endpoint = null; final static String REGION = "foobar"; final static String BUCKET = "foobar"; @@ -65,13 +66,13 @@ class S3ManagedLedgerOffloaderTest { public void start() throws Exception { s3mock = new S3Mock.Builder().withPort(0).withInMemoryBackend().build(); int port = s3mock.start().localAddress().getPort(); - endpoint = "http://localhost:" + port; + s3endpoint = "http://localhost:" + port; - AmazonS3 client = AmazonS3ClientBuilder.standard() + s3client = AmazonS3ClientBuilder.standard() .withRegion(REGION) - .withEndpointConfiguration(new EndpointConfiguration(endpoint, REGION)) + .withEndpointConfiguration(new EndpointConfiguration(s3endpoint, REGION)) .withPathStyleAccessEnabled(true).build(); - client.createBucket(BUCKET); + s3client.createBucket(BUCKET); } @AfterMethod @@ -93,12 +94,7 @@ class S3ManagedLedgerOffloaderTest { @Test public void testHappyCase() throws Exception { - ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); - conf.setS3ManagedLedgerOffloadBucket(BUCKET); - conf.setS3ManagedLedgerOffloadRegion(REGION); - conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler); offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); } @@ -108,9 +104,9 @@ class S3ManagedLedgerOffloaderTest { ServiceConfiguration conf = new ServiceConfiguration(); conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); conf.setS3ManagedLedgerOffloadBucket("no-bucket"); + conf.setS3ManagedLedgerOffloadServiceEndpoint(s3endpoint); conf.setS3ManagedLedgerOffloadRegion(REGION); - conf.setS3ManagedLedgerOffloadServiceEndpoint(endpoint); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(conf, scheduler); + LedgerOffloader offloader = S3ManagedLedgerOffloader.create(conf, scheduler); try { offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); @@ -127,7 +123,7 @@ class S3ManagedLedgerOffloaderTest { conf.setS3ManagedLedgerOffloadBucket(BUCKET); try { - new S3ManagedLedgerOffloader(conf, scheduler); + S3ManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct @@ -141,7 +137,7 @@ class S3ManagedLedgerOffloaderTest { conf.setS3ManagedLedgerOffloadRegion(REGION); try { - new S3ManagedLedgerOffloader(conf, scheduler); + S3ManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct -- To stop receiving notification emails like this one, please contact si...@apache.org.