[jira] [Updated] (SPARK-23239) KafkaRelationSuite should clean up its continuous queries
[ https://issues.apache.org/jira/browse/SPARK-23239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-23239: -- Description: Currently, `KafkaRelationSuite` doesn't clean up its continuous microbatch queries. As a result, the next suite seems to hang at `processAllAvailable` sometimes. {code} ... KafkaRelationSuite: - explicit earliest to latest offsets - default starting and ending offsets - explicit offsets - reuse same dataframe in query - test late binding start offsets - bad batch query options KafkaSourceStressSuite: (hang here) {code} {code} "stream execution thread for [id = d08d575a-513a-47e9-bdf5-3d3ee0e5eca8, runId = f2236eb2-2cbb-4069-a535-9d673a96a4f7]" #3723 daemon prio=5 os_prio=0 tid=0x7f9dfe3e0e30 nid=0xa6c87 runnable [0x7f9d62ff9000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x00072def5c40> (a sun.nio.ch.Util$3) - locked <0x00072def5c30> (a java.util.Collections$UnmodifiableSet) - locked <0x00072def1f38> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:470) at org.apache.kafka.common.network.Selector.poll(Selector.java:286) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) - locked <0x00072def1e78> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148) at 
org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:375) at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:342) at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:198) at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524) at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46) at scala.collection.SetLike$class.map(SetLike.scala:92) at scala.collection.mutable.AbstractSet.map(Set.scala:46) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:189) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:280) at 
org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:278) - locked <0x00072df2c308> (a org.apache.spark.sql.kafka010.KafkaOffsetReader) at org.apache.spark.sql.
[jira] [Updated] (SPARK-23239) KafkaRelationSuite should clean up its continuous queries
[ https://issues.apache.org/jira/browse/SPARK-23239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-23239: -- Description: Currently, `KafkaRelationSuite` doesn't clean up its continuous queries. As a result, the next suite seems to hang at `processAllAvailable` sometimes. {code} ... KafkaRelationSuite: - explicit earliest to latest offsets - default starting and ending offsets - explicit offsets - reuse same dataframe in query - test late binding start offsets - bad batch query options KafkaSourceStressSuite: (hang here) {code} {code} "stream execution thread for [id = d08d575a-513a-47e9-bdf5-3d3ee0e5eca8, runId = f2236eb2-2cbb-4069-a535-9d673a96a4f7]" #3723 daemon prio=5 os_prio=0 tid=0x7f9dfe3e0e30 nid=0xa6c87 runnable [0x7f9d62ff9000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x00072def5c40> (a sun.nio.ch.Util$3) - locked <0x00072def5c30> (a java.util.Collections$UnmodifiableSet) - locked <0x00072def1f38> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:470) at org.apache.kafka.common.network.Selector.poll(Selector.java:286) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) - locked <0x00072def1e78> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148) at 
org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:375) at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:342) at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:198) at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524) at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46) at scala.collection.SetLike$class.map(SetLike.scala:92) at scala.collection.mutable.AbstractSet.map(Set.scala:46) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:189) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:280) at 
org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279) at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:278) - locked <0x00072df2c308> (a org.apache.spark.sql.kafka010.KafkaOffsetReader) at org.apache.spark.sql.kafka010.Kaf