Are you sure RedisClientPool is being initialized properly in the
constructor of RedisCache? Could you please paste the code that you use
to initialize RedisClientPool inside the constructor of RedisCache?
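
One more thing that may be worth checking, though I can't be sure without
seeing RedisCache: Iterator.map in Scala is lazy, so in your snippet
Parser.parseBody probably does not run until Spark consumes the returned
iterator, which is after cache.redisPool.close has already been called. The
Iterator.next frames below ExternalSorter.insertAll in your trace would fit
that. Here is a minimal sketch of the effect without Spark or Redis; FakePool
and LazyIteratorDemo are made-up names for illustration, not the real
RedisClientPool API:

```scala
// Hypothetical stand-in for a connection pool; NOT the real RedisClientPool.
class FakePool {
  private var open = true

  // Fails once the pool is closed, similar in spirit to "Pool not open".
  def get(key: String): String = {
    if (!open) throw new IllegalStateException("Pool not open")
    key.toUpperCase
  }

  def close(): Unit = { open = false }
}

object LazyIteratorDemo {
  // Same shape as the original snippet: map on an Iterator is lazy,
  // so close() runs before any element is actually processed.
  def broken(lines: Iterator[String]): Iterator[String] = {
    val pool = new FakePool
    val result = lines.map(line => pool.get(line))
    pool.close()
    result
  }

  // Forces evaluation with toList before closing the pool.
  def fixed(lines: Iterator[String]): Iterator[String] = {
    val pool = new FakePool
    val result = lines.map(line => pool.get(line)).toList
    pool.close()
    result.iterator
  }

  def main(args: Array[String]): Unit = {
    // Works: every element is processed while the pool is still open.
    println(fixed(Iterator("a", "b")).toList)
    // Fails on first access: the pool was closed before consumption.
    println(scala.util.Try(broken(Iterator("a", "b")).toList).isFailure)
  }
}
```

Note that toList holds the whole partition in memory, so this only suits
partitions that fit comfortably. If that is a concern, keeping one pool per
executor JVM in a singleton object instead of closing it per partition might
be another option.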

Thanks,
Aniket

On Fri, Oct 23, 2015 at 11:47 AM Bin Wang <wbi...@gmail.com> wrote:

> BTW, "lines" is a DStream.
>
> On Fri, Oct 23, 2015 at 2:16 PM Bin Wang <wbi...@gmail.com> wrote:
>
>> I use mapPartitions to open connections to Redis. I write it like this:
>>
>>     val seqs = lines.mapPartitions { lines =>
>>       val cache = new RedisCache(redisUrl, redisPort)
>>       val result = lines.map(line => Parser.parseBody(line, cache))
>>       cache.redisPool.close
>>       result
>>     }
>>
>> But it seems the pool is closed before I use it. Am I doing anything
>> wrong? Here is the error:
>>
>> java.lang.IllegalStateException: Pool not open
>>      at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>      at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>      at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>      at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>      at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>      at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>      at scala.collection.immutable.List.foreach(List.scala:318)
>>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>      at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>      at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>      at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>      at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>      at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>      at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>      at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>      at java.lang.Thread.run(Thread.java:745)
>>
>>
