AMBARI-22514, AMBARI-22653. Ambari Infra Manager: Solr data export jobs and integration test environment. (Krisztian Kasa via swagle)
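The new ambari-infra-manager-it module in the diff below hooks the JBehave stories into maven-failsafe through an "it" profile. As a rough sketch of how the suite could be run locally (an assumption based only on the pom and docker-compose files in this commit, not a documented procedure: it presumes Docker and docker-compose are installed and that the ambari-infra-manager package and its ambari-infra-manager:v1.0 image have already been built, since the compose file mounts ambari-infra-manager/target/package into the container):

  cd ambari-infra/ambari-infra-manager-it
  # the "it" profile binds maven-failsafe to the integration-test and verify phases and picks up **/*Stories.java
  mvn clean verify -Dit
  # optional overrides declared in the pom: docker host and an external story folder
  mvn clean verify -Dit -Ddocker.host=localhost -Dstories.location=/path/to/stories

The stories themselves start and stop the docker-compose environment (see infra-manager-docker-compose.sh and AbstractInfraSteps below), so no containers need to be started by hand.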
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/36d0271f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/36d0271f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/36d0271f Branch: refs/heads/branch-feature-AMBARI-22008-isilon Commit: 36d0271f74a70f5cfeca0e5ca0ebeb795fab6138 Parents: a15fc7f Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Thu Dec 21 13:24:03 2017 -0800 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Thu Dec 21 13:24:03 2017 -0800 ---------------------------------------------------------------------- ambari-infra/ambari-infra-manager-it/pom.xml | 155 +++++++++++++ .../org/apache/ambari/infra/InfraClient.java | 93 ++++++++ .../ambari/infra/InfraManagerStories.java | 108 +++++++++ .../ambari/infra/OffsetDateTimeConverter.java | 39 ++++ .../ambari/infra/steps/AbstractInfraSteps.java | 223 +++++++++++++++++++ .../ambari/infra/steps/ExportJobsSteps.java | 106 +++++++++ .../src/test/resources/log4j.properties | 16 ++ .../resources/stories/infra_api_tests.story | 23 ++ .../ambari-infra-manager/docker/Dockerfile | 6 +- .../docker/docker-compose.yml | 81 +++++++ .../docker/infra-manager-docker-compose.sh | 105 +++++++++ .../apache/ambari/infra/job/ObjectSource.java | 23 ++ .../infra/job/archive/AbstractFileAction.java | 33 +++ .../infra/job/archive/CompositeFileAction.java | 7 +- .../ambari/infra/job/archive/Document.java | 1 - .../archive/DocumentExportConfiguration.java | 74 +++--- .../job/archive/DocumentExportJobListener.java | 23 ++ .../job/archive/DocumentExportProperties.java | 140 +++++++++--- .../job/archive/DocumentExportPropertyMap.java | 38 ++++ .../job/archive/DocumentExportStepListener.java | 47 ---- .../infra/job/archive/DocumentItemReader.java | 8 +- .../infra/job/archive/DocumentIterator.java | 5 +- .../infra/job/archive/DocumentSource.java | 7 +- .../ambari/infra/job/archive/FileAction.java | 2 +- .../job/archive/LocalDocumentItemWriter.java | 8 +- .../ambari/infra/job/archive/S3Properties.java | 57 ++--- .../ambari/infra/job/archive/S3Uploader.java | 23 +- .../infra/job/archive/SolrDocumentIterator.java | 3 +- .../infra/job/archive/SolrDocumentSource.java | 22 +- .../infra/job/archive/SolrQueryBuilder.java | 28 ++- .../infra/job/archive/SolrQueryProperties.java | 40 +++- .../infra/job/archive/TarGzCompressor.java | 2 +- .../src/main/resources/infra-manager.properties | 48 +++- .../archive/DocumentExportPropertiesTest.java | 54 +++++ .../job/archive/DocumentItemReaderTest.java | 8 +- .../archive/LocalDocumentItemWriterTest.java | 8 +- .../infra/job/archive/SolrQueryBuilderTest.java | 18 +- .../job/archive/SolrQueryPropertiesTest.java | 54 +++++ ambari-infra/pom.xml | 5 +- 39 files changed, 1532 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/pom.xml b/ambari-infra/ambari-infra-manager-it/pom.xml new file mode 100644 index 0000000..97e8ea0 --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/pom.xml @@ -0,0 +1,155 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <artifactId>ambari-infra</artifactId> + <groupId>org.apache.ambari</groupId> + <version>2.0.0.0-SNAPSHOT</version> + </parent> + + <name>Ambari Infra Manager Integration Tests</name> + <url>http://maven.apache.org</url> + <modelVersion>4.0.0</modelVersion> + + <artifactId>ambari-infra-manager-it</artifactId> + + <properties> + <jbehave.version>4.0.5</jbehave.version> + <failsafe-plugin.version>2.20</failsafe-plugin.version> + <docker.host>localhost</docker.host> + <stories.location>NONE</stories.location> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + <version>${solr.version}</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.11.5</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.5</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.20</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.20</version> + </dependency> + + <dependency> + <groupId>org.jbehave</groupId> + <artifactId>jbehave-core</artifactId> + <version>${jbehave.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>3.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <testOutputDirectory>target/classes</testOutputDirectory> + <testResources> + <testResource> + <directory>src/test/java/</directory> + <includes> + <include>**/*.story</include> + </includes> + </testResource> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + </build> + + <profiles> + <profile> + <id>it</id> + <activation> + <property> + <name>it</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${failsafe-plugin.version}</version> + <executions> + <execution> + <id>run-integration-tests</id> + <phase>integration-test</phase> + <goals> + <goal>integration-test</goal> + </goals> + <configuration> + <includes> + <include>**/*Stories.java</include> + </includes> + <systemPropertyVariables> + 
<log4j.configuration>file:${project.build.testOutputDirectory}/log4j.properties</log4j.configuration> + <docker.host>${docker.host}</docker.host> + <backend.stories.location>${stories.location}</backend.stories.location> + </systemPropertyVariables> + </configuration> + </execution> + <execution> + <id>verify-integration-tests</id> + <phase>verify</phase> + <goals> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java new file mode 100644 index 0000000..0e391a3 --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra; + +import org.apache.commons.io.IOUtils; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; +import org.apache.http.impl.client.HttpClientBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; + +import static org.apache.commons.lang.StringUtils.isBlank; + +// TODO: use swagger +public class InfraClient implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(InfraClient.class); + + private final CloseableHttpClient httpClient; + private final URI baseUrl; + + public InfraClient(String baseUrl) { + try { + this.baseUrl = new URI(baseUrl); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + httpClient = HttpClientBuilder.create().setRetryHandler(new DefaultHttpRequestRetryHandler(0, false)).build(); + } + + @Override + public void close() throws Exception { + httpClient.close(); + } + + // TODO: return job data + public void getJobs() { + execute(new HttpGet(baseUrl)); + } + + private String execute(HttpRequestBase post) { + try (CloseableHttpResponse response = httpClient.execute(post)) { + String responseBodyText = IOUtils.toString(response.getEntity().getContent(), Charset.defaultCharset()); + LOG.info("Response code {} body {} ", response.getStatusLine().getStatusCode(), responseBodyText); + return responseBodyText; + } catch (ClientProtocolException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + // TODO: return job data + public void startJob(String jobName, String parameters) { + URIBuilder uriBuilder = new URIBuilder(baseUrl); + uriBuilder.setScheme("http"); + uriBuilder.setPath(uriBuilder.getPath() + "/" + jobName); + if (!isBlank(parameters)) + uriBuilder.addParameter("params", parameters); + try { + execute(new HttpPost(uriBuilder.build())); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java new file mode 100644 index 0000000..cf720ef --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraManagerStories.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra; + +import com.google.common.collect.Lists; +import org.apache.ambari.infra.steps.ExportJobsSteps; +import org.apache.commons.lang.StringUtils; +import org.jbehave.core.configuration.Configuration; +import org.jbehave.core.configuration.MostUsefulConfiguration; +import org.jbehave.core.io.LoadFromClasspath; +import org.jbehave.core.io.LoadFromRelativeFile; +import org.jbehave.core.io.StoryFinder; +import org.jbehave.core.io.StoryLoader; +import org.jbehave.core.junit.JUnitStories; +import org.jbehave.core.reporters.Format; +import org.jbehave.core.reporters.StoryReporterBuilder; +import org.jbehave.core.steps.InjectableStepsFactory; +import org.jbehave.core.steps.InstanceStepsFactory; +import org.jbehave.core.steps.ParameterConverters; + +import java.io.File; +import java.net.URL; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.jbehave.core.io.CodeLocations.codeLocationFromClass; + +public class InfraManagerStories extends JUnitStories { + private static final String BACKEND_STORIES_LOCATION_PROPERTY = "backend.stories.location"; + private static final String STORY_SUFFIX = ".story"; + + @Override + public Configuration configuration() { + return new MostUsefulConfiguration() + .useStoryLoader(getStoryLoader(BACKEND_STORIES_LOCATION_PROPERTY, this.getClass())) + .useParameterConverters(new ParameterConverters().addConverters(new OffsetDateTimeConverter())) + .useStoryReporterBuilder( + new StoryReporterBuilder().withFailureTrace(true).withDefaultFormats().withFormats(Format.CONSOLE, Format.TXT)); + } + + private static StoryLoader getStoryLoader(String property, Class clazz) { + boolean useExternalStoryLocation = useExternalStoryLocation(property); + if (useExternalStoryLocation) { + try { + return new LoadFromRelativeFile(new URL("file://" + System.getProperty(property))); + } catch (Exception e) { + throw new RuntimeException("Cannot load story files from url: file://" + System.getProperty(property)); + } + } else { + return new LoadFromClasspath(clazz); + } + } + + @Override + public InjectableStepsFactory stepsFactory() { + return new InstanceStepsFactory(configuration(), new ExportJobsSteps()); + } + + @Override + protected List<String> storyPaths() { + return findStories(BACKEND_STORIES_LOCATION_PROPERTY, STORY_SUFFIX, this.getClass()); + } + + private static List<String> findStories(String property, String suffix, Class clazz) { + if (useExternalStoryLocation(property)) { + return findStoriesInFolder(System.getProperty(property), suffix); + } else { + return new StoryFinder() + .findPaths(codeLocationFromClass(clazz).getFile(), singletonList(String.format("**/*%s", suffix)), null); + } + } + + private static List<String> findStoriesInFolder(String folderAbsolutePath, String suffix) { + List<String> results = Lists.newArrayList(); + File folder = new File(folderAbsolutePath); + File[] listOfFiles = folder.listFiles(); + if (listOfFiles != null) { + for (File file : listOfFiles) { + if (file.getName().endsWith(suffix)) { + results.add(file.getName()); + } + } + } + return results; + 
} + + private static boolean useExternalStoryLocation(String property) { + String storyLocationProp = System.getProperty(property); + return StringUtils.isNotEmpty(storyLocationProp) && !"NONE".equals(storyLocationProp); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java new file mode 100644 index 0000000..9db562c --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/OffsetDateTimeConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra; + +import org.jbehave.core.steps.ParameterConverters; + +import java.lang.reflect.Type; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; + +public class OffsetDateTimeConverter implements ParameterConverters.ParameterConverter { + private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX"); + + @Override + public boolean accept(Type type) { + return type instanceof Class<?> && OffsetDateTime.class.isAssignableFrom((Class<?>) type); + } + + @Override + public Object convertValue(String value, Type type) { + return OffsetDateTime.parse(value, SOLR_DATETIME_FORMATTER); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java new file mode 100644 index 0000000..703e1cf --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.steps; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import org.apache.ambari.infra.InfraClient; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.LBHttpSolrClient; +import org.apache.solr.common.SolrInputDocument; +import org.jbehave.core.annotations.AfterStories; +import org.jbehave.core.annotations.BeforeStories; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.Date; +import java.util.UUID; +import java.util.function.BooleanSupplier; + +import static java.lang.System.currentTimeMillis; + +public abstract class AbstractInfraSteps { + private static final Logger LOG = LoggerFactory.getLogger(AbstractInfraSteps.class); + + private static final int SOLR_PORT = 8983; + private static final int INFRA_MANAGER_PORT = 61890; + private static final int FAKE_S3_PORT = 4569; + private static final String AUDIT_LOGS_COLLECTION = "audit_logs"; + protected static final String S3_BUCKET_NAME = "testbucket"; + private String ambariFolder; + private String shellScriptLocation; + private String dockerHost; + private SolrClient solrClient; + private AmazonS3Client s3client; + + public InfraClient getInfraClient() { + return new InfraClient(String.format("http://%s:%d/api/v1/jobs", dockerHost, INFRA_MANAGER_PORT)); + } + + public SolrClient getSolrClient() { + return solrClient; + } + + public AmazonS3Client getS3client() { + return s3client; + } + + @BeforeStories + public void initDockerContainer() throws Exception { + LOG.info("Create new docker container for testing Ambari Infra Manager ..."); + URL location = AbstractInfraSteps.class.getProtectionDomain().getCodeSource().getLocation(); + ambariFolder = new File(location.toURI()).getParentFile().getParentFile().getParentFile().getParent(); + shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh"; + + runCommand(new String[]{shellScriptLocation, "start"}); + + dockerHost = System.getProperty("docker.host") != null ? 
System.getProperty("docker.host") : "localhost"; + + waitUntilSolrIsUp(); + + solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica1", + dockerHost, + SOLR_PORT, + AUDIT_LOGS_COLLECTION)).build(); + + LOG.info("Creating collection"); + runCommand(new String[]{"docker", "exec", "docker_solr_1", "solr", "create_collection", "-c", AUDIT_LOGS_COLLECTION, "-d", "configsets/"+ AUDIT_LOGS_COLLECTION +"/conf", "-n", AUDIT_LOGS_COLLECTION + "_conf"}); + + LOG.info("Initializing s3 client"); + s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential")); + s3client.setEndpoint(String.format("http://%s:%d", dockerHost, FAKE_S3_PORT)); + s3client.createBucket(S3_BUCKET_NAME); + + checkInfraManagerReachable(); + } + + protected void runCommand(String[] command) { + try { + LOG.info("Exec command: {}", StringUtils.join(command, " ")); + Process process = Runtime.getRuntime().exec(command); + String stdout = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8); + LOG.info("Exec command result {}", stdout); + } catch (Exception e) { + throw new RuntimeException("Error during execute shell command: ", e); + } + } + + private void waitUntilSolrIsUp() throws Exception { + try(CloseableHttpClient httpClient = HttpClientBuilder.create().setRetryHandler(new DefaultHttpRequestRetryHandler(0, false)).build()) { + doWithin(60, "Start Solr", () -> pingSolr(httpClient)); + } + } + + protected void doWithin(int sec, String actionName, BooleanSupplier predicate) { + doWithin(sec, actionName, () -> { + if (!predicate.getAsBoolean()) + throw new RuntimeException("Predicate was false!"); + }); + } + + protected void doWithin(int sec, String actionName, Runnable runnable) { + long start = currentTimeMillis(); + Exception exception; + while (true) { + try { + runnable.run(); + return; + } + catch (Exception e) { + exception = e; + } + + if (currentTimeMillis() - start > sec * 1000) { + throw new AssertionError(String.format("Unable to perform action '%s' within %d seconds", actionName, sec), exception); + } + else { + LOG.info("Performing action '{}' failed. 
retrying...", actionName); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + + private boolean pingSolr(CloseableHttpClient httpClient) { + try (CloseableHttpResponse response = httpClient.execute(new HttpGet(String.format("http://%s:%d/solr/admin/collections?action=LIST", dockerHost, SOLR_PORT)))) { + return response.getStatusLine().getStatusCode() == 200; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void checkInfraManagerReachable() throws Exception { + try (InfraClient httpClient = getInfraClient()) { + doWithin(30, "Start Ambari Infra Manager", httpClient::getJobs); + LOG.info("Ambari Infra Manager is up and running"); + } + } + + protected void addDocument(OffsetDateTime logtime) throws SolrServerException, IOException { + SolrInputDocument solrInputDocument = new SolrInputDocument(); + solrInputDocument.addField("logType", "HDFSAudit"); + solrInputDocument.addField("cluster", "cl1"); + solrInputDocument.addField("event_count", 1); + solrInputDocument.addField("repo", "hdfs"); + solrInputDocument.addField("reqUser", "ambari-qa"); + solrInputDocument.addField("type", "hdfs_audit"); + solrInputDocument.addField("seq_num", 9); + solrInputDocument.addField("result", 1); + solrInputDocument.addField("path", "/root/test-logs/hdfs-audit/hdfs-audit.log"); + solrInputDocument.addField("ugi", "ambari-qa (auth:SIMPLE)"); + solrInputDocument.addField("host", "logfeeder.apache.org"); + solrInputDocument.addField("action", "getfileinfo"); + solrInputDocument.addField("log_message", "allowed=true\tugi=ambari-qa (auth:SIMPLE)\tip=/192.168.64.102\tcmd=getfileinfo\tsrc=/ats/active\tdst=null\tperm=null\tproto=rpc\tcallerContext=HIVE_QUERY_ID:ambari-qa_20160317200111_223b3079-4a2d-431c-920f-6ba37ed63e9f"); + solrInputDocument.addField("logger_name", "FSNamesystem.audit"); + solrInputDocument.addField("id", UUID.randomUUID().toString()); + solrInputDocument.addField("authType", "SIMPLE"); + solrInputDocument.addField("logfile_line_number", 1); + solrInputDocument.addField("cliIP", "/192.168.64.102"); + solrInputDocument.addField("level", "INFO"); + solrInputDocument.addField("resource", "/ats/active"); + solrInputDocument.addField("ip", "172.18.0.2"); + solrInputDocument.addField("evtTime", "2017-12-08T10:23:16.452Z"); + solrInputDocument.addField("req_caller_id", "HIVE_QUERY_ID:ambari-qa_20160317200111_223b3079-4a2d-431c-920f-6ba37ed63e9f"); + solrInputDocument.addField("repoType", 1); + solrInputDocument.addField("enforcer", "hadoop-acl"); + solrInputDocument.addField("cliType", "rpc"); + solrInputDocument.addField("message_md5", "-6778765776916226588"); + solrInputDocument.addField("event_md5", "5627261521757462732"); + solrInputDocument.addField("logtime", new Date(logtime.toInstant().toEpochMilli())); + solrInputDocument.addField("_ttl_", "+7DAYS"); + solrInputDocument.addField("_expire_at_", "2017-12-15T10:23:19.106Z"); + solrClient.add(solrInputDocument); + } + + @AfterStories + public void shutdownContainers() throws Exception { + Thread.sleep(2000); // sync with s3 server + ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); + ObjectListing objectListing = getS3client().listObjects(listObjectsRequest); + LOG.info("Found {} files on s3.", objectListing.getObjectSummaries().size()); + objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file in s3 with key {}", 
s3ObjectSummary.getKey())); + + LOG.info("shutdown containers"); + runCommand(new String[]{shellScriptLocation, "stop"}); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java new file mode 100644 index 0000000..4a09d7d --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.steps; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import org.apache.ambari.infra.InfraClient; +import org.jbehave.core.annotations.Alias; +import org.jbehave.core.annotations.Given; +import org.jbehave.core.annotations.Then; +import org.jbehave.core.annotations.When; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.time.Duration; +import java.time.OffsetDateTime; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.core.IsCollectionContaining.hasItem; +import static org.junit.Assert.assertThat; + +public class ExportJobsSteps extends AbstractInfraSteps { + private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class); + + @Given("$count documents in solr") + public void addDocuments(int count) throws Exception { + for (int i = 0; i < count; ++i) + addDocument(OffsetDateTime.now().minusMinutes(i)); + getSolrClient().commit(); + } + + @Given("$count documents in solr with logtime from $startLogtime to $endLogtime") + public void addDocuments(long count, OffsetDateTime startLogtime, OffsetDateTime endLogtime) throws Exception { + Duration duration = Duration.between(startLogtime, endLogtime); + long increment = duration.toNanos() / count; + for (int i = 0; i < count; ++i) + addDocument(startLogtime.plusNanos(increment * i)); + getSolrClient().commit(); + } + + @Given("a file on s3 with key $key") + public void addFileToS3(String key) throws Exception { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream("anything".getBytes())) { + getS3client().putObject(S3_BUCKET_NAME, key, inputStream, new ObjectMetadata()); + } + } + + @When("start $jobName job") + 
public void startJob(String jobName) throws Exception { + startJob(jobName, null); + } + + @When("start $jobName job with parameters $parameters") + @Alias("restart $jobName job with parameters $parameters") + public void startJob(String jobName, String parameters) throws Exception { + try (InfraClient httpClient = getInfraClient()) { + httpClient.startJob(jobName, parameters); + } + } + + @When("delete file with key $key from s3") + public void deleteFileFromS3(String key) { + getS3client().deleteObject(S3_BUCKET_NAME, key); + } + + @Then("Check filenames contains the text $text on s3 server after $waitSec seconds") + public void checkS3After(String text, int waitSec) throws Exception { + AmazonS3Client s3Client = getS3client(); + ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) + && !s3Client.listObjects(listObjectsRequest).getObjectSummaries().isEmpty()); + + ObjectListing objectListing = s3Client.listObjects(listObjectsRequest); + assertThat(objectListing.getObjectSummaries(), hasItem(hasProperty("key", containsString(text)))); + } + + @Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds") + public void checkNumberOfFilesOnS3(int count, String text, int waitSec) { + AmazonS3Client s3Client = getS3client(); + ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) + && s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream() + .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)) + .count() == count); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties b/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties new file mode 100644 index 0000000..956bc63 --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/log4j.properties @@ -0,0 +1,16 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story new file mode 100644 index 0000000..cd1f49d --- /dev/null +++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story @@ -0,0 +1,23 @@ +Scenario: Export documents from solr and upload them to s3 using default configuration + +Given 1000 documents in solr +When start export_audit_logs job +Then Check filenames contains the text audit_logs on s3 server after 20 seconds + + +Scenario: Exporting 10 documents using writeBlockSize=3 produces 4 files + +Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10-09T20:00:00.000Z +When start export_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z +Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds + + +Scenario: Export job fails when part of the data is exported. After resolving the issue and restarting, the job exports the rest of the data. + +Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z +And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz +When start export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z +Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds +When delete file with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz from s3 +And restart export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z +Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/docker/Dockerfile b/ambari-infra/ambari-infra-manager/docker/Dockerfile index adb584a..eaefe95 100644 --- a/ambari-infra/ambari-infra-manager/docker/Dockerfile +++ b/ambari-infra/ambari-infra-manager/docker/Dockerfile @@ -22,9 +22,9 @@ RUN yum -y install glibc-common ENV HOME /root #Install JAVA -ENV JAVA_VERSION 8u31 -ENV BUILD_VERSION b13 -RUN wget --no-cookies --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/$JAVA_VERSION-$BUILD_VERSION/jdk-$JAVA_VERSION-linux-x64.rpm" -O jdk-8-linux-x64.rpm +ENV JAVA_VERSION 8u131 +ENV BUILD_VERSION b11 +RUN wget --no-check-certificate --no-cookies --header "Cookie:oraclelicense=accept-securebackup-cookie" 
http://download.oracle.com/otn-pub/java/jdk/$JAVA_VERSION-$BUILD_VERSION/d54c1d3a095b4ff2b6607d096fa80163/jdk-$JAVA_VERSION-linux-x64.rpm -O jdk-8-linux-x64.rpm RUN rpm -ivh jdk-8-linux-x64.rpm ENV JAVA_HOME /usr/java/default/ http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/docker/docker-compose.yml ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/docker/docker-compose.yml b/ambari-infra/ambari-infra-manager/docker/docker-compose.yml new file mode 100644 index 0000000..1172631 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/docker/docker-compose.yml @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License +version: '3.3' +services: + zookeeper: + image: zookeeper:${ZOOKEEPER_VERSION:-3.4.10} + restart: always + hostname: zookeeper + networks: + - infra-network + ports: + - 2181:2181 + environment: + ZOO_MY_ID: 1 + ZOO_SERVERS: server.1=zookeeper:2888:3888 + solr: + image: solr:${SOLR_VERSION:-6.6.2} + restart: always + hostname: solr + ports: + - "8983:8983" + networks: + - infra-network + env_file: + - Profile + entrypoint: + - docker-entrypoint.sh + - solr + - start + - "-f" + - "-c" + - "-z" + - ${ZOOKEEPER_CONNECTION_STRING} + volumes: + - $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-server/src/main/configsets:/opt/solr/configsets + localstack-s3: + image: localstack/localstack + ports: + - "4569:4569" + environment: + - SERVICES=s3:4569 + hostname: fakes3 + networks: + infra-network: + aliases: + - testbucket.fakes3 + env_file: + - Profile + inframanager: + image: ambari-infra-manager:v1.0 + restart: always + hostname: infra-manager.apache.org + networks: + - infra-network + env_file: + - Profile + ports: + - 61890:61890 + - 5007:5007 + environment: + COMPONENT: infra-manager + COMPONENT_LOG: infra-manager + ZK_CONNECT_STRING: ${ZOOKEEPER_CONNECTION_STRING} + DISPLAY: $DOCKERIP:0 + volumes: + - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/target/package:/root/ambari-infra-manager +networks: + infra-network: + driver: bridge http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh new file mode 100644 index 0000000..ab02659 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh @@ -0,0 +1,105 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +sdir="`dirname \"$0\"`" +: ${1:?"argument is missing: (start|stop)"} +command="$1" + +function start_containers() { + check_env_files + echo "Start containers ..." + pushd $sdir/../ + local AMBARI_INFRA_MANAGER_LOCATION=$(pwd) + echo $AMBARI_INFRA_MANAGER_LOCATION + kill_containers + cd $AMBARI_INFRA_MANAGER_LOCATION/docker + docker-compose up -d + popd + echo "Containers started" +} + +function check_env_files() { + local count=0; + + check_env_file .env setup_env + count=$((count + $?)); + check_env_file Profile setup_profile + count=$((count + $?)); + + if [[ "$count" -gt 0 ]] + then + echo "Exit" + exit; + fi +} + +function check_env_file() { + if [ -f "$sdir/$1" ]; + then + echo "$1 file exists" + return 0; + else + echo "$1 file does not exist, Creating a new one..." + $2 + echo "$1 file has been created. Check it out before starting Ambari Infra Manager. ($sdir/$1)" + return 1; + fi +} + +function setup_env() { + pushd $sdir/../../ + local AMBARI_LOCATION=$(pwd) + popd + local docker_ip=$(get_docker_ip) + cat << EOF > $sdir/.env +DOCKERIP=$docker_ip +MAVEN_REPOSITORY_LOCATION=$HOME/.m2 +AMBARI_LOCATION=$AMBARI_LOCATION + +ZOOKEEPER_VERSION=3.4.10 +ZOOKEEPER_CONNECTION_STRING=zookeeper:2181 + +SOLR_VERSION=6.6.2 +EOF +} + +function setup_profile() { + pushd $sdir/../../ + local AMBARI_LOCATION=$(pwd) + popd + cat << EOF > $sdir/Profile +EOF +} + +function kill_containers() { + echo "Try to remove containers if exists ..." + docker rm -f docker_inframanager_1 + docker rm -f docker_solr_1 + docker rm -f docker_zookeeper_1 + docker rm -f docker_localstack-s3_1 +} + +case $command in + "start") + start_containers + ;; + "stop") + kill_containers + ;; + *) + echo "Available commands: (start|stop|build-and-run|build|build-docker-and-run|build-mvn-and-run|build-docker-only|build-mvn-only)" + ;; +esac http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java new file mode 100644 index 0000000..98a1e0d --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/ObjectSource.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job; + +public interface ObjectSource<T> { + CloseableIterator<T> open(T current, int rows); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java new file mode 100644 index 0000000..7a30393 --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/AbstractFileAction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +import java.io.File; + +public abstract class AbstractFileAction implements FileAction { + @Override + public File perform(File inputFile, boolean deleteInput) { + File outputFile = perform(inputFile); + if (deleteInput) + inputFile.delete(); + return outputFile; + } + + protected abstract File perform(File inputFile); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java index 84ce160..8421802 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/CompositeFileAction.java @@ -19,6 +19,7 @@ package org.apache.ambari.infra.job.archive; import java.io.File; +import java.util.ArrayList; import java.util.List; import static java.util.Arrays.asList; @@ -28,7 +29,7 @@ public class CompositeFileAction implements FileAction { private final List<FileAction> actions; public CompositeFileAction(FileAction... 
actions) { - this.actions = asList(actions); + this.actions = new ArrayList<>(asList(actions)); } public void add(FileAction action) { @@ -36,10 +37,10 @@ public class CompositeFileAction implements FileAction { } @Override - public File perform(File inputFile) { + public File perform(File inputFile, boolean deleteInput) { File file = inputFile; for (FileAction action : actions) { - file = action.perform(file); + file = action.perform(file, deleteInput); } return file; } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java index 84f5ece..1f3957a 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/Document.java @@ -26,7 +26,6 @@ import java.util.Map; import static java.util.Collections.unmodifiableMap; -// TODO: create entities for each solr collections public class Document { private final Map<String, String> fieldMap; http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java index 69f41d3..1895911 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportConfiguration.java @@ -18,6 +18,7 @@ */ package org.apache.ambari.infra.job.archive; +import org.apache.ambari.infra.job.ObjectSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; @@ -26,28 +27,23 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory import org.springframework.batch.core.configuration.annotation.JobScope; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import javax.annotation.PostConstruct; import javax.inject.Inject; import java.io.File; import java.nio.file.Paths; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; - -import static org.apache.ambari.infra.job.archive.SolrDocumentSource.SOLR_DATETIME_FORMATTER; -import static org.apache.commons.lang.StringUtils.isBlank; @Configuration public class DocumentExportConfiguration { private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class); - 
private static final DateTimeFormatter FILENAME_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH_mm_ss.SSSX"); @Inject - private DocumentExportProperties properties; + private DocumentExportPropertyMap propertyMap; @Inject private StepBuilderFactory steps; @@ -55,11 +51,26 @@ public class DocumentExportConfiguration { @Inject private JobBuilderFactory jobs; + @Inject + @Qualifier("exportStep") + private Step exportStep; + + @Inject + private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor; - @Bean - public Job logExportJob(@Qualifier("exportStep") Step logExportStep) { - return jobs.get("solr_data_export").listener(new DocumentExportJobListener()).start(logExportStep).build(); + @PostConstruct + public void createJobs() { + propertyMap.getSolrDataExport().values().forEach(DocumentExportProperties::validate); + + propertyMap.getSolrDataExport().keySet().forEach(jobName -> { + Job job = logExportJob(jobName, exportStep); + jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName); + }); + } + + private Job logExportJob(String jobName, Step logExportStep) { + return jobs.get(jobName).listener(new DocumentExportJobListener(propertyMap)).start(logExportStep).build(); } @Bean @@ -67,16 +78,17 @@ public class DocumentExportConfiguration { public Step exportStep(DocumentExporter documentExporter) { return steps.get("export") .tasklet(documentExporter) - .listener(new DocumentExportStepListener(properties)) .build(); } @Bean @StepScope - public DocumentExporter getDocumentExporter(DocumentItemReader documentItemReader, - @Value("#{stepExecution.jobExecution.id}") String jobId) { + public DocumentExporter documentExporter(DocumentItemReader documentItemReader, + @Value("#{stepExecution.jobExecution.id}") String jobId, + @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) { File path = Paths.get( properties.getDestinationDirectoryPath(), + // TODO: jobId should remain the same after continuing job String.format("%s_%s", properties.getQuery().getCollection(), jobId)).toFile(); // TODO: add end date LOG.info("Destination directory path={}", path); if (!path.exists()) { @@ -86,33 +98,43 @@ public class DocumentExportConfiguration { } CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor()); + properties.s3Properties().ifPresent(s3Properties -> fileAction.add(new S3Uploader(s3Properties))); return new DocumentExporter( documentItemReader, - firstDocument -> new LocalDocumentItemWriter( - new File(path, String.format("%s_-_%s.json", - properties.getQuery().getCollection(), - firstDocument.get(properties.getFileNameSuffixColumn()))), - fileAction), + firstDocument -> localDocumentItemWriter(properties, path, fileAction, firstDocument), properties.getWriteBlockSize()); } + private LocalDocumentItemWriter localDocumentItemWriter(DocumentExportProperties properties, File path, FileAction fileAction, Document firstDocument) { + return new LocalDocumentItemWriter(outFile(properties.getQuery().getCollection(), path, firstDocument.get(properties.getFileNameSuffixColumn())), + file -> fileAction.perform(file, true)); + } + + private File outFile(String collection, File directoryPath, String suffix) { + // TODO: format date (suffix) + File file = new File(directoryPath, String.format("%s_-_%s.json", collection, suffix)); + LOG.info("Exporting to temp file {}", file.getAbsolutePath()); + return file; + } + @Bean @StepScope - public DocumentItemReader reader(DocumentSource 
documentSource) { + public DocumentItemReader reader(ObjectSource<Document> documentSource, + @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) { return new DocumentItemReader(documentSource, properties.getReadBlockSize()); } @Bean @StepScope - public DocumentSource logSource(@Value("#{jobParameters[endDate]}") String endDateText) { - OffsetDateTime endDate = OffsetDateTime.now(ZoneOffset.UTC); - if (!isBlank(endDateText)) - endDate = OffsetDateTime.parse(endDateText); + public ObjectSource logSource(@Value("#{jobParameters[start]}") String start, + @Value("#{jobParameters[end]}") String end, + @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}") DocumentExportProperties properties) { return new SolrDocumentSource( - properties.getZooKeeperSocket(), + properties.getZooKeeperConnectionString(), properties.getQuery(), - SOLR_DATETIME_FORMATTER.format(endDate)); + start, + end); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java index f1df46c..3b6c402 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportJobListener.java @@ -23,9 +23,32 @@ import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; public class DocumentExportJobListener implements JobExecutionListener { + + private final DocumentExportPropertyMap propertyMap; + + public DocumentExportJobListener(DocumentExportPropertyMap propertyMap) { + this.propertyMap = propertyMap; + } + + @Override public void beforeJob(JobExecution jobExecution) { + try { + String jobName = jobExecution.getJobInstance().getJobName(); + DocumentExportProperties defaultProperties = propertyMap.getSolrDataExport().get(jobName); + if (defaultProperties == null) + throw new UnsupportedOperationException("Properties not found for job " + jobName); + DocumentExportProperties properties = defaultProperties.deepCopy(); + properties.apply(jobExecution.getJobParameters()); + properties.validate(); + jobExecution.getExecutionContext().put("exportProperties", properties); + } + catch (UnsupportedOperationException | IllegalArgumentException ex) { + jobExecution.stop(); + jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), ex.getMessage())); + throw ex; + } } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java index d6301c0..37f6d1b 100644 --- 
a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java @@ -18,38 +18,34 @@ */ package org.apache.ambari.infra.job.archive; -import org.hibernate.validator.constraints.NotBlank; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; import org.springframework.batch.core.JobParameters; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.PropertySource; -import javax.validation.constraints.Min; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Optional; import static org.apache.commons.lang.StringUtils.isBlank; -@Configuration -@PropertySource(value = {"classpath:infra-manager.properties"}) -@ConfigurationProperties(prefix = "infra-manager.jobs.solr_data_export") public class DocumentExportProperties { - @NotBlank - private String zooKeeperSocket; - @Min(1) + private String zooKeeperConnectionString; private int readBlockSize; - @Min(1) private int writeBlockSize; - @NotBlank private String destinationDirectoryPath; - @NotBlank private String fileNameSuffixColumn; private SolrQueryProperties query; - - public String getZooKeeperSocket() { - return zooKeeperSocket; + private String s3AccessKey; + private String s3SecretKey; + private String s3KeyPrefix; + private String s3BucketName; + private String s3Endpoint; + + public String getZooKeeperConnectionString() { + return zooKeeperConnectionString; } - public void setZooKeeperSocket(String zooKeeperSocket) { - this.zooKeeperSocket = zooKeeperSocket; + public void setZooKeeperConnectionString(String zooKeeperConnectionString) { + this.zooKeeperConnectionString = zooKeeperConnectionString; } public int getReadBlockSize() { @@ -76,37 +72,109 @@ public class DocumentExportProperties { this.destinationDirectoryPath = destinationDirectoryPath; } + public String getFileNameSuffixColumn() { + return fileNameSuffixColumn; + } + + public void setFileNameSuffixColumn(String fileNameSuffixColumn) { + this.fileNameSuffixColumn = fileNameSuffixColumn; + } + + public SolrQueryProperties getQuery() { + return query; + } + + public void setQuery(SolrQueryProperties query) { + this.query = query; + } + + public String getS3AccessKey() { + return s3AccessKey; + } + + public void setS3AccessKey(String s3AccessKey) { + this.s3AccessKey = s3AccessKey; + } + + public String getS3SecretKey() { + return s3SecretKey; + } + + public void setS3SecretKey(String s3SecretKey) { + this.s3SecretKey = s3SecretKey; + } + + public String getS3KeyPrefix() { + return s3KeyPrefix; + } + + public void setS3KeyPrefix(String s3KeyPrefix) { + this.s3KeyPrefix = s3KeyPrefix; + } + + public String getS3BucketName() { + return s3BucketName; + } + + public void setS3BucketName(String s3BucketName) { + this.s3BucketName = s3BucketName; + } + + public String getS3Endpoint() { + return s3Endpoint; + } + + public void setS3Endpoint(String s3Endpoint) { + this.s3Endpoint = s3Endpoint; + } + public void apply(JobParameters jobParameters) { - // TODO: solr query params - zooKeeperSocket = jobParameters.getString("zooKeeperSocket", zooKeeperSocket); + zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString); readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize); 
writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize); destinationDirectoryPath = jobParameters.getString("destinationDirectoryPath", destinationDirectoryPath); - query.setCollection(jobParameters.getString("collection", query.getCollection())); - query.setQueryText(jobParameters.getString("queryText", query.getQueryText())); - query.setFilterQueryText(jobParameters.getString("filterQueryText", query.getFilterQueryText())); + query.apply(jobParameters); } private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) { - String writeBlockSizeText = jobParameters.getString(parameterName); - if (isBlank(writeBlockSizeText)) + String valueText = jobParameters.getString(parameterName); + if (isBlank(valueText)) return defaultValue; - return this.writeBlockSize = Integer.parseInt(writeBlockSizeText); + return Integer.parseInt(valueText); } - public String getFileNameSuffixColumn() { - return fileNameSuffixColumn; + public DocumentExportProperties deepCopy() { + try { + ObjectMapper objectMapper = new ObjectMapper(); + String json = objectMapper.writeValueAsString(this); + return objectMapper.readValue(json, DocumentExportProperties.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } - public void setFileNameSuffixColumn(String fileNameSuffixColumn) { - this.fileNameSuffixColumn = fileNameSuffixColumn; + public Optional<S3Properties> s3Properties() { + if (!isBlank(s3AccessKey) && !isBlank(s3SecretKey) && !isBlank(s3BucketName)) + return Optional.of(new S3Properties(s3AccessKey, s3SecretKey, s3KeyPrefix, s3BucketName, s3Endpoint)); + return Optional.empty(); } - public SolrQueryProperties getQuery() { - return query; - } + public void validate() { + if (isBlank(zooKeeperConnectionString)) + throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!"); - public void setQuery(SolrQueryProperties query) { - this.query = query; + if (readBlockSize == 0) + throw new IllegalArgumentException("The property readBlockSize must be greater than 0!"); + + if (writeBlockSize == 0) + throw new IllegalArgumentException("The property writeBlockSize must be greater than 0!"); + + if (isBlank(destinationDirectoryPath)) + throw new IllegalArgumentException("The property destinationDirectoryPath can not be null or empty string!"); + + if (isBlank(fileNameSuffixColumn)) + throw new IllegalArgumentException("The property fileNameSuffixColumn can not be null or empty string!"); + + query.validate(); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java new file mode 100644 index 0000000..9af4afc --- /dev/null +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportPropertyMap.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Map;
+
+@Configuration
+@ConfigurationProperties(prefix = "infra-manager.jobs")
+public class DocumentExportPropertyMap {
+  private Map<String, DocumentExportProperties> solrDataExport;
+
+  public Map<String, DocumentExportProperties> getSolrDataExport() {
+    return solrDataExport;
+  }
+
+  public void setSolrDataExport(Map<String, DocumentExportProperties> solrDataExport) {
+    this.solrDataExport = solrDataExport;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
----------------------------------------------------------------------
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
deleted file mode 100644
index 3bab6d5..0000000
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportStepListener.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.apache.ambari.infra.job.archive; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.batch.core.ExitStatus; -import org.springframework.batch.core.StepExecution; -import org.springframework.batch.core.StepExecutionListener; - -public class DocumentExportStepListener implements StepExecutionListener { - private static final Logger LOG = LoggerFactory.getLogger(DocumentExportStepListener.class); - - private final DocumentExportProperties properties; - - public DocumentExportStepListener(DocumentExportProperties properties) { - this.properties = properties; - } - - @Override - public void beforeStep(StepExecution stepExecution) { - properties.apply(stepExecution.getJobParameters()); - LOG.info("LogExport step - before step execution"); - } - - @Override - public ExitStatus afterStep(StepExecution stepExecution) { - LOG.info("LogExport step - after step execution"); - return stepExecution.getExitStatus(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java index a4378a4..3a6b869 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentItemReader.java @@ -18,6 +18,8 @@ */ package org.apache.ambari.infra.job.archive; +import org.apache.ambari.infra.job.CloseableIterator; +import org.apache.ambari.infra.job.ObjectSource; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.support.AbstractItemStreamItemReader; @@ -31,16 +33,16 @@ public class DocumentItemReader extends AbstractItemStreamItemReader<Document> i public final static String POSITION = "last-read"; - private final DocumentSource documentSource; + private final ObjectSource<Document> documentSource; private final int readBlockSize; - private DocumentIterator documentIterator = null; + private CloseableIterator<Document> documentIterator = null; private int count = 0; private boolean eof = false; private Document current = null; private Document previous = null; - public DocumentItemReader(DocumentSource documentSource, int readBlockSize) { + public DocumentItemReader(ObjectSource<Document> documentSource, int readBlockSize) { this.documentSource = documentSource; this.readBlockSize = readBlockSize; setName(ClassUtils.getShortName(DocumentItemReader.class)); http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java index 6232cfc..5fa29b0 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java +++ 
b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentIterator.java @@ -16,10 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.ambari.infra.job.archive; +package org.apache.ambari.infra.job; import java.util.Iterator; -// TODO: generic closeable iterator -public interface DocumentIterator extends Iterator<Document>, AutoCloseable { +public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java index c9871a3..7427771 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentSource.java @@ -18,7 +18,8 @@ */ package org.apache.ambari.infra.job.archive; -// TODO: generic object source -public interface DocumentSource { - DocumentIterator open(Document current, int rows); +import java.io.File; + +public interface ItemWriterListener { + void onCompleted(File file); } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java index 26a8c63..d3f2a65 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileAction.java @@ -21,5 +21,5 @@ package org.apache.ambari.infra.job.archive; import java.io.File; public interface FileAction { - File perform(File inputFile); + File perform(File inputFile, boolean deleteInput); } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java index 02d898d..baad61b 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/LocalDocumentItemWriter.java @@ -29,10 +29,10 @@ public class LocalDocumentItemWriter implements DocumentItemWriter { private final File outFile; private final BufferedWriter bufferedWriter; - private final FileAction fileAction; + private final ItemWriterListener itemWriterListener; - public LocalDocumentItemWriter(File outFile, FileAction fileAction) { - this.fileAction = fileAction; + public LocalDocumentItemWriter(File outFile, 
ItemWriterListener itemWriterListener) { + this.itemWriterListener = itemWriterListener; this.outFile = outFile; try { this.bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile), ENCODING)); @@ -64,7 +64,7 @@ public class LocalDocumentItemWriter implements DocumentItemWriter { public void close() { try { bufferedWriter.close(); - fileAction.perform(outFile); + itemWriterListener.onCompleted(outFile); } catch (IOException e) { throw new UncheckedIOException(e); } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java index 495401d..0979f10 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Properties.java @@ -18,47 +18,48 @@ */ package org.apache.ambari.infra.job.archive; -import org.hibernate.validator.constraints.NotBlank; - public class S3Properties { - @NotBlank - private String accessKey; - @NotBlank - private String secretKey; - @NotBlank - private String keyPrefix; - @NotBlank - private String bucketName; - - public String getAccessKey() { - return accessKey; - } + private String s3AccessKey; + private String s3SecretKey; + private String s3KeyPrefix; + private String s3BucketName; + private String s3EndPoint; - public String getSecretKey() { - return secretKey; + public S3Properties(String s3AccessKey, String s3SecretKey, String s3KeyPrefix, String s3BucketName, String s3EndPoint) { + this.s3AccessKey = s3AccessKey; + this.s3SecretKey = s3SecretKey; + this.s3KeyPrefix = s3KeyPrefix; + this.s3BucketName = s3BucketName; + this.s3EndPoint = s3EndPoint; } - public String getKeyPrefix() { - return keyPrefix; + public String getS3AccessKey() { + return s3AccessKey; } - public String getBucketName() { - return bucketName; + public String getS3SecretKey() { + return s3SecretKey; } - public void setAccessKey(String accessKey) { - this.accessKey = accessKey; + public String getS3KeyPrefix() { + return s3KeyPrefix; } - public void setSecretKey(String secretKey) { - this.secretKey = secretKey; + public String getS3BucketName() { + return s3BucketName; } - public void setKeyPrefix(String keyPrefix) { - this.keyPrefix = keyPrefix; + public String getS3EndPoint() { + return s3EndPoint; } - public void setBucketName(String bucketName) { - this.bucketName = bucketName; + @Override + public String toString() { + return "S3Properties{" + + "s3AccessKey='" + s3AccessKey + '\'' + + ", s3KeyPrefix='" + s3KeyPrefix + '\'' + + ", s3BucketName='" + s3BucketName + '\'' + + ", s3EndPoint='" + s3EndPoint + '\'' + + '}'; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java index 3214e50..deeb9c7 100644 --- 
a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java @@ -2,9 +2,13 @@ package org.apache.ambari.infra.job.archive; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.s3.AmazonS3Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; +import static org.apache.commons.lang.StringUtils.isBlank; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -23,17 +27,25 @@ import java.io.File; * specific language governing permissions and limitations * under the License. */ -public class S3Uploader implements FileAction { +public class S3Uploader extends AbstractFileAction { + + private static final Logger LOG = LoggerFactory.getLogger(DocumentExportConfiguration.class); private final AmazonS3Client client; private final String keyPrefix; private final String bucketName; public S3Uploader(S3Properties s3Properties) { - this.keyPrefix = s3Properties.getKeyPrefix(); - this.bucketName = s3Properties.getBucketName(); - BasicAWSCredentials credentials = new BasicAWSCredentials(s3Properties.getAccessKey(), s3Properties.getSecretKey()); + LOG.info("Initializing S3 client with " + s3Properties); + + this.keyPrefix = s3Properties.getS3KeyPrefix(); + this.bucketName = s3Properties.getS3BucketName(); + BasicAWSCredentials credentials = new BasicAWSCredentials(s3Properties.getS3AccessKey(), s3Properties.getS3SecretKey()); client = new AmazonS3Client(credentials); + if (!isBlank(s3Properties.getS3EndPoint())) + client.setEndpoint(s3Properties.getS3EndPoint()); +// Note: without pathStyleAccess=true endpoint going to be <bucketName>.<host>:<port> +// client.setS3ClientOptions(S3ClientOptions.builder().setPathStyleAccess(true).build()); } @Override @@ -41,8 +53,7 @@ public class S3Uploader implements FileAction { String key = keyPrefix + inputFile.getName(); if (client.doesObjectExist(bucketName, key)) { - System.out.println("Object '" + key + "' already exists"); - System.exit(0); + throw new UnsupportedOperationException(String.format("Object '%s' already exists in bucket '%s'", key, bucketName)); } client.putObject(bucketName, key, inputFile); http://git-wip-us.apache.org/repos/asf/ambari/blob/36d0271f/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java ---------------------------------------------------------------------- diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java index db4069b..2e7341d 100644 --- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java +++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDocumentIterator.java @@ -18,6 +18,7 @@ */ package org.apache.ambari.infra.job.archive; +import org.apache.ambari.infra.job.CloseableIterator; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; @@ -31,7 +32,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.TimeZone; -public class SolrDocumentIterator implements DocumentIterator { +public class 
SolrDocumentIterator implements CloseableIterator<Document> { private static final DateFormat SOLR_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
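
A note on the mechanism at the heart of this change: DocumentExportJobListener.beforeJob() resolves a per-run copy of DocumentExportProperties (defaults from DocumentExportPropertyMap, overridden by JobParameters, then validated) and stores it in the job's ExecutionContext under the key "exportProperties"; the @StepScope beans in DocumentExportConfiguration then receive that copy through SpEL late binding. Condensed into a stand-alone sketch (the class and bean names here are illustrative; only the key name and the binding expression come from the patch):

package org.apache.ambari.infra.job.archive;

import org.apache.ambari.infra.job.ObjectSource;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Illustration only, not part of the patch: the resolve-copy-validate handoff
// between the job listener and a step-scoped bean.
@Configuration
public class ExportPropertiesHandoffExample {

  public static class ResolvingListener implements JobExecutionListener {
    private final DocumentExportProperties defaults;

    public ResolvingListener(DocumentExportProperties defaults) {
      this.defaults = defaults;
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
      DocumentExportProperties properties = defaults.deepCopy();   // keep the shared defaults untouched
      properties.apply(jobExecution.getJobParameters());           // per-run overrides win
      properties.validate();                                       // fail fast, before the step starts
      jobExecution.getExecutionContext().put("exportProperties", properties);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
    }
  }

  @Bean
  @StepScope
  public DocumentItemReader exampleReader(
      ObjectSource<Document> source,
      @Value("#{stepExecution.jobExecution.executionContext.get('exportProperties')}")
      DocumentExportProperties properties) {
    // The reader only ever sees the already-resolved copy, never the raw job parameters.
    return new DocumentItemReader(source, properties.getReadBlockSize());
  }
}

Compared to the removed DocumentExportStepListener, which applied JobParameters directly to the shared singleton properties bean, the copy-then-apply approach keeps the configured defaults stable between runs and lets several differently configured jobs share the same step definition.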
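
Since DocumentExportConfiguration.createJobs() now registers one job per key under infra-manager.jobs.solrDataExport via JobRegistryBeanPostProcessor, a configured export can be launched by name with per-run parameter overrides. A hedged sketch of such a caller follows; the job name "archive_service_logs", the launcher wiring and the parameter values are assumptions, while the parameter names start, end and destinationDirectoryPath are the ones read by this patch:

package org.apache.ambari.infra.job.archive;

import javax.inject.Inject;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;

// Illustration only, not part of the patch: launching one of the registered export jobs.
@Component
public class ExportJobLauncherExample {

  @Inject
  private JobRegistry jobRegistry;   // populated at startup by createJobs()

  @Inject
  private JobLauncher jobLauncher;

  public void runExport() throws Exception {
    // The job name must match a key under infra-manager.jobs.solrDataExport.*
    Job job = jobRegistry.getJob("archive_service_logs");

    // These overrides are merged into a deep copy of the configured
    // DocumentExportProperties by DocumentExportJobListener.beforeJob().
    JobParameters parameters = new JobParametersBuilder()
        .addString("start", "2017-12-01T00:00:00.000Z")
        .addString("end", "2017-12-21T00:00:00.000Z")
        .addString("destinationDirectoryPath", "/tmp/solr_archive")
        .toJobParameters();

    jobLauncher.run(job, parameters);
  }
}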
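
DocumentExportProperties.deepCopy() clones the bean with a JSON round-trip (the patch reuses the Jackson ObjectMapper shaded inside org.apache.htrace). The same idea written against plain Jackson, for reference; it only works for getter/setter POJOs such as the properties classes here:

import java.io.IOException;
import java.io.UncheckedIOException;

import com.fasterxml.jackson.databind.ObjectMapper;

// Generic JSON round-trip copy; assumes the bean is a plain getter/setter POJO.
final class JsonCopyExample {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static <T> T deepCopy(T value, Class<T> type) {
    try {
      return MAPPER.readValue(MAPPER.writeValueAsString(value), type);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}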
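
S3Uploader now honours an optional endpoint from S3Properties, which is what lets the docker-compose based integration tests upload to a local S3-compatible container instead of AWS. A stand-alone sketch of that client setup with the AWS SDK v1 API; the endpoint, bucket, key and credential strings are made-up test values, and the path-style option mirrors the note left commented out in the patch:

import java.io.File;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;

public class LocalS3UploadExample {
  public static void main(String[] args) {
    BasicAWSCredentials credentials = new BasicAWSCredentials("test-access-key", "test-secret-key");
    AmazonS3Client client = new AmazonS3Client(credentials);

    // Point the client at a local S3-compatible service instead of AWS.
    client.setEndpoint("http://localhost:4569");
    // Without path-style access the client would resolve <bucketName>.<host>:<port>,
    // which a plain local container usually cannot serve.
    client.setS3ClientOptions(S3ClientOptions.builder().setPathStyleAccess(true).build());

    String bucket = "testbucket";
    String key = "solr_archive_audit_logs_-_example.json.tar.gz";
    File archive = new File("/tmp/" + key);

    // Mirror the uploader's behaviour: refuse to overwrite an existing object.
    if (client.doesObjectExist(bucket, key)) {
      throw new IllegalStateException("Object '" + key + "' already exists in bucket '" + bucket + "'");
    }
    client.putObject(bucket, key, archive);
  }
}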
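
DocumentIterator and DocumentSource have been generalised into CloseableIterator<T> (org.apache.ambari.infra.job) and ObjectSource<T>. Assuming ObjectSource keeps the open(current, rows) signature of the old DocumentSource, which is how DocumentItemReader appears to use it, a caller outside the reader can drain one block with an ordinary try-with-resources:

import org.apache.ambari.infra.job.CloseableIterator;
import org.apache.ambari.infra.job.ObjectSource;
import org.apache.ambari.infra.job.archive.Document;

public class DrainSourceExample {
  // Reads one block of documents starting after "last" and prints them.
  // The open(current, rows) signature is assumed from the old DocumentSource interface.
  public static void drain(ObjectSource<Document> source, Document last, int rows) throws Exception {
    try (CloseableIterator<Document> iterator = source.open(last, rows)) {
      while (iterator.hasNext()) {
        System.out.println(iterator.next());
      }
    } // CloseableIterator extends AutoCloseable, so close() happens here
  }
}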