This is an automated email from the ASF dual-hosted git repository. krisztiankasa pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ambari-infra.git
The following commit(s) were added to refs/heads/master by this push: new 3e941c5 AMBARI-24731 - Infra Manager: scheduled cleanup of old jobs (#4) 3e941c5 is described below commit 3e941c5e3442ef7a5c7cbdc61b987e4654224827 Author: kasakrisz <33458261+kasakr...@users.noreply.github.com> AuthorDate: Fri Oct 12 10:05:52 2018 +0200 AMBARI-24731 - Infra Manager: scheduled cleanup of old jobs (#4) --- .gitignore | 1 + ambari-infra-manager-it/pom.xml | 6 +- .../java/org/apache/ambari/infra/S3Client.java | 99 ++++++++++++++++ .../test/java/org/apache/ambari/infra/Solr.java | 2 +- .../ambari/infra/steps/AbstractInfraSteps.java | 29 +++-- .../apache/ambari/infra/steps/ExportJobsSteps.java | 57 +++------ .../test/resources/stories/infra_api_tests.story | 6 +- ambari-infra-manager/.gitignore | 3 +- ambari-infra-manager/docker/docker-compose.yml | 7 +- .../docker/infra-manager-docker-compose.sh | 4 +- ambari-infra-manager/pom.xml | 11 +- .../DurationConverter.java} | 36 ++---- .../infra/job/AbstractJobsConfiguration.java | 13 ++- .../ambari/infra/job/InfraJobExecutionDao.java | 75 ++++++++++++ .../org/apache/ambari/infra/job/JobProperties.java | 30 +---- ...bsPropertyMap.java => JobPropertiesHolder.java} | 25 ++-- .../org/apache/ambari/infra/job/JobScheduler.java | 21 ++-- .../apache/ambari/infra/job/JobsPropertyMap.java | 35 +++--- .../ambari/infra/job/SchedulingProperties.java | 9 -- ...{SchedulingProperties.java => Validatable.java} | 30 +---- ...ingProperties.java => ArchivingParameters.java} | 97 ++++++++-------- .../archive/DocumentArchivingConfiguration.java | 50 ++++---- .../job/archive/DocumentArchivingProperties.java | 98 ++++++---------- .../infra/job/archive/FileNameSuffixFormatter.java | 8 +- .../ambari/infra/job/archive/S3Uploader.java | 70 +++++++---- .../apache/ambari/infra/job/archive/SolrDAO.java | 10 +- .../{SolrProperties.java => SolrParameters.java} | 26 +---- .../ambari/infra/job/archive/SolrProperties.java | 36 ++---- .../ambari/infra/job/archive/SolrQueryBuilder.java | 22 +++- .../infra/job/cleanup/CleanUpConfiguration.java | 79 +++++++++++++ .../CleanUpParameters.java} | 39 +++---- .../infra/job/cleanup/CleanUpProperties.java | 55 +++++++++ .../TaskHistoryWiper.java} | 41 ++++--- ...tingProperties.java => DeletingParameters.java} | 50 +++++--- .../deleting/DocumentDeletingConfiguration.java | 14 +-- .../job/deleting/DocumentDeletingProperties.java | 43 +++---- .../infra/job/deleting/DocumentWiperTasklet.java | 19 +-- .../DurationToStringConverter.java} | 33 ++---- .../StringToDurationConverter.java} | 33 ++---- .../src/main/resources/infra-manager.properties | 8 +- .../org/apache/ambari/infra/env/TestAppConfig.java | 128 +++++++++++++++++++++ .../ambari/infra/job/InfraJobExecutionDAOIT.java | 99 ++++++++++++++++ .../apache/ambari/infra/job/JobPropertiesTest.java | 56 --------- .../infra/job/archive/SolrPropertiesTest.java | 22 ++-- .../infra/job/archive/SolrQueryBuilderTest.java | 38 +++++- ambari-infra-solr-plugin/pom.xml | 5 + pom.xml | 7 +- 47 files changed, 1042 insertions(+), 643 deletions(-) diff --git a/.gitignore b/.gitignore index 9c77d11..2dcdfea 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ derby.log pass.txt out +job-repository.db \ No newline at end of file diff --git a/ambari-infra-manager-it/pom.xml b/ambari-infra-manager-it/pom.xml index 2fac6a9..68d4352 100644 --- a/ambari-infra-manager-it/pom.xml +++ b/ambari-infra-manager-it/pom.xml @@ -45,9 +45,9 @@ <version>${solr.version}</version> </dependency> <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> - <version>1.11.5</version> + <groupId>io.minio</groupId> + <artifactId>minio</artifactId> + <version>5.0.1</version> </dependency> <dependency> <groupId>commons-io</groupId> diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java new file mode 100644 index 0000000..f0b592d --- /dev/null +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; + +import io.minio.MinioClient; +import io.minio.Result; +import io.minio.messages.Item; + +public class S3Client { + private final MinioClient s3client; + private final String bucket; + + public S3Client(String host, int port, String bucket) { + try { + s3client = new MinioClient(String.format("http://%s:%d", host, port), "remote-identity", "remote-credential"); + this.bucket = bucket; + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public void putObject(String key, InputStream inputStream, long length) { + try { + s3client.putObject(bucket, key, inputStream, length, "application/octet-stream"); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public void putObject(String key, byte[] bytes) { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream("anything".getBytes())) { + putObject(key, inputStream, bytes.length); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public List<String> listObjectKeys() { + try { + List<String> keys = new ArrayList<>(); + for (Result<Item> item : s3client.listObjects(bucket)) { + keys.add(item.get().objectName()); + } + return keys; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public List<String> listObjectKeys(String text) { + try { + List<String> keys = new ArrayList<>(); + for (Result<Item> item : s3client.listObjects(bucket)) { + String objectName = item.get().objectName(); + if (objectName.contains(text)) + keys.add(objectName); + } + return keys; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void deleteObject(String key) { + try { + s3client.removeObject(bucket, key); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java index 1ffdb2a..0dcc91a 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java @@ -55,7 +55,7 @@ public class Solr { public Solr(String configSetPath) { this.configSetPath = configSetPath; - this.solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica1", + this.solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica_n1", getDockerHost(), SOLR_PORT, AUDIT_LOGS_COLLECTION)).build(); diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java index f219ce5..da962b9 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java @@ -29,8 +29,10 @@ import java.io.IOException; import java.net.URL; import java.time.OffsetDateTime; import java.util.Date; +import java.util.List; import org.apache.ambari.infra.InfraClient; +import org.apache.ambari.infra.S3Client; import org.apache.ambari.infra.Solr; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -44,11 +46,6 @@ import org.jbehave.core.annotations.BeforeStories; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; - public abstract class AbstractInfraSteps { private static final Logger LOG = LoggerFactory.getLogger(AbstractInfraSteps.class); @@ -59,7 +56,7 @@ public abstract class AbstractInfraSteps { private String ambariFolder; private String shellScriptLocation; private String dockerHost; - private AmazonS3Client s3client; + private S3Client s3client; private int documentId = 0; private Solr solr; @@ -71,7 +68,7 @@ public abstract class AbstractInfraSteps { return solr; } - public AmazonS3Client getS3client() { + public S3Client getS3client() { return s3client; } @@ -86,8 +83,11 @@ public abstract class AbstractInfraSteps { URL location = AbstractInfraSteps.class.getProtectionDomain().getCodeSource().getLocation(); ambariFolder = new File(location.toURI()).getParentFile().getParentFile().getParentFile().getParent(); - LOG.info("Clean local data folder {}", getLocalDataFolder()); - FileUtils.cleanDirectory(new File(getLocalDataFolder())); + String localDataFolder = getLocalDataFolder(); + if (new File(localDataFolder).exists()) { + LOG.info("Clean local data folder {}", localDataFolder); + FileUtils.cleanDirectory(new File(localDataFolder)); + } shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh"; LOG.info("Create new docker container for testing Ambari Infra Manager ..."); @@ -102,9 +102,7 @@ public abstract class AbstractInfraSteps { solr.createSolrCollection(HADOOP_LOGS_COLLECTION); LOG.info("Initializing s3 client"); - s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential")); - s3client.setEndpoint(String.format("http://%s:%d", dockerHost, FAKE_S3_PORT)); - s3client.createBucket(S3_BUCKET_NAME); + s3client = new S3Client(dockerHost, FAKE_S3_PORT, S3_BUCKET_NAME); checkInfraManagerReachable(); } @@ -155,10 +153,9 @@ public abstract class AbstractInfraSteps { @AfterStories public void shutdownContainers() throws Exception { Thread.sleep(2000); // sync with s3 server - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - ObjectListing objectListing = getS3client().listObjects(listObjectsRequest); - LOG.info("Found {} files on s3.", objectListing.getObjectSummaries().size()); - objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file on s3 with key {}", s3ObjectSummary.getKey())); + List<String> objectKeys = getS3client().listObjectKeys(); + LOG.info("Found {} files on s3.", objectKeys.size()); + objectKeys.forEach(objectKey -> LOG.info("Found file on s3 with key {}", objectKey)); LOG.info("Listing files on hdfs."); try (FileSystem fileSystem = getHdfs()) { diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java index d84c23f..b1d36d1 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java @@ -23,11 +23,9 @@ import static org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORM import static org.apache.ambari.infra.TestUtil.doWithin; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.junit.Assert.assertThat; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; @@ -35,10 +33,12 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.ambari.infra.InfraClient; import org.apache.ambari.infra.JobExecutionInfo; +import org.apache.ambari.infra.S3Client; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -50,11 +50,6 @@ import org.jbehave.core.annotations.When; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.ObjectMetadata; - public class ExportJobsSteps extends AbstractInfraSteps { private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class); @@ -80,9 +75,7 @@ public class ExportJobsSteps extends AbstractInfraSteps { @Given("a file on s3 with key $key") public void addFileToS3(String key) throws Exception { - try (ByteArrayInputStream inputStream = new ByteArrayInputStream("anything".getBytes())) { - getS3client().putObject(S3_BUCKET_NAME, key, inputStream, new ObjectMetadata()); - } + getS3client().putObject(key, "anything".getBytes()); } @When("start $jobName job") @@ -113,10 +106,8 @@ public class ExportJobsSteps extends AbstractInfraSteps { @When("stop job $jobName after at least $count file exists in s3 with filename containing text $text within $waitSec seconds") public void stopJob(String jobName, int count, String text, int waitSec) throws Exception { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) - && fileCountOnS3(text, s3Client, listObjectsRequest) > count); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() > count); try (InfraClient httpClient = getInfraClient()) { httpClient.stopJob(launchedJobs.get(jobName).getExecutionId()); @@ -125,40 +116,29 @@ public class ExportJobsSteps extends AbstractInfraSteps { @When("delete file with key $key from s3") public void deleteFileFromS3(String key) { - getS3client().deleteObject(S3_BUCKET_NAME, key); + getS3client().deleteObject(key); } @Then("Check filenames contains the text $text on s3 server after $waitSec seconds") public void checkS3After(String text, int waitSec) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) - && !s3Client.listObjects(listObjectsRequest).getObjectSummaries().isEmpty()); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> !s3Client.listObjectKeys().isEmpty()); - ObjectListing objectListing = s3Client.listObjects(listObjectsRequest); - assertThat(objectListing.getObjectSummaries(), hasItem(hasProperty("key", containsString(text)))); + List<String> objectKeys = s3Client.listObjectKeys(text); + assertThat(objectKeys, hasItem(containsString(text))); } @Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds") public void checkNumberOfFilesOnS3(long count, String text, int waitSec) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) - && fileCountOnS3(text, s3Client, listObjectsRequest) == count); - } - - private long fileCountOnS3(String text, AmazonS3Client s3Client, ListObjectsRequest listObjectsRequest) { - return s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream() - .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)) - .count(); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() == count); } @Then("Less than $count files exists on s3 server with filenames containing the text $text after $waitSec seconds") public void checkLessThanFileExistsOnS3(long count, String text, int waitSec) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) && between( - fileCountOnS3(text, s3Client, listObjectsRequest), 1L, count - 1L)); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> between( + s3Client.listObjectKeys(text).size(), 1L, count - 1L)); } private boolean between(long count, long from, long to) { @@ -167,10 +147,9 @@ public class ExportJobsSteps extends AbstractInfraSteps { @Then("No file exists on s3 server with filenames containing the text $text") public void fileNotExistOnS3(String text) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - assertThat(s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream() - .anyMatch(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)), is(false)); + S3Client s3Client = getS3client(); + assertThat(s3Client.listObjectKeys().stream() + .anyMatch(objectKey -> objectKey.contains(text)), is(false)); } @Then("solr contains $count documents between $startLogtime and $endLogtime") diff --git a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story index 122a634..876019f 100644 --- a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story +++ b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story @@ -59,9 +59,9 @@ And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02 Scenario: Launch Archiving job. Initiate stop and check that part of the data is archived. After restart all data must be extracted. -Given 200 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z +Given 500 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z When start archive_audit_logs job with parameters writeBlockSize=20,start=2014-03-09T05:00:00.000Z,end=2014-03-09T20:00:00.000Z after 2 seconds And stop job archive_audit_logs after at least 1 file exists in s3 with filename containing text solr_archive_audit_logs_-_2014-03-09 within 10 seconds -Then Less than 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds +Then Less than 20 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds When restart archive_audit_logs job within 10 seconds -Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds +Then Check 25 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds diff --git a/ambari-infra-manager/.gitignore b/ambari-infra-manager/.gitignore index 94b3829..4aece78 100644 --- a/ambari-infra-manager/.gitignore +++ b/ambari-infra-manager/.gitignore @@ -2,4 +2,5 @@ out/* *.pid Profile .env -test-out \ No newline at end of file +test-out +test.db \ No newline at end of file diff --git a/ambari-infra-manager/docker/docker-compose.yml b/ambari-infra-manager/docker/docker-compose.yml index c031cd7..3fa21b2 100644 --- a/ambari-infra-manager/docker/docker-compose.yml +++ b/ambari-infra-manager/docker/docker-compose.yml @@ -45,7 +45,7 @@ services: - "-z" - ${ZOOKEEPER_CONNECTION_STRING} volumes: - - $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-server/src/main/configsets:/opt/solr/configsets + - $AMBARI_INFRA_LOCATION/ambari-infra-manager/docker/configsets:/opt/solr/configsets fakes3: image: localstack/localstack hostname: fakes3 @@ -53,6 +53,7 @@ services: - "4569:4569" environment: - SERVICES=s3:4569 + - DEBUG=s3 networks: infra-network: aliases: @@ -96,8 +97,8 @@ services: ZK_CONNECT_STRING: ${ZOOKEEPER_CONNECTION_STRING} DISPLAY: $DOCKERIP:0 volumes: - - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/target/package:/root/ambari-infra-manager - - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/docker/test-out:/root/archive + - $AMBARI_INFRA_LOCATION/ambari-infra-manager/target/package:/root/ambari-infra-manager + - $AMBARI_INFRA_LOCATION/ambari-infra-manager/docker/test-out:/root/archive networks: infra-network: driver: bridge diff --git a/ambari-infra-manager/docker/infra-manager-docker-compose.sh b/ambari-infra-manager/docker/infra-manager-docker-compose.sh index 531440d..0c18e6f 100755 --- a/ambari-infra-manager/docker/infra-manager-docker-compose.sh +++ b/ambari-infra-manager/docker/infra-manager-docker-compose.sh @@ -61,13 +61,13 @@ function check_env_file() { function setup_env() { pushd $sdir/../../ - local AMBARI_LOCATION=$(pwd) + local AMBARI_INFRA_LOCATION=$(pwd) popd local docker_ip=$(get_docker_ip) cat << EOF > $sdir/.env DOCKERIP=$docker_ip MAVEN_REPOSITORY_LOCATION=$HOME/.m2 -AMBARI_LOCATION=$AMBARI_LOCATION +AMBARI_INFRA_LOCATION=$AMBARI_INFRA_LOCATION ZOOKEEPER_VERSION=3.4.10 ZOOKEEPER_CONNECTION_STRING=zookeeper:2181 diff --git a/ambari-infra-manager/pom.xml b/ambari-infra-manager/pom.xml index d25440f..f28f59d 100644 --- a/ambari-infra-manager/pom.xml +++ b/ambari-infra-manager/pom.xml @@ -134,7 +134,6 @@ <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> - <version>1.3</version> <scope>test</scope> </dependency> <!-- Spring dependencies --> @@ -445,11 +444,6 @@ <version>20.0</version> </dependency> <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> - <version>1.11.5</version> - </dependency> - <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-csv</artifactId> <version>1.5</version> @@ -460,6 +454,11 @@ <version>${spring-boot.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>io.minio</groupId> + <artifactId>minio</artifactId> + <version>5.0.1</version> + </dependency> </dependencies> </project> diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/DurationConverter.java similarity index 58% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/DurationConverter.java index af81b4f..60915de 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/DurationConverter.java @@ -16,34 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job; +package org.apache.ambari.infra.conf; -public class SchedulingProperties { - private boolean enabled = false; - private String cron; - private String intervalEndDelta; +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } +import java.time.Duration; - public String getCron() { - return cron; - } +import javax.inject.Named; - public void setCron(String cron) { - this.cron = cron; - } - - public String getIntervalEndDelta() { - return intervalEndDelta; - } +import org.springframework.boot.context.properties.ConfigurationPropertiesBinding; +import org.springframework.core.convert.converter.Converter; - public void setIntervalEndDelta(String intervalEndDelta) { - this.intervalEndDelta = intervalEndDelta; +@Named +@ConfigurationPropertiesBinding +public class DurationConverter implements Converter<String, Duration> { + @Override + public Duration convert(String s) { + return toDuration(s); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java index 02a6885..314e52e 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java @@ -18,6 +18,10 @@ */ package org.apache.ambari.infra.job; +import java.util.Map; + +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; @@ -27,18 +31,15 @@ import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; -import javax.annotation.PostConstruct; -import java.util.Map; - -public abstract class AbstractJobsConfiguration<T extends JobProperties<T>> { +public abstract class AbstractJobsConfiguration<TProperties extends JobProperties<TParameters>, TParameters extends Validatable> { private static final Logger LOG = LoggerFactory.getLogger(AbstractJobsConfiguration.class); - private final Map<String, T> propertyMap; + private final Map<String, TProperties> propertyMap; private final JobScheduler scheduler; private final JobBuilderFactory jobs; private final JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor; - protected AbstractJobsConfiguration(Map<String, T> propertyMap, JobScheduler scheduler, JobBuilderFactory jobs, JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) { + protected AbstractJobsConfiguration(Map<String, TProperties> propertyMap, JobScheduler scheduler, JobBuilderFactory jobs, JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) { this.propertyMap = propertyMap; this.scheduler = scheduler; this.jobs = jobs; diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/InfraJobExecutionDao.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/InfraJobExecutionDao.java new file mode 100644 index 0000000..903639c --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/InfraJobExecutionDao.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job; + +import java.time.OffsetDateTime; +import java.util.Date; + +import javax.inject.Inject; + +import org.springframework.batch.core.repository.dao.AbstractJdbcBatchMetadataDao; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.support.TransactionTemplate; + +@Repository +public class InfraJobExecutionDao extends AbstractJdbcBatchMetadataDao { + + private final TransactionTemplate transactionTemplate; + + @Inject + public InfraJobExecutionDao(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) { + setJdbcTemplate(jdbcTemplate); + this.transactionTemplate = transactionTemplate; + } + + public void deleteJobExecutions(OffsetDateTime olderThan) { + transactionTemplate.execute(transactionStatus -> { + Date olderThanDate = Date.from(olderThan.toInstant()); + deleteStepExecutionContexts(olderThanDate); + deleteStepExecutions(olderThanDate); + deleteJobExecutionParams(olderThanDate); + deleteJobExecutionContexts(olderThanDate); + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?"), olderThanDate); + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_INSTANCE WHERE JOB_INSTANCE_ID NOT IN (SELECT JOB_INSTANCE_ID FROM %PREFIX%JOB_EXECUTION)")); + return null; + }); + } + + private void deleteStepExecutionContexts(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%STEP_EXECUTION_CONTEXT WHERE STEP_EXECUTION_ID IN (SELECT STEP_EXECUTION_ID FROM %PREFIX%STEP_EXECUTION WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?))"), + olderThan); + } + + private void deleteStepExecutions(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%STEP_EXECUTION WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?)"), + olderThan); + } + + private void deleteJobExecutionParams(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_EXECUTION_PARAMS WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?)"), + olderThan); + } + + private void deleteJobExecutionContexts(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?)"), + olderThan); + } + +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java index 79406d0..7be152f 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java @@ -18,23 +18,15 @@ */ package org.apache.ambari.infra.job; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.batch.core.JobParameters; - -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Optional; -public abstract class JobProperties<T extends JobProperties<T>> { +import org.springframework.batch.core.JobParameters; + +public abstract class JobProperties<TParameters extends Validatable> { private SchedulingProperties scheduling; - private final Class<T> clazz; private boolean enabled; - protected JobProperties(Class<T> clazz) { - this.clazz = clazz; - } - public SchedulingProperties getScheduling() { return scheduling; } @@ -49,23 +41,11 @@ public abstract class JobProperties<T extends JobProperties<T>> { this.scheduling = scheduling; } - public T deepCopy() { - try { - ObjectMapper objectMapper = new ObjectMapper(); - String json = objectMapper.writeValueAsString(this); - return objectMapper.readValue(json, clazz); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - public abstract void apply(JobParameters jobParameters); - - public abstract void validate(); + public abstract TParameters merge(JobParameters jobParameters); public void validate(String jobName) { try { - validate(); + merge(new JobParameters()).validate(); } catch (Exception ex) { throw new JobConfigurationException(String.format("Configuration of job %s is invalid: %s!", jobName, ex.getMessage()), ex); diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertiesHolder.java similarity index 66% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertiesHolder.java index 094e797..67cdafa 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertiesHolder.java @@ -18,32 +18,27 @@ */ package org.apache.ambari.infra.job; +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; + import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; -import java.util.Map; - -public class JobsPropertyMap<T extends JobProperties<T>> implements JobExecutionListener { +public class JobPropertiesHolder<T extends Validatable> + implements JobExecutionListener { - private final Map<String, T> propertyMap; + private final JobProperties<T> defaultProperties; - public JobsPropertyMap(Map<String, T> propertyMap) { - this.propertyMap = propertyMap; + public JobPropertiesHolder(JobProperties<T> defaultProperties) { + this.defaultProperties = defaultProperties; } @Override public void beforeJob(JobExecution jobExecution) { try { - String jobName = jobExecution.getJobInstance().getJobName(); - T defaultProperties = propertyMap.get(jobName); - if (defaultProperties == null) - throw new UnsupportedOperationException("Properties not found for job " + jobName); - - T properties = defaultProperties.deepCopy(); - properties.apply(jobExecution.getJobParameters()); - properties.validate(jobName); - jobExecution.getExecutionContext().put("jobProperties", properties); + T parameters = defaultProperties.merge(jobExecution.getJobParameters()); + parameters.validate(); + jobExecution.getExecutionContext().put(PARAMETERS_CONTEXT_KEY, parameters); } catch (UnsupportedOperationException | IllegalArgumentException ex) { jobExecution.stop(); diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java index 324c0b3..8729c4e 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java @@ -18,6 +18,11 @@ */ package org.apache.ambari.infra.job; +import java.util.Date; + +import javax.inject.Inject; +import javax.inject.Named; + import org.apache.ambari.infra.manager.Jobs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,14 +38,6 @@ import org.springframework.batch.core.repository.JobRestartException; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.support.CronTrigger; -import javax.inject.Inject; -import javax.inject.Named; -import java.time.Duration; -import java.time.OffsetDateTime; - -import static org.apache.ambari.infra.job.archive.FileNameSuffixFormatter.SOLR_DATETIME_FORMATTER; -import static org.apache.commons.lang.StringUtils.isBlank; - @Named public class JobScheduler { private static final Logger LOG = LoggerFactory.getLogger(JobScheduler.class); @@ -61,7 +58,7 @@ public class JobScheduler { throw new RuntimeException(e); } - scheduler.schedule(() -> launchJob(jobName, schedulingProperties.getIntervalEndDelta()), new CronTrigger(schedulingProperties.getCron())); + scheduler.schedule(() -> launchJob(jobName), new CronTrigger(schedulingProperties.getCron())); LOG.info("Job {} scheduled for running. Cron: {}", jobName, schedulingProperties.getCron()); } @@ -75,12 +72,10 @@ public class JobScheduler { } } - private void launchJob(String jobName, String endDelta) { + private void launchJob(String jobName) { try { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); - if (!isBlank(endDelta)) - jobParametersBuilder.addString("end", SOLR_DATETIME_FORMATTER.format(OffsetDateTime.now().minus(Duration.parse(endDelta)))); - + jobParametersBuilder.addDate("scheduledLaunchAt", new Date()); jobs.launchJob(jobName, jobParametersBuilder.toJobParameters()); } catch (JobParametersInvalidException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) { throw new RuntimeException(e); diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java index 094e797..0eb5908 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java @@ -18,38 +18,29 @@ */ package org.apache.ambari.infra.job; -import org.springframework.batch.core.ExitStatus; +import java.util.Map; + import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; -import java.util.Map; +public class JobsPropertyMap<TProperties extends JobProperties<TParameters>, TParameters extends Validatable> + implements JobExecutionListener { -public class JobsPropertyMap<T extends JobProperties<T>> implements JobExecutionListener { + public static final String PARAMETERS_CONTEXT_KEY = "jobParameters"; + private final Map<String, TProperties> propertyMap; - private final Map<String, T> propertyMap; - - public JobsPropertyMap(Map<String, T> propertyMap) { + public JobsPropertyMap(Map<String, TProperties> propertyMap) { this.propertyMap = propertyMap; } @Override public void beforeJob(JobExecution jobExecution) { - try { - String jobName = jobExecution.getJobInstance().getJobName(); - T defaultProperties = propertyMap.get(jobName); - if (defaultProperties == null) - throw new UnsupportedOperationException("Properties not found for job " + jobName); - - T properties = defaultProperties.deepCopy(); - properties.apply(jobExecution.getJobParameters()); - properties.validate(jobName); - jobExecution.getExecutionContext().put("jobProperties", properties); - } - catch (UnsupportedOperationException | IllegalArgumentException ex) { - jobExecution.stop(); - jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), ex.getMessage())); - throw ex; - } + String jobName = jobExecution.getJobInstance().getJobName(); + TProperties defaultProperties = propertyMap.get(jobName); + if (defaultProperties == null) + throw new UnsupportedOperationException("Properties not found for job " + jobName); + + new JobPropertiesHolder<>(defaultProperties).beforeJob(jobExecution); } @Override diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java index af81b4f..2f18c55 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java @@ -21,7 +21,6 @@ package org.apache.ambari.infra.job; public class SchedulingProperties { private boolean enabled = false; private String cron; - private String intervalEndDelta; public boolean isEnabled() { return enabled; @@ -38,12 +37,4 @@ public class SchedulingProperties { public void setCron(String cron) { this.cron = cron; } - - public String getIntervalEndDelta() { - return intervalEndDelta; - } - - public void setIntervalEndDelta(String intervalEndDelta) { - this.intervalEndDelta = intervalEndDelta; - } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/Validatable.java similarity index 60% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/Validatable.java index af81b4f..5c04406 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/Validatable.java @@ -18,32 +18,6 @@ */ package org.apache.ambari.infra.job; -public class SchedulingProperties { - private boolean enabled = false; - private String cron; - private String intervalEndDelta; - - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public String getCron() { - return cron; - } - - public void setCron(String cron) { - this.cron = cron; - } - - public String getIntervalEndDelta() { - return intervalEndDelta; - } - - public void setIntervalEndDelta(String intervalEndDelta) { - this.intervalEndDelta = intervalEndDelta; - } +public interface Validatable { + void validate(); } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java similarity index 77% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java index b26da36..eea87d2 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java @@ -18,36 +18,41 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.ambari.infra.job.JobProperties; -import org.springframework.batch.core.JobParameters; - -import java.util.Optional; - import static java.util.Objects.requireNonNull; import static org.apache.ambari.infra.job.archive.ExportDestination.HDFS; import static org.apache.ambari.infra.job.archive.ExportDestination.LOCAL; import static org.apache.ambari.infra.job.archive.ExportDestination.S3; import static org.apache.commons.lang.StringUtils.isBlank; -public class DocumentArchivingProperties extends JobProperties<DocumentArchivingProperties> { +import java.time.Duration; +import java.util.Optional; + +import org.apache.ambari.infra.job.Validatable; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.apache.ambari.infra.json.StringToDurationConverter; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +public class ArchivingParameters implements Validatable { private int readBlockSize; private int writeBlockSize; private ExportDestination destination; private String localDestinationDirectory; private String fileNameSuffixColumn; private String fileNameSuffixDateFormat; - private SolrProperties solr; + private SolrParameters solr; private String s3AccessFile; private String s3KeyPrefix; private String s3BucketName; private String s3Endpoint; - private String hdfsEndpoint; private String hdfsDestinationDirectory; - - public DocumentArchivingProperties() { - super(DocumentArchivingProperties.class); - } + private String start; + private String end; + @JsonSerialize(converter = DurationToStringConverter.class) + @JsonDeserialize(converter = StringToDurationConverter.class) + private Duration ttl; public int getReadBlockSize() { return readBlockSize; @@ -97,12 +102,12 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving this.fileNameSuffixDateFormat = fileNameSuffixDateFormat; } - public SolrProperties getSolr() { + public SolrParameters getSolr() { return solr; } - public void setSolr(SolrProperties query) { - this.solr = query; + public void setSolr(SolrParameters solr) { + this.solr = solr; } public String getS3AccessFile() { @@ -137,6 +142,22 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving this.s3Endpoint = s3Endpoint; } + public String getHdfsEndpoint() { + return hdfsEndpoint; + } + + public void setHdfsEndpoint(String hdfsEndpoint) { + this.hdfsEndpoint = hdfsEndpoint; + } + + public String getHdfsDestinationDirectory() { + return hdfsDestinationDirectory; + } + + public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) { + this.hdfsDestinationDirectory = hdfsDestinationDirectory; + } + public Optional<S3Properties> s3Properties() { if (isBlank(s3BucketName)) return Optional.empty(); @@ -148,50 +169,36 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving s3Endpoint)); } - public String getHdfsEndpoint() { - return hdfsEndpoint; + public String getStart() { + return start; } - public void setHdfsEndpoint(String hdfsEndpoint) { - this.hdfsEndpoint = hdfsEndpoint; + public void setStart(String start) { + this.start = start; } - public String getHdfsDestinationDirectory() { - return hdfsDestinationDirectory; + public String getEnd() { + return end; } - public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) { - this.hdfsDestinationDirectory = hdfsDestinationDirectory; + public void setEnd(String end) { + this.end = end; } - @Override - public void apply(JobParameters jobParameters) { - readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize); - writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize); - destination = ExportDestination.valueOf(jobParameters.getString("destination", destination.name())); - localDestinationDirectory = jobParameters.getString("localDestinationDirectory", localDestinationDirectory); - s3AccessFile = jobParameters.getString("s3AccessFile", s3AccessFile); - s3BucketName = jobParameters.getString("s3BucketName", s3BucketName); - s3KeyPrefix = jobParameters.getString("s3KeyPrefix", s3KeyPrefix); - s3Endpoint = jobParameters.getString("s3Endpoint", s3Endpoint); - hdfsEndpoint = jobParameters.getString("hdfsEndpoint", hdfsEndpoint); - hdfsDestinationDirectory = jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory); - solr.apply(jobParameters); - } - - private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) { - String valueText = jobParameters.getString(parameterName); - if (isBlank(valueText)) - return defaultValue; - return Integer.parseInt(valueText); + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; } @Override public void validate() { - if (readBlockSize == 0) + if (readBlockSize <= 0) throw new IllegalArgumentException("The property readBlockSize must be greater than 0!"); - if (writeBlockSize == 0) + if (writeBlockSize <= 0) throw new IllegalArgumentException("The property writeBlockSize must be greater than 0!"); if (isBlank(fileNameSuffixColumn)) { diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java index 8358dd0..eac31af 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java @@ -18,6 +18,14 @@ */ package org.apache.ambari.infra.job.archive; +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; +import static org.apache.ambari.infra.job.archive.SolrQueryBuilder.computeEnd; +import static org.apache.commons.lang.StringUtils.isBlank; + +import java.io.File; + +import javax.inject.Inject; + import org.apache.ambari.infra.conf.InfraManagerDataConfig; import org.apache.ambari.infra.conf.security.PasswordStore; import org.apache.ambari.infra.job.AbstractJobsConfiguration; @@ -40,13 +48,8 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.inject.Inject; -import java.io.File; - -import static org.apache.commons.lang.StringUtils.isBlank; - @Configuration -public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<DocumentArchivingProperties> { +public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<DocumentArchivingProperties, ArchivingParameters> { private static final Logger LOG = LoggerFactory.getLogger(DocumentArchivingConfiguration.class); private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { }; @@ -83,7 +86,7 @@ public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<Do @StepScope public DocumentExporter documentExporter(DocumentItemReader documentItemReader, @Value("#{stepExecution.jobExecution.jobId}") String jobId, - @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties, + @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters, InfraManagerDataConfig infraManagerDataConfig, @Value("#{jobParameters[end]}") String intervalEnd, DocumentWiper documentWiper, @@ -92,28 +95,28 @@ public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<Do File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting"); CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor()); - switch (properties.getDestination()) { + switch (parameters.getDestination()) { case S3: fileAction.add(new S3Uploader( - properties.s3Properties().orElseThrow(() -> new IllegalStateException("S3 properties are not provided!")), + parameters.s3Properties().orElseThrow(() -> new IllegalStateException("S3 properties are not provided!")), passwordStore)); break; case HDFS: org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set("fs.defaultFS", properties.getHdfsEndpoint()); - fileAction.add(new HdfsUploader(conf, new Path(properties.getHdfsDestinationDirectory()))); + conf.set("fs.defaultFS", parameters.getHdfsEndpoint()); + fileAction.add(new HdfsUploader(conf, new Path(parameters.getHdfsDestinationDirectory()))); break; case LOCAL: - baseDir = new File(properties.getLocalDestinationDirectory()); + baseDir = new File(parameters.getLocalDestinationDirectory()); break; } - FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(properties); + FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(parameters); LocalItemWriterListener itemWriterListener = new LocalItemWriterListener(fileAction, documentWiper); File destinationDirectory = new File( baseDir, String.format("%s_%s_%s", - properties.getSolr().getCollection(), + parameters.getSolr().getCollection(), jobId, isBlank(intervalEnd) ? "" : fileNameSuffixFormatter.format(intervalEnd))); LOG.info("Destination directory path={}", destinationDirectory); @@ -126,23 +129,23 @@ public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<Do return new DocumentExporter( documentItemReader, firstDocument -> new LocalDocumentItemWriter( - outFile(properties.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener), - properties.getWriteBlockSize(), jobContextRepository); + outFile(parameters.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener), + parameters.getWriteBlockSize(), jobContextRepository); } @Bean @StepScope - public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties, + public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters, SolrDAO solrDAO) { - if (isBlank(properties.getSolr().getDeleteQueryText())) + if (isBlank(parameters.getSolr().getDeleteQueryText())) return NOT_DELETE; return solrDAO; } @Bean @StepScope - public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties) { - return new SolrDAO(properties.getSolr()); + public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters) { + return new SolrDAO(parameters.getSolr()); } private File outFile(String collection, File directoryPath, String suffix) { @@ -154,16 +157,15 @@ public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<Do @Bean @StepScope public DocumentItemReader reader(ObjectSource<Document> documentSource, - @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties) { + @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters properties) { return new DocumentItemReader(documentSource, properties.getReadBlockSize()); } @Bean @StepScope - public ObjectSource<Document> logSource(@Value("#{jobParameters[start]}") String start, - @Value("#{jobParameters[end]}") String end, + public ObjectSource<Document> logSource(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters, SolrDAO solrDAO) { - return new SolrDocumentSource(solrDAO, start, end); + return new SolrDocumentSource(solrDAO, parameters.getStart(), computeEnd(parameters.getEnd(), parameters.getTtl())); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java index b26da36..dea8acb 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java @@ -18,24 +18,24 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.ambari.infra.job.JobProperties; -import org.springframework.batch.core.JobParameters; +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; +import static org.apache.commons.lang.StringUtils.isBlank; +import java.time.Duration; import java.util.Optional; -import static java.util.Objects.requireNonNull; -import static org.apache.ambari.infra.job.archive.ExportDestination.HDFS; -import static org.apache.ambari.infra.job.archive.ExportDestination.LOCAL; -import static org.apache.ambari.infra.job.archive.ExportDestination.S3; -import static org.apache.commons.lang.StringUtils.isBlank; +import org.apache.ambari.infra.job.JobProperties; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.springframework.batch.core.JobParameters; -public class DocumentArchivingProperties extends JobProperties<DocumentArchivingProperties> { +public class DocumentArchivingProperties extends JobProperties<ArchivingParameters> { private int readBlockSize; private int writeBlockSize; private ExportDestination destination; private String localDestinationDirectory; private String fileNameSuffixColumn; private String fileNameSuffixDateFormat; + private Duration ttl; private SolrProperties solr; private String s3AccessFile; private String s3KeyPrefix; @@ -45,10 +45,6 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving private String hdfsEndpoint; private String hdfsDestinationDirectory; - public DocumentArchivingProperties() { - super(DocumentArchivingProperties.class); - } - public int getReadBlockSize() { return readBlockSize; } @@ -97,6 +93,14 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving this.fileNameSuffixDateFormat = fileNameSuffixDateFormat; } + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + public SolrProperties getSolr() { return solr; } @@ -164,21 +168,6 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving this.hdfsDestinationDirectory = hdfsDestinationDirectory; } - @Override - public void apply(JobParameters jobParameters) { - readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize); - writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize); - destination = ExportDestination.valueOf(jobParameters.getString("destination", destination.name())); - localDestinationDirectory = jobParameters.getString("localDestinationDirectory", localDestinationDirectory); - s3AccessFile = jobParameters.getString("s3AccessFile", s3AccessFile); - s3BucketName = jobParameters.getString("s3BucketName", s3BucketName); - s3KeyPrefix = jobParameters.getString("s3KeyPrefix", s3KeyPrefix); - s3Endpoint = jobParameters.getString("s3Endpoint", s3Endpoint); - hdfsEndpoint = jobParameters.getString("hdfsEndpoint", hdfsEndpoint); - hdfsDestinationDirectory = jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory); - solr.apply(jobParameters); - } - private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) { String valueText = jobParameters.getString(parameterName); if (isBlank(valueText)) @@ -187,41 +176,24 @@ public class DocumentArchivingProperties extends JobProperties<DocumentArchiving } @Override - public void validate() { - if (readBlockSize == 0) - throw new IllegalArgumentException("The property readBlockSize must be greater than 0!"); - - if (writeBlockSize == 0) - throw new IllegalArgumentException("The property writeBlockSize must be greater than 0!"); - - if (isBlank(fileNameSuffixColumn)) { - throw new IllegalArgumentException("The property fileNameSuffixColumn can not be null or empty string!"); - } - - requireNonNull(destination, "The property destination can not be null!"); - switch (destination) { - case LOCAL: - if (isBlank(localDestinationDirectory)) - throw new IllegalArgumentException(String.format( - "The property localDestinationDirectory can not be null or empty string when destination is set to %s!", LOCAL.name())); - break; - - case S3: - s3Properties() - .orElseThrow(() -> new IllegalArgumentException("S3 related properties must be set if the destination is " + S3.name())) - .validate(); - break; - - case HDFS: - if (isBlank(hdfsEndpoint)) - throw new IllegalArgumentException(String.format( - "The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name())); - if (isBlank(hdfsDestinationDirectory)) - throw new IllegalArgumentException(String.format( - "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name())); - } - - requireNonNull(solr, "No solr query was specified for archiving job!"); - solr.validate(); + public ArchivingParameters merge(JobParameters jobParameters) { + ArchivingParameters archivingParameters = new ArchivingParameters(); + archivingParameters.setReadBlockSize(getIntJobParameter(jobParameters, "readBlockSize", readBlockSize)); + archivingParameters.setWriteBlockSize(getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize)); + archivingParameters.setDestination(ExportDestination.valueOf(jobParameters.getString("destination", destination.name()))); + archivingParameters.setLocalDestinationDirectory(jobParameters.getString("localDestinationDirectory", localDestinationDirectory)); + archivingParameters.setFileNameSuffixColumn(jobParameters.getString("fileNameSuffixColumn", fileNameSuffixColumn)); + archivingParameters.setFileNameSuffixDateFormat(jobParameters.getString("fileNameSuffixDateFormat", fileNameSuffixDateFormat)); + archivingParameters.setS3AccessFile(jobParameters.getString("s3AccessFile", s3AccessFile)); + archivingParameters.setS3BucketName(jobParameters.getString("s3BucketName", s3BucketName)); + archivingParameters.setS3KeyPrefix(jobParameters.getString("s3KeyPrefix", s3KeyPrefix)); + archivingParameters.setS3Endpoint(jobParameters.getString("s3Endpoint", s3Endpoint)); + archivingParameters.setHdfsEndpoint(jobParameters.getString("hdfsEndpoint", hdfsEndpoint)); + archivingParameters.setHdfsDestinationDirectory(jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory)); + archivingParameters.setSolr(solr.merge(jobParameters)); + archivingParameters.setStart(jobParameters.getString("start")); + archivingParameters.setEnd(jobParameters.getString("end")); + archivingParameters.setTtl(toDuration(jobParameters.getString("ttl", DurationToStringConverter.toString(ttl)))); + return archivingParameters; } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java index f9016e6..0c879bd 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java @@ -18,17 +18,17 @@ */ package org.apache.ambari.infra.job.archive; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; - import static java.util.Objects.requireNonNull; import static org.apache.ambari.infra.job.archive.SolrDocumentIterator.SOLR_DATE_FORMAT_TEXT; import static org.apache.commons.lang.StringUtils.isBlank; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; + public class FileNameSuffixFormatter { public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT); - public static FileNameSuffixFormatter from(DocumentArchivingProperties properties) { + public static FileNameSuffixFormatter from(ArchivingParameters properties) { return new FileNameSuffixFormatter(properties.getFileNameSuffixColumn(), properties.getFileNameSuffixDateFormat()); } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java index 2536cb5..76aa734 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java @@ -1,16 +1,30 @@ package org.apache.ambari.infra.job.archive; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.s3.AmazonS3Client; +import static org.apache.commons.lang.StringUtils.isNotBlank; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + import org.apache.ambari.infra.conf.security.CompositePasswordStore; import org.apache.ambari.infra.conf.security.PasswordStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xmlpull.v1.XmlPullParserException; -import java.io.File; - -import static org.apache.commons.lang.StringUtils.isBlank; -import static org.apache.commons.lang.StringUtils.isNotBlank; +import io.minio.MinioClient; +import io.minio.errors.ErrorResponseException; +import io.minio.errors.InsufficientDataException; +import io.minio.errors.InternalException; +import io.minio.errors.InvalidArgumentException; +import io.minio.errors.InvalidBucketNameException; +import io.minio.errors.InvalidEndpointException; +import io.minio.errors.InvalidPortException; +import io.minio.errors.NoResponseException; +import io.minio.errors.RegionConflictException; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -34,7 +48,7 @@ public class S3Uploader extends AbstractFileAction { private static final Logger LOG = LoggerFactory.getLogger(S3Uploader.class); - private final AmazonS3Client client; + private final MinioClient client; private final String keyPrefix; private final String bucketName; @@ -48,27 +62,39 @@ public class S3Uploader extends AbstractFileAction { if (isNotBlank((s3Properties.getS3AccessFile()))) compositePasswordStore = new CompositePasswordStore(passwordStore, S3AccessCsv.file(s3Properties.getS3AccessFile())); - BasicAWSCredentials credentials = new BasicAWSCredentials( - compositePasswordStore.getPassword(S3AccessKeyNames.AccessKeyId.getEnvVariableName()) - .orElseThrow(() -> new IllegalArgumentException("Access key Id is not present!")), - compositePasswordStore.getPassword(S3AccessKeyNames.SecretAccessKey.getEnvVariableName()) - .orElseThrow(() -> new IllegalArgumentException("Secret Access Key is not present!"))); - client = new AmazonS3Client(credentials); - if (!isBlank(s3Properties.getS3EndPoint())) - client.setEndpoint(s3Properties.getS3EndPoint()); -// Note: without pathStyleAccess=true endpoint going to be <bucketName>.<host>:<port> -// client.setS3ClientOptions(S3ClientOptions.builder().setPathStyleAccess(true).build()); + try { + client = new MinioClient(s3Properties.getS3EndPoint(), compositePasswordStore.getPassword(S3AccessKeyNames.AccessKeyId.getEnvVariableName()) + .orElseThrow(() -> new IllegalArgumentException("Access key Id is not present!")), + compositePasswordStore.getPassword(S3AccessKeyNames.SecretAccessKey.getEnvVariableName()) + .orElseThrow(() -> new IllegalArgumentException("Secret Access Key is not present!"))); + + if (!client.bucketExists(bucketName)) + client.makeBucket(bucketName); + + } catch (RegionConflictException | XmlPullParserException | InvalidBucketNameException | NoSuchAlgorithmException | InsufficientDataException | ErrorResponseException | InvalidKeyException | NoResponseException | InvalidPortException | InvalidEndpointException | InternalException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } @Override public File onPerform(File inputFile) { String key = keyPrefix + inputFile.getName(); - if (client.doesObjectExist(bucketName, key)) { - throw new UnsupportedOperationException(String.format("Object '%s' already exists in bucket '%s'", key, bucketName)); - } + try { + if (client.listObjects(bucketName, key).iterator().hasNext()) { + throw new UnsupportedOperationException(String.format("Object '%s' already exists in bucket '%s'", key, bucketName)); + } - client.putObject(bucketName, key, inputFile); - return inputFile; + try (FileInputStream fileInputStream = new FileInputStream(inputFile)) { + client.putObject(bucketName, key, fileInputStream, inputFile.length(), "application/json"); + return inputFile; + } + } catch (InvalidKeyException | NoSuchAlgorithmException | NoResponseException | XmlPullParserException | InvalidArgumentException | InvalidBucketNameException | ErrorResponseException | InternalException | InsufficientDataException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java index fba08e7..7f8fd07 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java @@ -18,6 +18,9 @@ */ package org.apache.ambari.infra.job.archive; +import java.io.IOException; +import java.io.UncheckedIOException; + import org.apache.ambari.infra.job.SolrDAOBase; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; @@ -26,15 +29,12 @@ import org.apache.solr.client.solrj.response.QueryResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UncheckedIOException; - public class SolrDAO extends SolrDAOBase implements DocumentWiper { private static final Logger LOG = LoggerFactory.getLogger(SolrDAO.class); - private final SolrProperties queryProperties; + private final SolrParameters queryProperties; - public SolrDAO(SolrProperties queryProperties) { + public SolrDAO(SolrParameters queryProperties) { super(queryProperties.getZooKeeperConnectionString(), queryProperties.getCollection()); this.queryProperties = queryProperties; } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParameters.java similarity index 74% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParameters.java index a2a78c2..a793c9b 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParameters.java @@ -18,14 +18,9 @@ */ package org.apache.ambari.infra.job.archive; -import org.springframework.batch.core.JobParameters; - -import java.util.ArrayList; -import java.util.List; - import static org.apache.commons.lang.StringUtils.isBlank; -public class SolrProperties { +public class SolrParameters { private String zooKeeperConnectionString; private String collection; private String queryText; @@ -88,25 +83,6 @@ public class SolrProperties { .addSort(sortColumn); } - public void apply(JobParameters jobParameters) { - zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString); - collection = jobParameters.getString("collection", collection); - queryText = jobParameters.getString("queryText", queryText); - filterQueryText = jobParameters.getString("filterQueryText", filterQueryText); - deleteQueryText = jobParameters.getString("deleteQueryText", deleteQueryText); - - String sortValue; - List<String> sortColumns = new ArrayList<>(); - int i = 0; - while ((sortValue = jobParameters.getString(String.format("sortColumn[%d]", i))) != null) { - sortColumns.add(sortValue); - ++i; - } - - if (sortColumns.size() > 0) - sortColumn = sortColumns.toArray(new String[sortColumns.size()]); - } - public void validate() { if (isBlank(zooKeeperConnectionString)) throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!"); diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java index a2a78c2..1cb2d62 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java @@ -18,12 +18,10 @@ */ package org.apache.ambari.infra.job.archive; -import org.springframework.batch.core.JobParameters; - import java.util.ArrayList; import java.util.List; -import static org.apache.commons.lang.StringUtils.isBlank; +import org.springframework.batch.core.JobParameters; public class SolrProperties { private String zooKeeperConnectionString; @@ -81,19 +79,13 @@ public class SolrProperties { this.deleteQueryText = deleteQueryText; } - public SolrQueryBuilder toQueryBuilder() { - return new SolrQueryBuilder(). - setQueryText(queryText) - .setFilterQueryText(filterQueryText) - .addSort(sortColumn); - } - - public void apply(JobParameters jobParameters) { - zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString); - collection = jobParameters.getString("collection", collection); - queryText = jobParameters.getString("queryText", queryText); - filterQueryText = jobParameters.getString("filterQueryText", filterQueryText); - deleteQueryText = jobParameters.getString("deleteQueryText", deleteQueryText); + public SolrParameters merge(JobParameters jobParameters) { + SolrParameters solrParameters = new SolrParameters(); + solrParameters.setZooKeeperConnectionString(jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString)); + solrParameters.setCollection(jobParameters.getString("collection", collection)); + solrParameters.setQueryText(jobParameters.getString("queryText", queryText)); + solrParameters.setFilterQueryText(jobParameters.getString("filterQueryText", filterQueryText)); + solrParameters.setDeleteQueryText(jobParameters.getString("deleteQueryText", deleteQueryText)); String sortValue; List<String> sortColumns = new ArrayList<>(); @@ -102,16 +94,8 @@ public class SolrProperties { sortColumns.add(sortValue); ++i; } + solrParameters.setSortColumn(sortColumns.toArray(new String[0])); - if (sortColumns.size() > 0) - sortColumn = sortColumns.toArray(new String[sortColumns.size()]); - } - - public void validate() { - if (isBlank(zooKeeperConnectionString)) - throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!"); - - if (isBlank(collection)) - throw new IllegalArgumentException("The property collection can not be null or empty string!"); + return solrParameters; } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java index 0e41169..40771dc 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java @@ -18,16 +18,32 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.solr.client.solrj.SolrQuery; +import static org.apache.ambari.infra.job.archive.FileNameSuffixFormatter.SOLR_DATETIME_FORMATTER; +import static org.apache.commons.lang.StringUtils.isBlank; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.solr.client.solrj.SolrQuery.ORDER.asc; +import java.time.Duration; +import java.time.OffsetDateTime; import java.util.HashMap; import java.util.Map; -import static org.apache.commons.lang.StringUtils.isBlank; -import static org.apache.solr.client.solrj.SolrQuery.ORDER.asc; +import org.apache.solr.client.solrj.SolrQuery; public class SolrQueryBuilder { + public static String computeEnd(String end, Duration ttl) { + return computeEnd(end, OffsetDateTime.now(), ttl); + } + + public static String computeEnd(String end, OffsetDateTime now, Duration ttl) { + if (isNotBlank(end)) + return end; + if (ttl != null) + return SOLR_DATETIME_FORMATTER.format(now.minus(ttl)); + return null; + } + private static final String INTERVAL_START = "start"; private static final String INTERVAL_END = "end"; private String queryText; diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpConfiguration.java new file mode 100644 index 0000000..27f61fa --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpConfiguration.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.cleanup; + +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; + +import javax.inject.Inject; + +import org.apache.ambari.infra.job.InfraJobExecutionDao; +import org.apache.ambari.infra.job.JobPropertiesHolder; +import org.apache.ambari.infra.job.JobScheduler; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; + +@Configuration +public class CleanUpConfiguration { + + public static final String JOB_NAME = "clean_up"; + private final StepBuilderFactory steps; + private final JobBuilderFactory jobs; + private final JobScheduler scheduler; + private final CleanUpProperties cleanUpProperties; + + @Inject + public CleanUpConfiguration(StepBuilderFactory steps, JobBuilderFactory jobs, CleanUpProperties cleanUpProperties, JobScheduler scheduler) { + this.steps = steps; + this.jobs = jobs; + this.scheduler = scheduler; + this.cleanUpProperties = cleanUpProperties; + } + + @EventListener(ApplicationReadyEvent.class) + public void scheduleJob() { + cleanUpProperties.scheduling().ifPresent(schedulingProperties -> scheduler.schedule(JOB_NAME, schedulingProperties)); + } + + @Bean(name = "cleanUpJob") + public Job job(@Qualifier("cleanUpStep") Step cleanUpStep) { + return jobs.get(JOB_NAME).listener(new JobPropertiesHolder<>(cleanUpProperties)).start(cleanUpStep).build(); + } + + @Bean(name = "cleanUpStep") + protected Step cleanUpStep(TaskHistoryWiper taskHistoryWiper) { + return steps.get("cleanUpStep").tasklet(taskHistoryWiper).build(); + } + + @Bean + @StepScope + protected TaskHistoryWiper taskHistoryWiper( + InfraJobExecutionDao infraJobExecutionDao, + @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") CleanUpParameters cleanUpParameters) { + return new TaskHistoryWiper(infraJobExecutionDao, cleanUpParameters.getTtl()); + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpParameters.java similarity index 53% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpParameters.java index af81b4f..a4f2141 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpParameters.java @@ -16,34 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job; +package org.apache.ambari.infra.job.cleanup; -public class SchedulingProperties { - private boolean enabled = false; - private String cron; - private String intervalEndDelta; +import java.time.Duration; - public boolean isEnabled() { - return enabled; - } +import org.apache.ambari.infra.job.Validatable; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.apache.ambari.infra.json.StringToDurationConverter; - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; - public String getCron() { - return cron; - } +public class CleanUpParameters implements Validatable { - public void setCron(String cron) { - this.cron = cron; + @JsonSerialize(converter = DurationToStringConverter.class) + @JsonDeserialize(converter = StringToDurationConverter.class) + private Duration ttl; + + public Duration getTtl() { + return ttl; } - public String getIntervalEndDelta() { - return intervalEndDelta; + public void setTtl(Duration ttl) { + this.ttl = ttl; } - public void setIntervalEndDelta(String intervalEndDelta) { - this.intervalEndDelta = intervalEndDelta; + @Override + public void validate() { + } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpProperties.java new file mode 100644 index 0000000..7bf9808 --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpProperties.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.cleanup; + +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; + +import java.time.Duration; + +import org.apache.ambari.infra.job.JobProperties; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.springframework.batch.core.JobParameters; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "infra-manager.jobs.clean-up") +public class CleanUpProperties extends JobProperties<CleanUpParameters> { + + private Duration ttl; + + protected CleanUpProperties() { + setEnabled(true); + } + + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + + @Override + public CleanUpParameters merge(JobParameters jobParameters) { + CleanUpParameters cleanUpParameters = new CleanUpParameters(); + cleanUpParameters.setTtl(toDuration(jobParameters.getString("ttl", DurationToStringConverter.toString(ttl)))); + return cleanUpParameters; + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/TaskHistoryWiper.java similarity index 54% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/TaskHistoryWiper.java index 463e6e0..594515e 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/TaskHistoryWiper.java @@ -16,34 +16,41 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job.deleting; +package org.apache.ambari.infra.job.cleanup; -import org.apache.ambari.infra.job.SolrDAOBase; -import org.apache.solr.client.solrj.util.ClientUtils; +import java.time.Duration; +import java.time.OffsetDateTime; + +import org.apache.ambari.infra.job.InfraJobExecutionDao; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; -public class DocumentWiperTasklet extends SolrDAOBase implements Tasklet { - private final String filterField; - private final String start; - private final String end; +public class TaskHistoryWiper implements Tasklet { + + private static final Logger logger = LoggerFactory.getLogger(TaskHistoryWiper.class); + public static final Duration DEFAULT_TTL = Duration.ofHours(1); + + private final InfraJobExecutionDao infraJobExecutionDao; + private final Duration ttl; - public DocumentWiperTasklet(DocumentDeletingProperties properties, String start, String end) { - super(properties.getZooKeeperConnectionString(), properties.getCollection()); - this.filterField = properties.getFilterField(); - this.start = start; - this.end = end; + public TaskHistoryWiper(InfraJobExecutionDao infraJobExecutionDao, Duration ttl) { + this.infraJobExecutionDao = infraJobExecutionDao; + if (ttl == null || ttl.compareTo(DEFAULT_TTL) < 0) { + logger.info("The ttl value ({}) less than the minimum required. Using the default ({}) instead", ttl, DEFAULT_TTL); + this.ttl = DEFAULT_TTL; + } + else { + this.ttl = ttl; + } } @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { - delete(String.format("%s:[%s TO %s]", filterField, getValue(start), getValue(end))); + infraJobExecutionDao.deleteJobExecutions(OffsetDateTime.now().minus(ttl)); return RepeatStatus.FINISHED; } - - private String getValue(String value) { - return "*".equals(value) ? value : ClientUtils.escapeQueryChars(value); - } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DeletingParameters.java similarity index 69% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DeletingParameters.java index 63b7dd2..71d98e1 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DeletingParameters.java @@ -18,19 +18,26 @@ */ package org.apache.ambari.infra.job.deleting; -import org.apache.ambari.infra.job.JobProperties; -import org.springframework.batch.core.JobParameters; - import static org.apache.commons.lang.StringUtils.isBlank; -public class DocumentDeletingProperties extends JobProperties<DocumentDeletingProperties> { +import java.time.Duration; + +import org.apache.ambari.infra.job.Validatable; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.apache.ambari.infra.json.StringToDurationConverter; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +public class DeletingParameters implements Validatable { private String zooKeeperConnectionString; private String collection; private String filterField; - - public DocumentDeletingProperties() { - super(DocumentDeletingProperties.class); - } + private String start; + private String end; + @JsonSerialize(converter = DurationToStringConverter.class) + @JsonDeserialize(converter = StringToDurationConverter.class) + private Duration ttl; public String getZooKeeperConnectionString() { return zooKeeperConnectionString; @@ -56,11 +63,28 @@ public class DocumentDeletingProperties extends JobProperties<DocumentDeletingPr this.filterField = filterField; } - @Override - public void apply(JobParameters jobParameters) { - zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString); - collection = jobParameters.getString("collection", collection); - filterField = jobParameters.getString("filterField", filterField); + public String getStart() { + return start; + } + + public void setStart(String start) { + this.start = start; + } + + public String getEnd() { + return end; + } + + public void setEnd(String end) { + this.end = end; + } + + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; } @Override diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java index 4a68c49..f9a782c 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java @@ -18,6 +18,10 @@ */ package org.apache.ambari.infra.job.deleting; +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; + +import javax.inject.Inject; + import org.apache.ambari.infra.job.AbstractJobsConfiguration; import org.apache.ambari.infra.job.JobScheduler; import org.springframework.batch.core.Job; @@ -33,10 +37,8 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.inject.Inject; - @Configuration -public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<DocumentDeletingProperties> { +public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<DocumentDeletingProperties, DeletingParameters> { private final StepBuilderFactory steps; private final Step deleteStep; @@ -70,9 +72,7 @@ public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<Doc @Bean @StepScope public DocumentWiperTasklet documentWiperTasklet( - @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentDeletingProperties properties, - @Value("#{jobParameters[start]}") String start, - @Value("#{jobParameters[end]}") String end) { - return new DocumentWiperTasklet(properties, start, end); + @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") DeletingParameters parameters) { + return new DocumentWiperTasklet(parameters); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java index 63b7dd2..e7ecc13 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java @@ -18,19 +18,19 @@ */ package org.apache.ambari.infra.job.deleting; +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; + +import java.time.Duration; + import org.apache.ambari.infra.job.JobProperties; +import org.apache.ambari.infra.json.DurationToStringConverter; import org.springframework.batch.core.JobParameters; -import static org.apache.commons.lang.StringUtils.isBlank; - -public class DocumentDeletingProperties extends JobProperties<DocumentDeletingProperties> { +public class DocumentDeletingProperties extends JobProperties<DeletingParameters> { private String zooKeeperConnectionString; private String collection; private String filterField; - - public DocumentDeletingProperties() { - super(DocumentDeletingProperties.class); - } + private Duration ttl; public String getZooKeeperConnectionString() { return zooKeeperConnectionString; @@ -56,22 +56,23 @@ public class DocumentDeletingProperties extends JobProperties<DocumentDeletingPr this.filterField = filterField; } - @Override - public void apply(JobParameters jobParameters) { - zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString); - collection = jobParameters.getString("collection", collection); - filterField = jobParameters.getString("filterField", filterField); + public Duration getTtl() { + return ttl; } - @Override - public void validate() { - if (isBlank(zooKeeperConnectionString)) - throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!"); - - if (isBlank(collection)) - throw new IllegalArgumentException("The property collection can not be null or empty string!"); + public void setTtl(Duration ttl) { + this.ttl = ttl; + } - if (isBlank(filterField)) - throw new IllegalArgumentException("The property filterField can not be null or empty string!"); + @Override + public DeletingParameters merge(JobParameters jobParameters) { + DeletingParameters deletingParameters = new DeletingParameters(); + deletingParameters.setZooKeeperConnectionString(jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString)); + deletingParameters.setCollection(jobParameters.getString("collection", collection)); + deletingParameters.setFilterField(jobParameters.getString("filterField", filterField)); + deletingParameters.setStart(jobParameters.getString("start", "*")); + deletingParameters.setEnd(jobParameters.getString("end", "*")); + deletingParameters.setTtl(toDuration(jobParameters.getString("ttl", DurationToStringConverter.toString(ttl)))); + return deletingParameters; } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java index 463e6e0..69d8c62 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java @@ -18,6 +18,8 @@ */ package org.apache.ambari.infra.job.deleting; +import static org.apache.ambari.infra.job.archive.SolrQueryBuilder.computeEnd; + import org.apache.ambari.infra.job.SolrDAOBase; import org.apache.solr.client.solrj.util.ClientUtils; import org.springframework.batch.core.StepContribution; @@ -26,20 +28,19 @@ import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; public class DocumentWiperTasklet extends SolrDAOBase implements Tasklet { - private final String filterField; - private final String start; - private final String end; + private final DeletingParameters parameters; - public DocumentWiperTasklet(DocumentDeletingProperties properties, String start, String end) { - super(properties.getZooKeeperConnectionString(), properties.getCollection()); - this.filterField = properties.getFilterField(); - this.start = start; - this.end = end; + public DocumentWiperTasklet(DeletingParameters deletingParameters) { + super(deletingParameters.getZooKeeperConnectionString(), deletingParameters.getCollection()); + parameters = deletingParameters; } @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { - delete(String.format("%s:[%s TO %s]", filterField, getValue(start), getValue(end))); + delete(String.format("%s:[%s TO %s]", + parameters.getFilterField(), + getValue(parameters.getStart()), + getValue(computeEnd(parameters.getEnd(), parameters.getTtl())))); return RepeatStatus.FINISHED; } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/DurationToStringConverter.java similarity index 58% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/DurationToStringConverter.java index af81b4f..0946dff 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/DurationToStringConverter.java @@ -16,34 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job; +package org.apache.ambari.infra.json; -public class SchedulingProperties { - private boolean enabled = false; - private String cron; - private String intervalEndDelta; +import java.time.Duration; - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public String getCron() { - return cron; - } - - public void setCron(String cron) { - this.cron = cron; - } +import com.fasterxml.jackson.databind.util.StdConverter; - public String getIntervalEndDelta() { - return intervalEndDelta; +public class DurationToStringConverter extends StdConverter<Duration, String> { + @Override + public String convert(Duration value) { + return toString(value); } - public void setIntervalEndDelta(String intervalEndDelta) { - this.intervalEndDelta = intervalEndDelta; + public static String toString(Duration value) { + return value == null ? null : value.toString(); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/StringToDurationConverter.java similarity index 58% copy from ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java copy to ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/StringToDurationConverter.java index af81b4f..2a385cf 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/StringToDurationConverter.java @@ -16,34 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job; +package org.apache.ambari.infra.json; -public class SchedulingProperties { - private boolean enabled = false; - private String cron; - private String intervalEndDelta; +import java.time.Duration; - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public String getCron() { - return cron; - } - - public void setCron(String cron) { - this.cron = cron; - } +import com.fasterxml.jackson.databind.util.StdConverter; - public String getIntervalEndDelta() { - return intervalEndDelta; +public class StringToDurationConverter extends StdConverter<String, Duration> { + @Override + public Duration convert(String value) { + return toDuration(value); } - public void setIntervalEndDelta(String intervalEndDelta) { - this.intervalEndDelta = intervalEndDelta; + public static Duration toDuration(String value) { + return value == null ? null : Duration.parse(value); } } diff --git a/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra-manager/src/main/resources/infra-manager.properties index a0712ba..6830b81 100644 --- a/ambari-infra-manager/src/main/resources/infra-manager.properties +++ b/ambari-infra-manager/src/main/resources/infra-manager.properties @@ -28,15 +28,16 @@ infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logt infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}] infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[0]=logtime infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[1]=id +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}]) infra-manager.jobs.solr_data_archiving.archive_service_logs.read_block_size=100 infra-manager.jobs.solr_data_archiving.archive_service_logs.write_block_size=150 infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=LOCAL infra-manager.jobs.solr_data_archiving.archive_service_logs.local_destination_directory=/tmp/ambariInfraManager infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX -infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.enabled=true +infra-manager.jobs.solr_data_archiving.archive_service_logs.ttl=PT24H +infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.enabled=false infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.cron=0 * * * * ? -infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.intervalEndDelta=PT24H infra-manager.jobs.solr_data_archiving.archive_audit_logs.enabled=true infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.collection=audit_logs @@ -72,3 +73,6 @@ infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=logtime +infra-manager.jobs.clean-up.ttl=PT24H +infra-manager.jobs.clean-up.scheduling.enabled=true +infra-manager.jobs.clean-up.scheduling.cron=0 * * * * ? diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/env/TestAppConfig.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/env/TestAppConfig.java new file mode 100644 index 0000000..6d07ecd --- /dev/null +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/env/TestAppConfig.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.env; + +import javax.sql.DataSource; + +import org.springframework.batch.admin.service.JdbcSearchableJobExecutionDao; +import org.springframework.batch.admin.service.JdbcSearchableJobInstanceDao; +import org.springframework.batch.admin.service.SearchableJobExecutionDao; +import org.springframework.batch.admin.service.SearchableJobInstanceDao; +import org.springframework.batch.core.repository.ExecutionContextSerializer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.jdbc.datasource.init.DataSourceInitializer; +import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.TransactionTemplate; +import org.sqlite.SQLiteConfig; + +@Configuration +@ComponentScan(basePackages = {"org.apache.ambari.infra.env"}) +public class TestAppConfig { + + @Value("classpath:org/springframework/batch/core/schema-drop-sqlite.sql") + private Resource dropRepositoryTables; + + @Value("classpath:org/springframework/batch/core/schema-sqlite.sql") + private Resource dataRepositorySchema; + + @Bean + public DataSource dataSource() { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setDriverClassName("org.sqlite.JDBC"); + dataSource.setUrl("jdbc:sqlite:test.db"); + dataSource.setUsername("test"); + dataSource.setPassword("test"); + SQLiteConfig config = new SQLiteConfig(); + config.enforceForeignKeys(true); + dataSource.setConnectionProperties(config.toProperties()); + return dataSource; + } + + @Bean + public DataSourceInitializer dataSourceInitializer() { + ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator(); + databasePopulator.addScript(dropRepositoryTables); + databasePopulator.setIgnoreFailedDrops(true); + databasePopulator.addScript(dataRepositorySchema); + databasePopulator.setContinueOnError(true); + + DataSourceInitializer initializer = new DataSourceInitializer(); + initializer.setDataSource(dataSource()); + initializer.setDatabasePopulator(databasePopulator); + + return initializer; + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + + @Bean + public SearchableJobInstanceDao searchableJobInstanceDao(JdbcTemplate jdbcTemplate) { + JdbcSearchableJobInstanceDao dao = new JdbcSearchableJobInstanceDao(); + dao.setJdbcTemplate(jdbcTemplate); + return dao; + } + + @Bean + public SearchableJobExecutionDao searchableJobExecutionDao(JdbcTemplate jdbcTemplate, DataSource dataSource) { + JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao(); + dao.setJdbcTemplate(jdbcTemplate); + dao.setDataSource(dataSource); + return dao; + } + + @Bean + public ExecutionContextSerializer executionContextSerializer() { + return new Jackson2ExecutionContextStringSerializer(); + } + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) { + return new TransactionTemplate(transactionManager); + } + + @Bean + public JobRepository jobRepository(ExecutionContextSerializer executionContextSerializer, PlatformTransactionManager transactionManager) throws Exception { + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + factory.setDataSource(dataSource()); + factory.setTransactionManager(transactionManager); + factory.setSerializer(executionContextSerializer); + factory.afterPropertiesSet(); + return factory.getObject(); + } + +} diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/InfraJobExecutionDAOIT.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/InfraJobExecutionDAOIT.java new file mode 100644 index 0000000..7128cbb --- /dev/null +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/InfraJobExecutionDAOIT.java @@ -0,0 +1,99 @@ +package org.apache.ambari.infra.job; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +import java.time.OffsetDateTime; +import java.util.Date; + +import javax.inject.Inject; + +import org.apache.ambari.infra.env.TestAppConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.admin.service.SearchableJobExecutionDao; +import org.springframework.batch.admin.service.SearchableJobInstanceDao; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; +import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.transaction.support.TransactionTemplate; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {TestAppConfig.class}) +public class InfraJobExecutionDAOIT { + + private static int jobCounter = 0; + + @Inject + private JdbcTemplate jdbcTemplate; + @Inject + private TransactionTemplate transactionTemplate; + @Inject + private JobRepository jobRepository; + @Inject + private SearchableJobExecutionDao searchableJobExecutionDao; + @Inject + private SearchableJobInstanceDao searchableJobInstanceDao; + private InfraJobExecutionDao infraJobExecutionDao; + + @Before + public void setUp() { + infraJobExecutionDao = new InfraJobExecutionDao(jdbcTemplate, transactionTemplate); + } + + @Test + public void testDeleteJobExecutions() throws Exception { + JobExecution yesterdayJob = newJobAt(OffsetDateTime.now().minusDays(1)); + JobExecution todayJob = newJobAt(OffsetDateTime.now()); + + infraJobExecutionDao.deleteJobExecutions(OffsetDateTime.now().minusHours(1)); + + assertThat(searchableJobExecutionDao.getJobExecution(todayJob.getId()), is(not(nullValue()))); + assertThat(searchableJobExecutionDao.getJobExecution(yesterdayJob.getId()), is(nullValue())); + + assertThat(searchableJobInstanceDao.getJobInstance(todayJob.getJobId()), is(not(nullValue()))); + assertThat(searchableJobInstanceDao.getJobInstance(yesterdayJob.getJobId()), is(nullValue())); + } + + private JobExecution newJobAt(OffsetDateTime createdAt) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { + JobParameters jobParameters = new JobParametersBuilder().addString("test param", "test value").toJobParameters(); + JobExecution jobExecution = jobRepository.createJobExecution("test job" + jobCounter++ , jobParameters); + jobExecution.setCreateTime(Date.from(createdAt.toInstant())); + jobRepository.update(jobExecution); + + StepExecution stepExecution = new StepExecution("step1", jobExecution); + jobRepository.add(stepExecution); + + return jobExecution; + } +} \ No newline at end of file diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java deleted file mode 100644 index 3b7caab..0000000 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.apache.ambari.infra.job; - -import org.apache.ambari.infra.job.archive.DocumentArchivingProperties; -import org.apache.ambari.infra.job.archive.SolrProperties; -import org.junit.Test; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -public class JobPropertiesTest { - @Test - public void testDeepCopy() throws Exception { - DocumentArchivingProperties documentArchivingProperties = new DocumentArchivingProperties(); - documentArchivingProperties.setLocalDestinationDirectory("/tmp"); - documentArchivingProperties.setFileNameSuffixColumn(".json"); - documentArchivingProperties.setReadBlockSize(10); - documentArchivingProperties.setWriteBlockSize(20); - SolrProperties solr = new SolrProperties(); - solr.setZooKeeperConnectionString("localhost:2181"); - solr.setFilterQueryText("id:1167"); - solr.setQueryText("name:'Joe'"); - solr.setCollection("Users"); - solr.setSortColumn(new String[] {"name"}); - documentArchivingProperties.setSolr(solr); - - DocumentArchivingProperties parsed = documentArchivingProperties.deepCopy(); - - assertThat(parsed.getLocalDestinationDirectory(), is(documentArchivingProperties.getLocalDestinationDirectory())); - assertThat(parsed.getFileNameSuffixColumn(), is(documentArchivingProperties.getFileNameSuffixColumn())); - assertThat(parsed.getReadBlockSize(), is(documentArchivingProperties.getReadBlockSize())); - assertThat(parsed.getWriteBlockSize(), is(documentArchivingProperties.getWriteBlockSize())); - assertThat(parsed.getSolr().getZooKeeperConnectionString(), is(documentArchivingProperties.getSolr().getZooKeeperConnectionString())); - assertThat(parsed.getSolr().getQueryText(), is(solr.getQueryText())); - assertThat(parsed.getSolr().getFilterQueryText(), is(solr.getFilterQueryText())); - assertThat(parsed.getSolr().getCollection(), is(solr.getCollection())); - assertThat(parsed.getSolr().getSortColumn(), is(solr.getSortColumn())); - } -} \ No newline at end of file diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java index be8a226..b7bda57 100644 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java @@ -1,12 +1,12 @@ package org.apache.ambari.infra.job.archive; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + import org.junit.Test; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -27,7 +27,7 @@ import static org.junit.Assert.assertThat; */ public class SolrPropertiesTest { @Test - public void testApplySortColumns() throws Exception { + public void testApplySortColumns() { JobParameters jobParameters = new JobParametersBuilder() .addString("sortColumn[0]", "logtime") .addString("sortColumn[1]", "id") @@ -35,20 +35,20 @@ public class SolrPropertiesTest { SolrProperties solrProperties = new SolrProperties(); solrProperties.setSortColumn(new String[] {"testColumn"}); - solrProperties.apply(jobParameters); - assertThat(solrProperties.getSortColumn().length, is(2)); - assertThat(solrProperties.getSortColumn()[0], is("logtime")); - assertThat(solrProperties.getSortColumn()[1], is("id")); + SolrParameters solrParameters = solrProperties.merge(jobParameters); + assertThat(solrParameters.getSortColumn().length, is(2)); + assertThat(solrParameters.getSortColumn()[0], is("logtime")); + assertThat(solrParameters.getSortColumn()[1], is("id")); } @Test - public void testApplyWhenNoSortIsDefined() throws Exception { + public void testApplyWhenNoSortIsDefined() { JobParameters jobParameters = new JobParametersBuilder() .toJobParameters(); SolrProperties solrProperties = new SolrProperties(); solrProperties.setSortColumn(new String[] {"testColumn"}); - solrProperties.apply(jobParameters); - assertThat(solrProperties.getSortColumn().length, is(1)); + SolrParameters solrParameters = solrProperties.merge(jobParameters); + assertThat(solrParameters.getSortColumn().length, is(0)); } } \ No newline at end of file diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java index ee08279..0f7049b 100644 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java @@ -18,15 +18,20 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.solr.client.solrj.SolrQuery; -import org.junit.Test; - -import java.util.HashMap; - +import static org.apache.ambari.infra.job.archive.SolrQueryBuilder.computeEnd; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.HashMap; + +import org.apache.solr.client.solrj.SolrQuery; +import org.hamcrest.core.Is; +import org.junit.Test; + public class SolrQueryBuilderTest { private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("logtime", "2017-10-02'T'10:00:11.634Z"); @@ -103,4 +108,27 @@ public class SolrQueryBuilderTest { SolrQuery solrQuery = new SolrQueryBuilder().setQueryText("id:[${start} TO ${end}]").build(); assertThat(solrQuery.getQuery(), is("id:[* TO *]")); } + + @Test + public void testComputeEndReturnsNullIsNoEndAndNoTTLWasGiven() { + assertThat(computeEnd(null, OffsetDateTime.now(), null), Is.is(nullValue())); + } + + @Test + public void testComputeEndReturnsEndIfOnlyEndWasGiven() { + String end = "2018-10-09T10:11:12.000Z"; + assertThat(computeEnd(end, OffsetDateTime.now(), null), Is.is(end)); + } + + @Test + public void testComputeEndReturnsNowMinusTtlIfOnlyTtlWasGiven() { + OffsetDateTime now = OffsetDateTime.of(2018, 10, 9, 10, 11, 12, 0, ZoneOffset.UTC); + assertThat(computeEnd(null, now, Duration.ofDays(5)), Is.is("2018-10-04T10:11:12.000Z")); + } + + @Test + public void testComputeEndReturnsEndIfBothWasGiven() { + String end = "2018-10-09T10:11:12.000Z"; + assertThat(computeEnd(end, OffsetDateTime.now(), Duration.ofDays(5)), Is.is(end)); + } } diff --git a/ambari-infra-solr-plugin/pom.xml b/ambari-infra-solr-plugin/pom.xml index a3619cd..9de8e72 100644 --- a/ambari-infra-solr-plugin/pom.xml +++ b/ambari-infra-solr-plugin/pom.xml @@ -44,6 +44,11 @@ <version>${solr.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <resources> diff --git a/pom.xml b/pom.xml index 9402114..ab82c4b 100644 --- a/pom.xml +++ b/pom.xml @@ -305,7 +305,7 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>4.10</version> + <version>4.12</version> </dependency> <dependency> <groupId>commons-cli</groupId> @@ -363,6 +363,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + </dependency> </dependencies> </dependencyManagement>