This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new d1912ae STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running new f015130 Merge pull request #2990 from srdo/STORM-3372 d1912ae is described below commit d1912ae98afe9f470a05e57835c41f056cebb311 Author: Stig Rohde Døssing <s...@apache.org> AuthorDate: Thu Apr 4 15:58:55 2019 +0200 STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running --- external/storm-hdfs-blobstore/pom.xml | 5 + .../apache/storm/hdfs/blobstore/BlobStoreTest.java | 266 ++++++++++----------- external/storm-hdfs/pom.xml | 2 +- .../apache/storm/hdfs/bolt/AbstractHdfsBolt.java | 4 +- .../org/apache/storm/hdfs/bolt/TestHdfsBolt.java | 12 + .../hdfs/bolt/format/TestSimpleFileNameFormat.java | 2 +- .../apache/storm/hdfs/spout/TestHdfsSemantics.java | 5 +- .../org/apache/storm/hdfs/spout/TestHdfsSpout.java | 25 +- .../hdfs/testing/MiniDFSClusterExtension.java | 64 +++++ .../storm/hdfs/testing/MiniDFSClusterRule.java | 5 + .../storm-hdfs/src/test/resources/log4j.properties | 5 +- external/storm-hdfs/src/test/resources/log4j2.xml | 32 +++ pom.xml | 2 +- 13 files changed, 268 insertions(+), 161 deletions(-) diff --git a/external/storm-hdfs-blobstore/pom.xml b/external/storm-hdfs-blobstore/pom.xml index e2b948b..ccc69a6 100644 --- a/external/storm-hdfs-blobstore/pom.xml +++ b/external/storm-hdfs-blobstore/pom.xml @@ -208,6 +208,11 @@ <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java index a125793..53cca75 100644 --- a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java +++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java @@ -18,6 +18,7 @@ */ package org.apache.storm.hdfs.blobstore; +import org.apache.storm.hdfs.testing.MiniDFSClusterExtension; import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.blobstore.AtomicOutputStream; @@ -28,14 +29,9 @@ import org.apache.storm.generated.AccessControlType; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.generated.SettableBlobMeta; -import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.apache.storm.security.auth.FixedGroupsMapping; import org.apache.storm.security.auth.NimbusPrincipal; import org.apache.storm.security.auth.SingleUserPrincipal; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,30 +51,34 @@ import java.util.UUID; import static org.junit.Assert.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + public class BlobStoreTest { - @ClassRule - public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule(); + @RegisterExtension + public static final MiniDFSClusterExtension DFS_CLUSTER_EXTENSION = new MiniDFSClusterExtension(); private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); URI base; - File baseFile; private static final Map<String, Object> CONF = new HashMap<>(); public static final int READ = 0x01; public static final int WRITE = 0x02; public static final int ADMIN = 0x04; - @Before + @BeforeEach public void init() { initializeConfigs(); - baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID()); - base = baseFile.toURI(); } - @After + @AfterEach public void cleanup() throws IOException { - FileUtils.deleteDirectory(baseFile); } // Method which initializes nimbus admin @@ -160,7 +160,7 @@ public class BlobStoreTest { conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3); HdfsBlobStore store = new HdfsBlobStore(); - store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0)); + store.prepareInternal(conf, null, DFS_CLUSTER_EXTENSION.getDfscluster().getConfiguration(0)); return new AutoCloseableBlobStoreContainer(store); } @@ -204,15 +204,6 @@ public class BlobStoreTest { } } - @Test - public void testHdfsWithAuth() - throws Exception { - // use different blobstore dir so it doesn't conflict with other tests - try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) { - testWithAuthentication(container.blobStore); - } - } - // Test for replication. public void testReplication(String path, BlobStore store) throws Exception { @@ -289,133 +280,130 @@ public class BlobStoreTest { store.deleteBlob("test", getSubject(createSubject)); } - public Subject getSubject(String name) { + public static Subject getSubject(String name) { Subject subject = new Subject(); SingleUserPrincipal user = new SingleUserPrincipal(name); subject.getPrincipals().add(user); return subject; } - - // Check for Blobstore with authentication - public void testWithAuthentication(BlobStore store) - throws Exception { - //Test for Nimbus Admin - Subject admin = getSubject("admin"); - assertStoreHasExactly(store); - SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) { - assertStoreHasExactly(store, "test"); - out.write(1); + + static enum AuthenticationTestSubject { + //Nimbus Admin + ADMIN(getSubject("admin")), + //Nimbus groups admin + ADMIN_GROUPS_USER(getSubject("adminGroupsUser")), + //Supervisor admin + SUPERVISOR(getSubject("supervisor")), + //Nimbus itself + NIMBUS(getNimbusSubject()); + + private Subject subject; + + private AuthenticationTestSubject(Subject subject) { + this.subject = subject; } - store.deleteBlob("test", admin); - - //Test for Nimbus Groups Admin - Subject adminsGroupsUser = getSubject("adminsGroupsUser"); - assertStoreHasExactly(store); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) { - assertStoreHasExactly(store, "test"); - out.write(1); + } + + @ParameterizedTest + @EnumSource(value = AuthenticationTestSubject.class) + void testWithAuthentication(AuthenticationTestSubject testSubject) throws Exception { + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-" + testSubject.name())) { + BlobStore store = container.blobStore; + assertStoreHasExactly(store); + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, testSubject.subject)) { + assertStoreHasExactly(store, "test"); + out.write(1); + } + store.deleteBlob("test", testSubject.subject); } - store.deleteBlob("test", adminsGroupsUser); - - //Test for Supervisor Admin - Subject supervisor = getSubject("supervisor"); - assertStoreHasExactly(store); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) { + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testWithAuthenticationDummy(boolean securityEnabled) throws Exception { + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-dummy-sec-" + securityEnabled)) { + BlobStore store = container.blobStore; + Subject who = getSubject("test_subject"); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls for the blob are set to WORLD_EVERYTHING + SettableBlobMeta metadata = new SettableBlobMeta(securityEnabled ? BlobStoreAclHandler.DEFAULT : BlobStoreAclHandler.WORLD_EVERYTHING); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } assertStoreHasExactly(store, "test"); - out.write(1); + if (securityEnabled) { + // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because + // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have + // complete access to the blob + assertTrue("ACL contains WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + } else { + // Testing whether acls are set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + } + + readAssertEqualsWithAuth(store, who, "test", 1); + + LOG.info("Deleting test"); + store.deleteBlob("test", who); + assertStoreHasExactly(store); } - store.deleteBlob("test", supervisor); - - //Test for Nimbus itself as a user - Subject nimbus = getNimbusSubject(); - assertStoreHasExactly(store); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) { + } + + @Test + void testWithAuthenticationUpdate() throws Exception { + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-update")) { + BlobStore store = container.blobStore; + Subject who = getSubject("test_subject"); + assertStoreHasExactly(store); + + SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } assertStoreHasExactly(store, "test"); - out.write(1); - } - store.deleteBlob("test", nimbus); - - // Test with a dummy test_subject for cases where subject !=null (security turned on) - Subject who = getSubject("test_subject"); - assertStoreHasExactly(store); - - // Tests for case when subject != null (security turned on) and - // acls for the blob are set to WORLD_EVERYTHING - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { - out.write(1); - } - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test", 1); - - LOG.info("Deleting test"); - store.deleteBlob("test", who); - assertStoreHasExactly(store); - - // Tests for case when subject != null (security turned on) and - // acls are not set for the blob (DEFAULT) - LOG.info("Creating test again"); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { - out.write(2); - } - assertStoreHasExactly(store, "test"); - // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because - // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have - // complete access to the blob - assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test", 2); - - LOG.info("Updating test"); - try (AtomicOutputStream out = store.updateBlob("test", who)) { - out.write(3); - } - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); - - LOG.info("Updating test again"); - try (AtomicOutputStream out = store.updateBlob("test", who)) { - out.write(4); - } - LOG.info("SLEEPING"); - Thread.sleep(2); - assertStoreHasExactly(store, "test"); - readAssertEqualsWithAuth(store, who, "test", 3); + readAssertEqualsWithAuth(store, who, "test", 1); + + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(2); + } + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 2); + + try (AtomicOutputStream out = store.updateBlob("test", who)) { + out.write(3); + } + assertStoreHasExactly(store, "test"); + readAssertEqualsWithAuth(store, who, "test", 3); - //Test for subject with no principals and acls set to WORLD_EVERYTHING - who = new Subject(); - metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING); - LOG.info("Creating test"); - try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) { - out.write(2); + LOG.info("Deleting test"); + store.deleteBlob("test", who); + assertStoreHasExactly(store); } - assertStoreHasExactly(store, "test-empty-subject-WE", "test"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2); - - //Test for subject with no principals and acls set to DEFAULT - who = new Subject(); - metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); - LOG.info("Creating other"); - try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) { - out.write(2); - } - assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE"); - // Testing whether acls are set to WORLD_EVERYTHING - assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); - readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2); - - if (store instanceof HdfsBlobStore) { - ((HdfsBlobStore) store).fullCleanup(1); - } else { - fail("Error the blobstore is of unknowntype"); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testWithAuthenticationNoPrincipal(boolean securityEnabled) throws Exception { + try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-no-principal-sec-" + securityEnabled)) { + BlobStore store = container.blobStore; + //Test for subject with no principals + Subject who = new Subject(); + assertStoreHasExactly(store); + + // Tests for case when subject != null (security turned on) and + // acls for the blob are set to WORLD_EVERYTHING + SettableBlobMeta metadata = new SettableBlobMeta(securityEnabled ? BlobStoreAclHandler.DEFAULT : BlobStoreAclHandler.WORLD_EVERYTHING); + try (AtomicOutputStream out = store.createBlob("test", metadata, who)) { + out.write(1); + } + assertStoreHasExactly(store, "test"); + // With no principals in the subject ACL should always be set to WORLD_EVERYTHING + assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)")); + + readAssertEqualsWithAuth(store, who, "test", 1); } } @@ -535,6 +523,6 @@ public class BlobStoreTest { fail("Error the blobstore is of unknowntype"); } assertStoreHasExactly(store, "test"); - readAssertEquals(store, "test", 3); + readAssertEquals(store, "test", 4); } } diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index d0db240..a19d821 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -234,7 +234,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <configuration> + <configuration> <reuseForks>false</reuseForks> <forkCount>1</forkCount> </configuration> diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index dfcf30f..a145274 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -247,7 +247,9 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { @Override public void cleanup() { doRotationAndRemoveAllWriters(); - this.rotationTimer.cancel(); + if (this.rotationTimer != null) { + this.rotationTimer.cancel(); + } } private void doRotationAndRemoveAllWriters() { diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java index 32844e7..7f63cc0 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java @@ -56,6 +56,7 @@ import org.mockito.runners.MockitoJUnitRunner; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; + @RunWith(MockitoJUnitRunner.class) public class TestHdfsBolt { @@ -203,6 +204,17 @@ public class TestHdfsBolt { //Tick should have flushed it Assert.assertEquals(1, countNonZeroLengthFiles(testRoot)); } + + @Test + public void testCleanupDoesNotThrowExceptionWhenRotationPolicyIsNotTimed() { + //STORM-3372: Rotation policy other than TimedRotationPolicy causes NPE on cleanup + FileRotationPolicy fieldsRotationPolicy = + new FileSizeRotationPolicy(10_000, FileSizeRotationPolicy.Units.MB); + HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f) + .withRotationPolicy(fieldsRotationPolicy); + bolt.prepare(new Config(), topologyContext, collector); + bolt.cleanup(); + } public void createBaseDirectory(FileSystem passedFs, String path) throws IOException { Path p = new Path(path); diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java index a12ae5d..f8e1e5e 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java @@ -69,7 +69,7 @@ public class TestSimpleFileNameFormat { } private TopologyContext createTopologyContext(Map<String, Object> topoConf) { - Map<Integer, String> taskToComponent = new HashMap<Integer, String>(); + Map<Integer, String> taskToComponent = new HashMap<>(); taskToComponent.put(7, "Xcom"); return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null, null, null, null); diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java index 1a278c1..3528a3d 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java @@ -12,6 +12,8 @@ package org.apache.storm.hdfs.spout; +import static org.hamcrest.core.IsNull.notNullValue; + import java.io.IOException; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; @@ -30,7 +32,6 @@ import org.junit.Test; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.notNull; public class TestHdfsSemantics { @@ -124,7 +125,7 @@ public class TestHdfsSemantics { //2 try to append to a closed file try (FSDataOutputStream os2 = fs.append(file1)) { - assertThat(os2, notNull()); + assertThat(os2, notNullValue()); } } diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index c3e89dc..133de5d 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -12,6 +12,9 @@ package org.apache.storm.hdfs.spout; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -192,6 +195,9 @@ public class TestHdfsSpout { Path file1 = new Path(source.toString() + "/file_empty.txt"); createTextFile(file1, 0); + //Ensure the second file has a later modified timestamp, as the spout should pick the first file first. + Thread.sleep(2); + Path file2 = new Path(source.toString() + "/file.txt"); createTextFile(file2, 5); @@ -203,15 +209,13 @@ public class TestHdfsSpout { conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing openSpout(spout, 0, conf); - // consume empty file - runSpout(spout, "r1"); - Path arc1 = new Path(archive.toString() + "/file_empty.txt"); - checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1); - - // consume file 2 - runSpout(spout, "r5", "a0", "a1", "a2", "a3", "a4"); + // Read once. Since the first file is empty, the spout should continue with file 2 + runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4"); + //File 1 should be moved to archive + assertThat(fs.isFile(new Path(archive.toString() + "/file_empty.txt")), is(true)); + //File 2 should be read Path arc2 = new Path(archive.toString() + "/file.txt"); - checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2); + checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc2); } } @@ -681,11 +685,8 @@ public class TestHdfsSpout { private void createTextFile(Path file, int lineCount) throws IOException { FSDataOutputStream os = fs.create(file); - int size = 0; for (int i = 0; i < lineCount; i++) { os.writeBytes("line " + i + System.lineSeparator()); - String msg = "line " + i + System.lineSeparator(); - size += msg.getBytes().length; } os.close(); } @@ -772,7 +773,7 @@ public class TestHdfsSpout { private final int componentId; public MockTopologyContext(int componentId, Map<String, Object> topoConf) { - super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + super(null, topoConf, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, null); this.componentId = componentId; } diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java new file mode 100644 index 0000000..f88fef5 --- /dev/null +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.testing; + +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class MiniDFSClusterExtension implements BeforeEachCallback, AfterEachCallback { + + private static final String TEST_BUILD_DATA = "test.build.data"; + + private final Supplier<Configuration> hadoopConfSupplier; + private Configuration hadoopConf; + private MiniDFSCluster dfscluster; + + public MiniDFSClusterExtension() { + this(() -> new Configuration()); + } + + public MiniDFSClusterExtension(Supplier<Configuration> hadoopConfSupplier) { + this.hadoopConfSupplier = hadoopConfSupplier; + } + + public Configuration getHadoopConf() { + return hadoopConf; + } + + public MiniDFSCluster getDfscluster() { + return dfscluster; + } + + @Override + public void beforeEach(ExtensionContext arg0) throws Exception { + System.setProperty(TEST_BUILD_DATA, "target/test/data"); + hadoopConf = hadoopConfSupplier.get(); + dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build(); + dfscluster.waitActive(); + } + + @Override + public void afterEach(ExtensionContext arg0) throws Exception { + dfscluster.shutdown(); + System.clearProperty(TEST_BUILD_DATA); + } +} diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java index b94fb53..6265a52 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java @@ -23,6 +23,10 @@ import org.junit.rules.TestRule; import org.junit.runner.Description; import org.junit.runners.model.Statement; +/** + * @deprecated Use {@link MiniDFSClusterExtension} instead, along with JUnit 5 for new tests. + */ +@Deprecated public class MiniDFSClusterRule implements TestRule { private static final String TEST_BUILD_DATA = "test.build.data"; @@ -57,6 +61,7 @@ public class MiniDFSClusterRule implements TestRule { hadoopConf = hadoopConfSupplier.get(); dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build(); dfscluster.waitActive(); + base.evaluate(); } finally { if (dfscluster != null) { dfscluster.shutdown(); diff --git a/external/storm-hdfs/src/test/resources/log4j.properties b/external/storm-hdfs/src/test/resources/log4j.properties index 1f92e45..c952abd 100644 --- a/external/storm-hdfs/src/test/resources/log4j.properties +++ b/external/storm-hdfs/src/test/resources/log4j.properties @@ -20,7 +20,4 @@ log4j.rootLogger = WARN, out log4j.appender.out = org.apache.log4j.ConsoleAppender log4j.appender.out.layout = org.apache.log4j.PatternLayout -log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n - -log4j.logger.org.apache.storm.hdfs = INFO - +log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n \ No newline at end of file diff --git a/external/storm-hdfs/src/test/resources/log4j2.xml b/external/storm-hdfs/src/test/resources/log4j2.xml new file mode 100755 index 0000000..546b1b3 --- /dev/null +++ b/external/storm-hdfs/src/test/resources/log4j2.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/> + </Console> + </Appenders> + <Loggers> + <Root level="WARN"> + <AppenderRef ref="Console"/> + </Root> + <Logger name="org.apache.storm" level="INFO" additivity="false"> + <AppenderRef ref="Console"/> + </Logger> + </Loggers> +</Configuration> \ No newline at end of file diff --git a/pom.xml b/pom.xml index f273308..3b587a7 100644 --- a/pom.xml +++ b/pom.xml @@ -295,7 +295,7 @@ <servlet.version>3.1.0</servlet.version> <joda-time.version>2.3</joda-time.version> <thrift.version>0.12.0</thrift.version> - <junit.jupiter.version>5.3.2</junit.jupiter.version> + <junit.jupiter.version>5.5.0-M1</junit.jupiter.version> <surefire.version>2.22.1</surefire.version> <awaitility.version>3.1.0</awaitility.version> <hdrhistogram.version>2.1.10</hdrhistogram.version>