Repository: kafka Updated Branches: refs/heads/trunk 4c0660bf3 -> 43d5078e9
MINOR: Remove a couple of redundant `CoreUtils.rm` methods Also: * Rename remaining `CoreUtils.rm` to `delete` for consistency * Use `try with resources` in `Utils` to simplify code * Silence compiler warning due to exception catch clause in `TestUtils` Author: Ismael Juma <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1153 from ijuma/remove-redundant-core-utils-rm Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43d5078e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43d5078e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43d5078e Branch: refs/heads/trunk Commit: 43d5078e981bbb25fd81cdc8ba4c339cd2d3f3d2 Parents: 4c0660b Author: Ismael Juma <[email protected]> Authored: Mon Mar 28 14:35:31 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Mar 28 14:35:31 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/utils/Utils.java | 18 ++++--------- core/src/main/scala/kafka/log/Log.scala | 3 ++- .../kafka/metrics/KafkaCSVMetricsReporter.scala | 7 ++--- core/src/main/scala/kafka/utils/CoreUtils.scala | 27 +------------------- .../kafka/api/ProducerCompressionTest.scala | 2 +- .../test/scala/other/kafka/StressTestLog.scala | 5 ++-- .../other/kafka/TestLinearWriteSpeed.scala | 7 +++-- .../unit/kafka/admin/AddPartitionsTest.scala | 2 +- .../test/scala/unit/kafka/admin/AdminTest.scala | 12 +++++---- .../integration/KafkaServerTestHarness.scala | 2 +- .../kafka/integration/RollingBounceTest.scala | 2 +- .../integration/UncleanLeaderElectionTest.scala | 2 +- .../unit/kafka/log/BrokerCompressionTest.scala | 9 +++---- .../test/scala/unit/kafka/log/CleanerTest.scala | 2 +- .../kafka/log/LogCleanerIntegrationTest.scala | 5 ++-- .../scala/unit/kafka/log/LogManagerTest.scala | 5 ++-- .../src/test/scala/unit/kafka/log/LogTest.scala | 6 ++--- .../unit/kafka/producer/ProducerTest.scala | 4 +-- .../unit/kafka/server/AdvertiseBrokerTest.scala | 4 +-- .../server/HighwatermarkPersistenceTest.scala | 7 +++-- .../unit/kafka/server/LeaderElectionTest.scala | 2 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 3 ++- .../unit/kafka/server/LogRecoveryTest.scala | 3 ++- .../unit/kafka/server/OffsetCommitTest.scala | 9 +++---- .../server/ServerGenerateBrokerIdTest.scala | 18 ++++++------- .../unit/kafka/server/ServerShutdownTest.scala | 6 ++--- .../unit/kafka/server/ServerStartupTest.scala | 6 ++--- .../test/scala/unit/kafka/utils/TestUtils.scala | 10 ++++---- .../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 9 +++---- 29 files changed, 86 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 4c4225b..0167548 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -442,13 +442,8 @@ public class Utils { */ public static Properties loadProps(String filename) throws IOException, FileNotFoundException { Properties props = new Properties(); - InputStream propStream = null; - try { - propStream = new FileInputStream(filename); + try (InputStream propStream = new FileInputStream(filename)) { props.load(propStream); - } finally { - if (propStream != null) - propStream.close(); } return props; } @@ -540,16 +535,13 @@ public class Utils { */ public static String readFileAsString(String path, Charset charset) throws IOException { if (charset == null) charset = Charset.defaultCharset(); - FileInputStream stream = new FileInputStream(new File(path)); - String result = new String(); - try { + + try (FileInputStream stream = new FileInputStream(new File(path))) { FileChannel fc = stream.getChannel(); MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()); - result = charset.decode(bb).toString(); - } finally { - stream.close(); + return charset.decode(bb).toString(); } - return result; + } public static String readFileAsString(String path) throws IOException { http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8c956f7..81c19fa 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.record.TimestampType import scala.collection.JavaConversions import com.yammer.metrics.core.Gauge +import org.apache.kafka.common.utils.Utils object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false) @@ -714,7 +715,7 @@ class Log(val dir: File, removeLogMetrics() logSegments.foreach(_.delete()) segments.clear() - CoreUtils.rm(dir) + Utils.delete(dir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala index cc0da9f..686f692 100755 --- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -22,14 +22,15 @@ package kafka.metrics import com.yammer.metrics.Metrics import java.io.File + import com.yammer.metrics.reporting.CsvReporter import java.util.concurrent.TimeUnit -import kafka.utils.{CoreUtils, VerifiableProperties, Logging} +import kafka.utils.{Logging, VerifiableProperties} +import org.apache.kafka.common.utils.Utils private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean - private class KafkaCSVMetricsReporter extends KafkaMetricsReporter with KafkaCSVMetricsReporterMBean with Logging { @@ -48,7 +49,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter if (!initialized) { val metricsConfig = new KafkaMetricsConfig(props) csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics")) - CoreUtils.rm(csvDir) + Utils.delete(csvDir) csvDir.mkdirs() underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index b01f5cc..fe2bebf 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -79,35 +79,10 @@ object CoreUtils extends Logging { } /** - * Recursively delete the given file/directory and any subfiles (if any exist) - * @param file The root file at which to begin deleting - */ - def rm(file: String): Unit = rm(new File(file)) - - /** * Recursively delete the list of files/directories and any subfiles (if any exist) * @param files sequence of files to be deleted */ - def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f))) - - /** - * Recursively delete the given file/directory and any subfiles (if any exist) - * @param file The root file at which to begin deleting - */ - def rm(file: File) { - if(file == null) { - return - } else if(file.isDirectory) { - val files = file.listFiles() - if(files != null) { - for(f <- files) - rm(f) - } - file.delete() - } else { - file.delete() - } - } + def delete(files: Seq[String]): Unit = files.foreach(f => Utils.delete(new File(f))) /** * Register the given mbean with the platform mbean server, http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index c4a2bd7..fc1ceec 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -55,7 +55,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness @After override def tearDown() { server.shutdown - CoreUtils.rm(server.config.logDirs) + CoreUtils.delete(server.config.logDirs) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index dead0eb..8adc7e2 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -19,11 +19,12 @@ package kafka import java.util.Properties import java.util.concurrent.atomic._ -import kafka.common._ + import kafka.message._ import kafka.log._ import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException +import org.apache.kafka.common.utils.Utils /** * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it @@ -55,7 +56,7 @@ object StressTestLog { running.set(false) writer.join() reader.join() - CoreUtils.rm(dir) + Utils.delete(dir) } }) http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 236d857..db281bf 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -21,11 +21,14 @@ import java.io._ import java.nio._ import java.nio.channels._ import java.util.{Properties, Random} + import kafka.log._ import kafka.utils._ import kafka.message._ + import scala.math._ import joptsimple._ +import org.apache.kafka.common.utils.Utils /** * This test does linear writes using either a kafka log or a file and measures throughput and latency. @@ -196,7 +199,7 @@ object TestLinearWriteSpeed { } class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable { - CoreUtils.rm(dir) + Utils.delete(dir) val log = new Log(dir, config, 0L, scheduler, SystemTime) def write(): Int = { log.append(messages, true) @@ -204,7 +207,7 @@ object TestLinearWriteSpeed { } def close() { log.close() - CoreUtils.rm(log.dir) + Utils.delete(log.dir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index b9bbace..ab8d363 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -59,7 +59,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @After override def tearDown() { servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.rm(server.config.logDirs)) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 8910e09..21bb6ab 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -22,13 +22,15 @@ import org.apache.kafka.common.protocol.ApiKeys import org.junit.Assert._ import org.junit.Test import java.util.Properties + import kafka.utils._ import kafka.log._ import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{Logging, ZkUtils, TestUtils} -import kafka.common.{TopicExistsException, TopicAndPartition} -import kafka.server.{ConfigType, KafkaServer, KafkaConfig} +import kafka.utils.{Logging, TestUtils, ZkUtils} +import kafka.common.{TopicAndPartition, TopicExistsException} +import kafka.server.{ConfigType, KafkaConfig, KafkaServer} import java.io.File + import TestUtils._ import scala.collection.{Map, immutable} @@ -418,7 +420,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { assertEquals(newConfig, configInZk) } finally { server.shutdown() - server.config.logDirs.foreach(CoreUtils.rm(_)) + CoreUtils.delete(server.config.logDirs) } } @@ -449,7 +451,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { assertEquals(new Quota(2000, true), server.apis.quotaManagers(ApiKeys.FETCH.id).quota(clientId)) } finally { server.shutdown() - server.config.logDirs.foreach(CoreUtils.rm(_)) + CoreUtils.delete(server.config.logDirs) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 870b9ad..676772f 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -87,7 +87,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { @After override def tearDown() { servers.foreach(_.shutdown()) - servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_))) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) super.tearDown } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index b931568..5221855 100755 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -43,7 +43,7 @@ class RollingBounceTest extends ZooKeeperTestHarness { @After override def tearDown() { servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.rm(server.config.logDirs)) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index b725d8b..a8ba283 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -82,7 +82,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @After override def tearDown() { servers.foreach(server => shutdownServer(server)) - servers.foreach(server => CoreUtils.rm(server.config.logDirs)) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) // restore log levels kafkaApisLogger.setLevel(Level.ERROR) http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index d0cb4a1..7487bc5 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -17,7 +17,6 @@ package kafka.log -import java.io.File import kafka.utils._ import kafka.message._ import org.scalatest.junit.JUnitSuite @@ -26,9 +25,9 @@ import org.junit.Assert._ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import java.util.{Properties, Collection, ArrayList} -import kafka.server.KafkaConfig import org.apache.kafka.common.record.CompressionType +import org.apache.kafka.common.utils.Utils +import java.util.{Collection, Properties} import scala.collection.JavaConversions._ @RunWith(value = classOf[Parameterized]) @@ -41,7 +40,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin @After def tearDown() { - CoreUtils.rm(tmpDir) + Utils.delete(tmpDir) } /** @@ -78,4 +77,4 @@ object BrokerCompressionTest { messageCompression <- CompressionType.values ) yield Array(messageCompression.name, brokerCompression) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/CleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 3773233..b6849f0 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -49,7 +49,7 @@ class CleanerTest extends JUnitSuite { @After def teardown() { - CoreUtils.rm(tmpdir) + Utils.delete(tmpdir) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 6b91611..cc9873c 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -25,6 +25,7 @@ import kafka.message._ import kafka.server.OffsetCheckpoint import kafka.utils._ import org.apache.kafka.common.record.CompressionType +import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit._ import org.junit.runner.RunWith @@ -119,7 +120,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { @After def teardown() { time.scheduler.shutdown() - CoreUtils.rm(logDir) + Utils.delete(logDir) } /* create a cleaner instance and logs with the given parameters */ @@ -165,4 +166,4 @@ object LogCleanerIntegrationTest { list.add(Array(codec.name)) list } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 46bfbed..f290d54 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -24,6 +24,7 @@ import kafka.common._ import kafka.server.OffsetCheckpoint import kafka.utils._ import org.apache.kafka.common.errors.OffsetOutOfRangeException +import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -54,8 +55,8 @@ class LogManagerTest { def tearDown() { if(logManager != null) logManager.shutdown() - CoreUtils.rm(logDir) - logManager.logDirs.foreach(CoreUtils.rm(_)) + Utils.delete(logDir) + logManager.logDirs.foreach(Utils.delete) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index c2eb817..4d75d53 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -19,7 +19,6 @@ package kafka.log import java.io._ import java.util.Properties -import java.util.concurrent.atomic._ import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import kafka.api.ApiVersion @@ -30,6 +29,7 @@ import org.junit.{After, Before, Test} import kafka.message._ import kafka.utils._ import kafka.server.KafkaConfig +import org.apache.kafka.common.utils.Utils class LogTest extends JUnitSuite { @@ -47,7 +47,7 @@ class LogTest extends JUnitSuite { @After def tearDown() { - CoreUtils.rm(tmpDir) + Utils.delete(tmpDir) } def createEmptyLogs(dir: File, offsets: Int*) { @@ -810,7 +810,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, recoveryPoint, time.scheduler, time) assertEquals(numMessages, log.logEndOffset) assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) - CoreUtils.rm(logDir) + Utils.delete(logDir) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 4a1ad5a..cf25cdb 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -96,8 +96,8 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ server1.shutdown server2.shutdown - CoreUtils.rm(server1.config.logDirs) - CoreUtils.rm(server2.config.logDirs) + CoreUtils.delete(server1.config.logDirs) + CoreUtils.delete(server2.config.logDirs) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index 75fa664..dc17aa4 100755 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -43,7 +43,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness { @After override def tearDown() { server.shutdown() - CoreUtils.rm(server.config.logDirs) + CoreUtils.delete(server.config.logDirs) super.tearDown() } @@ -55,4 +55,4 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness { assertEquals(advertisedPort, endpoint.port) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 2e66601..26e2817 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -18,16 +18,15 @@ package kafka.server import kafka.log._ import java.io.File -import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.{Utils, MockTime => JMockTime} import org.easymock.EasyMock import org.junit._ import org.junit.Assert._ import kafka.common._ import kafka.cluster.Replica -import kafka.utils.{ZkUtils, SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils} +import kafka.utils.{KafkaScheduler, MockTime, SystemTime, TestUtils, ZkUtils} import java.util.concurrent.atomic.AtomicBoolean -import org.apache.kafka.common.utils.{MockTime => JMockTime} class HighwatermarkPersistenceTest { @@ -42,7 +41,7 @@ class HighwatermarkPersistenceTest { @After def teardown() { for(manager <- logManagers; dir <- manager.logDirs) - CoreUtils.rm(dir) + Utils.delete(dir) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index e84780a..7258980 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -58,7 +58,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { @After override def tearDown() { servers.foreach(_.shutdown()) - servers.foreach(server => CoreUtils.rm(server.config.logDirs)) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 8c86a7b..d5c696e 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -30,6 +30,7 @@ import kafka.utils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.utils.Utils import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -57,7 +58,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { override def tearDown() { simpleConsumer.close server.shutdown - CoreUtils.rm(logDir) + Utils.delete(logDir) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index e13bfd9..d37de76 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -26,6 +26,7 @@ import java.io.File import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} +import org.apache.kafka.common.utils.Utils import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -94,7 +95,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { producer.close() for (server <- servers) { server.shutdown() - CoreUtils.rm(server.config.logDirs(0)) + Utils.delete(new File(server.config.logDirs(0))) } super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 1d5148b..29eaf2d 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -19,13 +19,14 @@ package kafka.server import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest} import kafka.consumer.SimpleConsumer -import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition} +import kafka.common.{OffsetAndMetadata, OffsetMetadata, OffsetMetadataAndError, TopicAndPartition} import kafka.utils._ import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.Errors - +import org.apache.kafka.common.utils.Utils import org.junit.{After, Before, Test} +import org.junit.Assert._ import java.util.Properties import java.io.File @@ -33,8 +34,6 @@ import java.io.File import scala.util.Random import scala.collection._ -import org.junit.Assert._ - class OffsetCommitTest extends ZooKeeperTestHarness { val random: Random = new Random() val group = "test-group" @@ -71,7 +70,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness { override def tearDown() { simpleConsumer.close server.shutdown - CoreUtils.rm(logDir) + Utils.delete(logDir) super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index c26ff13..8e25366 100755 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -51,7 +51,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { server1.startup() assertEquals(server1.config.brokerId, 1001) server1.shutdown() - CoreUtils.rm(server1.config.logDirs) + CoreUtils.delete(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -75,9 +75,9 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001)) assertTrue(verifyBrokerMetadata(server2.config.logDirs,0)) assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002)) - CoreUtils.rm(server1.config.logDirs) - CoreUtils.rm(server2.config.logDirs) - CoreUtils.rm(server3.config.logDirs) + CoreUtils.delete(server1.config.logDirs) + CoreUtils.delete(server2.config.logDirs) + CoreUtils.delete(server3.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -93,7 +93,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { assertEquals(server3.config.brokerId,3) server3.shutdown() assertTrue(verifyBrokerMetadata(server3.config.logDirs,3)) - CoreUtils.rm(server3.config.logDirs) + CoreUtils.delete(server3.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -116,7 +116,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { server1.startup() server1.shutdown() assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) - CoreUtils.rm(server1.config.logDirs) + CoreUtils.delete(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -133,7 +133,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { case e: kafka.common.InconsistentBrokerIdException => //success } server1.shutdown() - CoreUtils.rm(server1.config.logDirs) + CoreUtils.delete(server1.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } @@ -170,8 +170,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { // verify correct broker metadata was written assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1)) assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2)) - CoreUtils.rm(serverA.config.logDirs) - CoreUtils.rm(newServerB.config.logDirs) + CoreUtils.delete(serverA.config.logDirs) + CoreUtils.delete(newServerB.config.logDirs) TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 67f62d9..bc71edd 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -104,7 +104,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { consumer.close() producer.close() server.shutdown() - CoreUtils.rm(server.config.logDirs) + CoreUtils.delete(server.config.logDirs) verifyNonDaemonThreadsStatus } @@ -117,7 +117,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { server.startup() server.shutdown() server.awaitShutdown() - CoreUtils.rm(server.config.logDirs) + CoreUtils.delete(server.config.logDirs) verifyNonDaemonThreadsStatus } @@ -145,7 +145,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { server.shutdown() server.awaitShutdown() } - CoreUtils.rm(server.config.logDirs) + CoreUtils.delete(server.config.logDirs) verifyNonDaemonThreadsStatus } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index b321a02..9b49365 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -40,7 +40,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { assertTrue(pathExists) server.shutdown() - CoreUtils.rm(server.config.logDirs) + CoreUtils.delete(server.config.logDirs) } @Test @@ -66,7 +66,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1) server1.shutdown() - CoreUtils.rm(server1.config.logDirs) + CoreUtils.delete(server1.config.logDirs) } @Test @@ -80,6 +80,6 @@ class ServerStartupTest extends ZooKeeperTestHarness { assertEquals(brokerId, server.metadataCache.getAliveBrokers.head.id) server.shutdown() - CoreUtils.rm(server.config.logDirs) + CoreUtils.delete(server.config.logDirs) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0730468..a1e7912 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -21,9 +21,8 @@ import java.io._ import java.nio._ import java.nio.file.Files import java.nio.channels._ -import java.util -import java.util.concurrent.{Callable, TimeUnit, Executors} -import java.util.{Collections, Random, Properties} +import java.util.concurrent.{Callable, Executors, TimeUnit} +import java.util.{Collections, Properties, Random} import java.security.cert.X509Certificate import javax.net.ssl.X509TrustManager import charset.Charset @@ -52,6 +51,7 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.network.Mode import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer} +import org.apache.kafka.common.utils.Utils import scala.collection.Map import scala.collection.JavaConversions._ @@ -100,7 +100,7 @@ object TestUtils extends Logging { Runtime.getRuntime().addShutdownHook(new Thread() { override def run() = { - CoreUtils.rm(f) + Utils.delete(f) } }) f @@ -1115,7 +1115,7 @@ object TestUtils extends Logging { } } catch { case ie: InterruptedException => failWithTimeout() - case e => exceptions += e + case e: Throwable => exceptions += e } finally { threadPool.shutdownNow() } http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index 5fa2f65..1030c46 100755 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -21,10 +21,9 @@ import org.apache.zookeeper.server.ZooKeeperServer import org.apache.zookeeper.server.NIOServerCnxnFactory import kafka.utils.TestUtils import java.net.InetSocketAddress -import javax.security.auth.login.Configuration + import kafka.utils.CoreUtils -import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.utils.Utils.getPort +import org.apache.kafka.common.utils.Utils class EmbeddedZookeeper() { val snapshotDir = TestUtils.tempDir() @@ -40,8 +39,8 @@ class EmbeddedZookeeper() { def shutdown() { CoreUtils.swallow(zookeeper.shutdown()) CoreUtils.swallow(factory.shutdown()) - CoreUtils.rm(logDir) - CoreUtils.rm(snapshotDir) + Utils.delete(logDir) + Utils.delete(snapshotDir) } }
