Are you sure RedisClientPool is being initialized properly in the constructor of RedisCache? Can you please copy paste the code that you use to initialize RedisClientPool inside the constructor of RedisCache?
Thanks, Aniket On Fri, Oct 23, 2015 at 11:47 AM Bin Wang <wbi...@gmail.com> wrote: > BTW, "lines" is a DStream. > > Bin Wang <wbi...@gmail.com>于2015年10月23日周五 下午2:16写道: > >> I use mapPartitions to open connections to Redis, I write it like this: >> >> val seqs = lines.mapPartitions { lines => >> val cache = new RedisCache(redisUrl, redisPort) >> val result = lines.map(line => Parser.parseBody(line, cache)) >> cache.redisPool.close >> result >> } >> >> But it seems the pool is closed before I use it. Am I doing anything >> wrong? Here is the error: >> >> java.lang.IllegalStateException: Pool not open >> at >> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140) >> at >> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166) >> at com.redis.RedisClientPool.withClient(Pool.scala:34) >> at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17) >> at >> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29) >> at >> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> at scala.collection.immutable.List.foreach(List.scala:318) >> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >> at scala.collection.AbstractTraversable.map(Traversable.scala:105) >> at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26) >> at >> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) >> at >> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> at >> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209) >> at >> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >> at org.apache.spark.scheduler.Task.run(Task.scala:88) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:745) >> >>