Repository: metron Updated Branches: refs/heads/master 9ce4ba5a9 -> 20eaed239
METRON-1552: Add gzip file validation check to the geo loader (mmiklavc via mmiklavc) closes apache/metron#1011 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/20eaed23 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/20eaed23 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/20eaed23 Branch: refs/heads/master Commit: 20eaed239b2552d0823d34f571b63d941c352bc9 Parents: 9ce4ba5 Author: mmiklavc <michael.miklav...@gmail.com> Authored: Tue May 15 11:12:07 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Tue May 15 11:12:07 2018 -0600 ---------------------------------------------------------------------- .../common/utils/CompressionStrategies.java | 100 +++++++++++++++++++ .../common/utils/CompressionStrategy.java | 52 ++++++++++ .../common/utils/CompressionUtilsTest.java | 62 ++++++++++++ .../nonbulk/geo/GeoEnrichmentLoader.java | 70 +++++++++---- .../nonbulk/geo/GeoEnrichmentLoaderTest.java | 40 ++++++-- 5 files changed, 298 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java new file mode 100644 index 0000000..f9c53c8 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategies.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.utils; + +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipException; + +/* + * Factory to provide various compression strategies. + */ +public enum CompressionStrategies implements CompressionStrategy { + + GZIP(new CompressionStrategy() { + @Override + public void compress(File inFile, File outFile) throws IOException { + try (FileInputStream fis = new FileInputStream(inFile); + FileOutputStream fos = new FileOutputStream(outFile); + GZIPOutputStream gzipOS = new GZIPOutputStream(fos)) { + byte[] buffer = new byte[1024]; + int len; + while ((len = fis.read(buffer)) != -1) { + gzipOS.write(buffer, 0, len); + } + } + } + + @Override + public void decompress(File inFile, File outFile) throws IOException { + try (FileInputStream fis = new FileInputStream(inFile); + GZIPInputStream gis = new GZIPInputStream(fis); + FileOutputStream fos = new FileOutputStream(outFile)) { + byte[] buffer = new byte[1024]; + int len; + while ((len = gis.read(buffer)) != -1) { + fos.write(buffer, 0, len); + } + } + + } + + @Override + public boolean test(File gzipFile) { + try (FileInputStream fis = new FileInputStream(gzipFile); + GZIPInputStream gis = new GZIPInputStream(fis)) { + byte[] buffer = new byte[1024]; + // this will throw an exception on malformed file + gis.read(buffer); + } catch (ZipException | EOFException e) { + return false; + } catch (IOException e) { + throw new IllegalStateException("Error occurred while attempting to validate gzip file", e); + } + return true; + } + }); + + private CompressionStrategy strategy; + + CompressionStrategies(CompressionStrategy strategy) { + this.strategy = strategy; + } + + @Override + public void compress(File inFile, File outFile) throws IOException { + strategy.compress(inFile, outFile); + } + + @Override + public void decompress(File inFile, File outFile) throws IOException { + strategy.decompress(inFile, outFile); + } + + @Override + public boolean test(File gzipFile) { + return strategy.test(gzipFile); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java new file mode 100644 index 0000000..6a98c95 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/CompressionStrategy.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.utils; + +import java.io.File; +import java.io.IOException; + +public interface CompressionStrategy { + + /** + * Compress infile. + * + * @param inFile file to compress + * @param outFile destination path for compressed output + * @throws IOException Any IO error + */ + void compress(File inFile, File outFile) throws IOException; + + /** + * Decompress infile. + * + * @param inFile file to decompress + * @param outFile destination path for decompressed output + * @throws IOException Any IO error + */ + void decompress(File inFile, File outFile) throws IOException; + + /** + * Test if file is proper gzip format. True if valid, false otherwise. + * + * @param gzipFile file to check for gzip compression + * @return true if file is a gzip format, false otherwise. + */ + boolean test(File gzipFile); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java new file mode 100644 index 0000000..50d83e9 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/CompressionUtilsTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.utils; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.io.File; +import java.io.IOException; +import org.apache.metron.integration.utils.TestUtils; +import org.junit.Before; +import org.junit.Test; + +public class CompressionUtilsTest { + + private static final String SAMPLE_TEXT = "hello world"; + private File tempDir; + private File textFile; + + @Before + public void setup() throws IOException { + tempDir = TestUtils.createTempDir(this.getClass().getName()); + textFile = new File(tempDir, "test-text-file.txt"); + TestUtils.write(textFile, SAMPLE_TEXT); + } + + @Test + public void compresses_Gzip() throws IOException { + File gzipFile = new File(tempDir, "test-gz-compression-file.gz"); + CompressionStrategies.GZIP.compress(textFile, gzipFile); + assertThat(CompressionStrategies.GZIP.test(gzipFile), equalTo(true)); + } + + @Test + public void decompresses_Gzip() throws IOException { + File gzipFile = new File(tempDir, "test-gz-decompress.gz"); + CompressionStrategies.GZIP.compress(textFile, gzipFile); + assertThat("gzipped file should exist", gzipFile.exists(), equalTo(true)); + File unzippedText = new File(tempDir, "test-gz-decompressed.txt"); + CompressionStrategies.GZIP.decompress(gzipFile, unzippedText); + assertThat("decompressed file should exist", unzippedText.exists(), equalTo(true)); + String actual = TestUtils.read(unzippedText); + assertThat("decompressed text should match", actual, equalTo(SAMPLE_TEXT)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java index b366015..fc89b89 100644 --- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java +++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoader.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.utils.CompressionStrategies; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; @@ -42,6 +43,9 @@ import java.time.ZoneOffset; import java.util.Map; public class GeoEnrichmentLoader { + + private static final String DEFAULT_RETRIES = "2"; + private static abstract class OptionHandler implements Function<String, Option> { } @@ -70,6 +74,15 @@ public class GeoEnrichmentLoader { o.setRequired(false); return o; } + }), RETRIES("re", new GeoEnrichmentLoader.OptionHandler() { + @Nullable + @Override + public Option apply(@Nullable String s) { + Option o = new Option(s, "retries", true, "Number of geo db download retries, after an initial failure."); + o.setArgName("RETRIES"); + o.setRequired(false); + return o; + } }), TMP_DIR("t", new GeoEnrichmentLoader.OptionHandler() { @Nullable @Override @@ -146,7 +159,14 @@ public class GeoEnrichmentLoader { System.out.println("Retrieving GeoLite2 archive"); String url = GeoEnrichmentOptions.GEO_URL.get(cli, "http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz"); String tmpDir = GeoEnrichmentOptions.TMP_DIR.get(cli, "/tmp") + "/"; // Make sure there's a file separator at the end - File localGeoFile = downloadGeoFile(url, tmpDir); + int numRetries = Integer.parseInt(GeoEnrichmentOptions.RETRIES.get(cli, DEFAULT_RETRIES)); + File localGeoFile = null; + try { + localGeoFile = downloadGeoFile(url, tmpDir, numRetries); + } catch (IllegalStateException ies) { + System.err.println("Failed to download geo db file. Aborting"); + System.exit(5); + } // Want to delete the tar in event of failure localGeoFile.deleteOnExit(); System.out.println("GeoIP files downloaded successfully"); @@ -167,26 +187,40 @@ public class GeoEnrichmentLoader { System.out.println("Successfully created and updated new GeoIP information"); } - protected File downloadGeoFile(String urlStr, String tmpDir) { + protected File downloadGeoFile(String urlStr, String tmpDir, int numRetries) { File localFile = null; - try { - URL url = new URL(urlStr); - localFile = new File(tmpDir + new File(url.getPath()).getName()); + int attempts = 0; + boolean valid = false; + while (!valid && attempts <= numRetries) { + try { + URL url = new URL(urlStr); + localFile = new File(tmpDir + new File(url.getPath()).getName()); - System.out.println("Downloading " + url.toString() + " to " + localFile.getAbsolutePath()); - if (localFile.exists() && !localFile.delete()) { - System.err.println("File already exists locally and can't be deleted. Please delete before continuing"); - System.exit(3); + System.out.println("Downloading " + url.toString() + " to " + localFile.getAbsolutePath()); + if (localFile.exists() && !localFile.delete()) { + System.err.println( + "File already exists locally and can't be deleted. Please delete before continuing"); + System.exit(3); + } + FileUtils.copyURLToFile(url, localFile, 5000, 10000); + if (!CompressionStrategies.GZIP.test(localFile)) { + throw new IOException("Invalid Gzip file"); + } else { + valid = true; + } + } catch (MalformedURLException e) { + System.err.println("Malformed URL - aborting: " + e); + e.printStackTrace(); + System.exit(4); + } catch (IOException e) { + System.err.println("Warning: Unable to copy remote GeoIP database to local file, attempt " + attempts + ": " + e); + e.printStackTrace(); } - FileUtils.copyURLToFile(url, localFile, 5000, 10000); - } catch (MalformedURLException e) { - System.err.println("Malformed URL - aborting: " + e); - e.printStackTrace(); - System.exit(4); - } catch (IOException e) { - System.err.println("Unable to copy remote GeoIP database to local file: " + e); - e.printStackTrace(); - System.exit(5); + attempts++; + } + if (!valid) { + System.err.println("Unable to copy remote GeoIP database to local file after " + attempts + " attempts"); + throw new IllegalStateException("Unable to download geo enrichment database."); } return localFile; } http://git-wip-us.apache.org/repos/asf/metron/blob/20eaed23/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java index 3fa0270..2babeee 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/geo/GeoEnrichmentLoaderTest.java @@ -17,19 +17,24 @@ */ package org.apache.metron.dataloads.nonbulk.geo; +import static org.junit.Assert.assertTrue; + +import java.io.File; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.PosixParser; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.GenericOptionsParser; -import org.junit.*; +import org.apache.metron.common.utils.CompressionStrategies; +import org.apache.metron.integration.utils.TestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; -import java.io.File; - -import static org.junit.Assert.assertTrue; - public class GeoEnrichmentLoaderTest { private class MockGeoEnrichmentLoader extends GeoEnrichmentLoader { @Override @@ -43,7 +48,7 @@ public class GeoEnrichmentLoaderTest { private File remoteDir; private File tmpDir; - @Before + @Before public void setup() throws Exception { testFolder.create(); remoteDir = testFolder.newFolder("remoteDir"); @@ -76,8 +81,10 @@ public class GeoEnrichmentLoaderTest { @Test public void testLoadGeoIpDatabase() throws Exception { - File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb"); - dbFile.createNewFile(); + File dbPlainTextFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb"); + TestUtils.write(dbPlainTextFile, "hello world"); + File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb.gz"); + CompressionStrategies.GZIP.compress(dbPlainTextFile, dbFile); String[] argv = {"--geo_url", "file://" + dbFile.getAbsolutePath(), "--remote_dir", remoteDir.getAbsolutePath(), "--tmp_dir", tmpDir.getAbsolutePath(), "--zk_quorum", "test:2181"}; String[] otherArgs = new GenericOptionsParser(argv).getRemainingArgs(); CommandLine cli = GeoEnrichmentLoader.GeoEnrichmentOptions.parse(new PosixParser(), otherArgs); @@ -88,4 +95,21 @@ public class GeoEnrichmentLoaderTest { FileSystem fs = FileSystem.get(config); assertTrue(fs.exists(new Path(remoteDir + "/" + dbFile.getName()))); } + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void loader_throws_exception_on_bad_gzip_file() throws Exception { + File dbFile = new File(remoteDir.getAbsolutePath() + "/GeoEnrichmentLoaderTest.mmdb"); + dbFile.createNewFile(); + + String geoUrl = "file://" + dbFile.getAbsolutePath(); + int numRetries = 2; + exception.expect(IllegalStateException.class); + exception.expectMessage("Unable to download geo enrichment database."); + GeoEnrichmentLoader loader = new MockGeoEnrichmentLoader(); + loader.downloadGeoFile(geoUrl, tmpDir.getAbsolutePath(), numRetries); + } + }