[ https://issues.apache.org/jira/browse/AMBARI-24792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655016#comment-16655016 ]
ASF GitHub Bot commented on AMBARI-24792: ----------------------------------------- oleewere closed pull request #8: AMBARI-24792 - Infra Manager: not all the documents archived URL: https://github.com/apache/ambari-infra/pull/8 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java index f0b592d..9c8fbcf 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java @@ -96,4 +96,13 @@ public void deleteObject(String key) { throw new RuntimeException(e); } } + + public InputStream getObject(String key) { + try { + return s3client.getObject(bucket, key); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } } diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java index 24d603b..7a748bc 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java @@ -114,7 +114,7 @@ private void checkInfraManagerReachable() throws Exception { } } - protected void addDocument(OffsetDateTime logtime) { + protected SolrInputDocument addDocument(OffsetDateTime logtime) { SolrInputDocument solrInputDocument = new SolrInputDocument(); solrInputDocument.addField("logType", "HDFSAudit"); solrInputDocument.addField("cluster", "cl1"); @@ -148,6 +148,7 @@ protected void addDocument(OffsetDateTime logtime) { 
solrInputDocument.addField("_ttl_", "+7DAYS"); solrInputDocument.addField("_expire_at_", "2017-12-15T10:23:19.106Z"); solr.add(solrInputDocument); + return solrInputDocument; } @AfterStories diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java index b1d36d1..e2bbe9d 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java @@ -18,6 +18,7 @@ */ package org.apache.ambari.infra.steps; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER; import static org.apache.ambari.infra.TestUtil.doWithin; @@ -26,19 +27,24 @@ import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.junit.Assert.assertThat; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.io.UncheckedIOException; import java.time.Duration; import java.time.OffsetDateTime; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.ambari.infra.InfraClient; import org.apache.ambari.infra.JobExecutionInfo; import org.apache.ambari.infra.S3Client; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -50,16 +56,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + public class ExportJobsSteps extends AbstractInfraSteps { private static 
final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class); + private Set<String> documentIds = new HashSet<>(); private Map<String, JobExecutionInfo> launchedJobs = new HashMap<>(); @Given("$count documents in solr") public void addDocuments(int count) { OffsetDateTime intervalEnd = OffsetDateTime.now(); + documentIds.clear(); for (int i = 0; i < count; ++i) { - addDocument(intervalEnd.minusMinutes(i % (count / 10))); + documentIds.add(addDocument(intervalEnd.minusMinutes(i % (count / 10))).get("id").getValue().toString()); } getSolr().commit(); } @@ -68,13 +79,15 @@ public void addDocuments(int count) { public void addDocuments(long count, OffsetDateTime startLogtime, OffsetDateTime endLogtime) { Duration duration = Duration.between(startLogtime, endLogtime); long increment = duration.toNanos() / count; - for (int i = 0; i < count; ++i) - addDocument(startLogtime.plusNanos(increment * i)); + documentIds.clear(); + for (int i = 0; i < count; ++i) { + documentIds.add(addDocument(startLogtime.plusNanos(increment * i)).get("id").getValue().toString()); + } getSolr().commit(); } @Given("a file on s3 with key $key") - public void addFileToS3(String key) throws Exception { + public void addFileToS3(String key) { getS3client().putObject(key, "anything".getBytes()); } @@ -204,4 +217,26 @@ public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String .filter(file -> file.getName().contains(text)) .count(), is(count)); } + + private static final ObjectMapper json = new ObjectMapper(); + + @Then("Check the files $fileNamePart contains the archived documents") + public void checkStoredDocumentIds(String fileNamePart) throws Exception { + S3Client s3Client = getS3client(); + int size = documentIds.size(); + Set<String> storedDocumentIds = new HashSet<>(); + for (String objectKey : s3Client.listObjectKeys(fileNamePart)) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(new 
BZip2CompressorInputStream(s3Client.getObject(objectKey)), UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + Map<String, Object> document = json.readValue(line, new TypeReference<HashMap<String, Object>>() {}); + String id = document.get("id").toString(); + storedDocumentIds.add(id); + documentIds.remove(id); + } + } + } + assertThat(documentIds.size(), is(0)); + assertThat(storedDocumentIds.size(), is(size)); + } } diff --git a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story index 2330474..806dc84 100644 --- a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story +++ b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story @@ -11,6 +11,7 @@ Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10 When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z after 2 seconds Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds +And Check the files solr_archive_audit_logs_-_2010-10-09 contains the archived documents Scenario: Running archiving job with a bigger start value than end value exports and deletes 0 documents @@ -32,6 +33,7 @@ When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.jso And restart archive_audit_logs job within 2 seconds Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds +And Check the files solr_archive_audit_logs_-_2011-10-09 contains the archived documents Scenario: After Deleting job deletes 
documents from solr no document found in the specified interval @@ -65,3 +67,4 @@ And stop job archive_audit_logs after at least 1 file exists in s3 with filename Then Less than 20 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds When restart archive_audit_logs job within 10 seconds Then Check 25 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds +And Check the files solr_archive_audit_logs_-_2014-03-09 contains the archived documents \ No newline at end of file diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java index 1cb2d62..43e871f 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java @@ -94,7 +94,12 @@ public SolrParameters merge(JobParameters jobParameters) { sortColumns.add(sortValue); ++i; } - solrParameters.setSortColumn(sortColumns.toArray(new String[0])); + if (!sortColumns.isEmpty()) { + solrParameters.setSortColumn(sortColumns.toArray(new String[0])); + } + else { + solrParameters.setSortColumn(sortColumn); + } return solrParameters; } diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java index b7bda57..71d25b6 100644 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java @@ -1,6 +1,7 @@ package org.apache.ambari.infra.job.archive; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static 
org.junit.Assert.assertThat; import org.junit.Test; @@ -27,7 +28,7 @@ */ public class SolrPropertiesTest { @Test - public void testApplySortColumns() { + public void testMergeSortColumns() { JobParameters jobParameters = new JobParametersBuilder() .addString("sortColumn[0]", "logtime") .addString("sortColumn[1]", "id") @@ -42,13 +43,24 @@ public void testApplySortColumns() { } @Test - public void testApplyWhenNoSortIsDefined() { + public void testMergeWhenNoSortIsDefined() { + JobParameters jobParameters = new JobParametersBuilder() + .toJobParameters(); + + SolrProperties solrProperties = new SolrProperties(); + SolrParameters solrParameters = solrProperties.merge(jobParameters); + assertThat(solrParameters.getSortColumn(), is(nullValue())); + } + + @Test + public void testMergeWhenPropertiesAreDefinedButJobParamsAreNot() { JobParameters jobParameters = new JobParametersBuilder() .toJobParameters(); SolrProperties solrProperties = new SolrProperties(); solrProperties.setSortColumn(new String[] {"testColumn"}); SolrParameters solrParameters = solrProperties.merge(jobParameters); - assertThat(solrParameters.getSortColumn().length, is(0)); + assertThat(solrParameters.getSortColumn().length, is(1)); + assertThat(solrParameters.getSortColumn()[0], is("testColumn")); } } \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Infra Manager: not all the documents archived > --------------------------------------------- > > Key: AMBARI-24792 > URL: https://issues.apache.org/jira/browse/AMBARI-24792 > Project: Ambari > Issue Type: Bug > Components: ambari-infra > Affects Versions: 2.8.0 > Reporter: Krisztian Kasa > Assignee: Krisztian Kasa > Priority: Major > Labels: pull-request-available > Fix For: 2.8.0 > > > Infra manager does not use the sort_column properties when reading the > documents to archive. This leads to some of the documents not being exported. -- This message was sent by Atlassian JIRA (v7.6.3#76005)