[jira] [Updated] (SPARK-23239) KafkaRelationSuite should clean up its continuous queries

2018-01-26 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-23239:
--
Description: 
Currently, `KafkaRelationSuite` doesn't clean up its continuous microbatch 
queries. As a result, the next suite sometimes appears to hang in 
`processAllAvailable`.
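
The kind of cleanup described above could look roughly like the sketch below. It is not the patch attached to this issue: `QueryCleanup` and `stopAll` are hypothetical names, and the helper is written against a generic query type so that it stands alone without a Spark dependency. It stops every query it is given and collects failures rather than aborting on the first one, so that one failing `stop` does not leave later queries running.

```scala
import scala.util.control.NonFatal

// Hypothetical helper (not part of Spark): stop every still-active query.
object QueryCleanup {
  // Applies `stop` to each query in order. A non-fatal failure is recorded
  // and the loop continues, so the remaining queries are still stopped.
  // Returns the failures that were caught, in encounter order.
  def stopAll[Q](active: Seq[Q])(stop: Q => Unit): Seq[Throwable] =
    active.flatMap { q =>
      try {
        stop(q)
        None
      } catch {
        case NonFatal(e) => Some(e)
      }
    }
}
```

In a Spark test suite, and assuming a `SparkSession` named `spark` is in scope, this might be called from a per-test or per-suite teardown hook as `QueryCleanup.stopAll(spark.streams.active.toSeq)(_.stop())`, which uses the public `StreamingQueryManager.active` and `StreamingQuery.stop()` calls.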

{code}
...
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
KafkaSourceStressSuite:
(hang here)
{code}

{code}
"stream execution thread for [id = d08d575a-513a-47e9-bdf5-3d3ee0e5eca8, runId = f2236eb2-2cbb-4069-a535-9d673a96a4f7]" #3723 daemon prio=5 os_prio=0 tid=0x7f9dfe3e0e30 nid=0xa6c87 runnable [0x7f9d62ff9000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00072def5c40> (a sun.nio.ch.Util$3)
- locked <0x00072def5c30> (a java.util.Collections$UnmodifiableSet)
- locked <0x00072def1f38> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00072def1e78> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:375)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:342)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:198)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.mutable.AbstractSet.map(Set.scala:46)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:189)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:280)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:278)
- locked <0x00072df2c308> (a org.apache.spark.sql.kafka010.KafkaOffsetReader)
at org.apache.spark.sql.

[jira] [Updated] (SPARK-23239) KafkaRelationSuite should clean up its continuous queries

2018-01-26 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-23239:
--
Description: 
Currently, `KafkaRelationSuite` doesn't clean up its continuous queries. As a 
result, the next suite sometimes appears to hang in `processAllAvailable`.

{code}
...
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
KafkaSourceStressSuite:
(hang here)
{code}

{code}
"stream execution thread for [id = d08d575a-513a-47e9-bdf5-3d3ee0e5eca8, runId = f2236eb2-2cbb-4069-a535-9d673a96a4f7]" #3723 daemon prio=5 os_prio=0 tid=0x7f9dfe3e0e30 nid=0xa6c87 runnable [0x7f9d62ff9000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00072def5c40> (a sun.nio.ch.Util$3)
- locked <0x00072def5c30> (a java.util.Collections$UnmodifiableSet)
- locked <0x00072def1f38> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00072def1e78> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:375)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:342)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:198)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.mutable.AbstractSet.map(Set.scala:46)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:189)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:280)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:278)
- locked <0x00072df2c308> (a org.apache.spark.sql.kafka010.KafkaOffsetReader)
at org.apache.spark.sql.kafka010.Kaf