[spark] branch master updated: [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350

2019-01-15 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2ebb79b  [SPARK-26350][FOLLOWUP] Add actual verification on new UT 
introduced on SPARK-26350
2ebb79b is described below

commit 2ebb79b2a607aa25ea22826d9c5d6af18c97a7f2
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Tue Jan 15 14:21:51 2019 -0800

[SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on 
SPARK-26350

## What changes were proposed in this pull request?

This patch adds a check to verify that the consumer group id is applied 
correctly when a custom group id is provided via the Kafka `kafka.group.id` option.
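
For reference, a minimal sketch of how consumer groups can be listed with the Kafka 
`AdminClient`, which is what the `testUtils.listConsumerGroups()` call in the diff below 
relies on (the standalone helper object and its `brokerAddress` parameter are illustrative, 
not part of this patch):

```
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object ListGroupsSketch {
  // Illustrative helper: list the consumer groups known to a broker and return their ids.
  def listConsumerGroupIds(brokerAddress: String): Seq[String] = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress)
    val admin = AdminClient.create(props)
    try {
      // valid() keeps only the groups that could be listed without an error.
      admin.listConsumerGroups().valid().get().asScala.map(_.groupId()).toSeq
    } finally {
      admin.close()
    }
  }
}
```

With such a helper, the new assertion reduces to checking that the returned ids contain 
`customGroupId`.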

## How was this patch tested?

Modified UT.

Closes #23544 from 
HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala| 14 --
 .../org/apache/spark/sql/kafka010/KafkaRelationSuite.scala | 13 -
 .../org/apache/spark/sql/kafka010/KafkaTestUtils.scala |  6 +-
 3 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 6402088..cb45384 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
+import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
 testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
+val customGroupId = "id-" + Random.nextInt()
 val dsKafka = spark
   .readStream
   .format("kafka")
-  .option("kafka.group.id", "id-" + Random.nextInt())
+  .option("kafka.group.id", customGroupId)
   .option("kafka.bootstrap.servers", testUtils.brokerAddress)
   .option("subscribe", topic)
   .option("startingOffsets", "earliest")
@@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
 testStream(dsKafka)(
   makeSureGetOffsetCalled,
-  CheckAnswer(1 to 30: _*)
+  CheckAnswer(1 to 30: _*),
+  Execute { _ =>
+val consumerGroups = testUtils.listConsumerGroups()
+val validGroups = consumerGroups.valid().get()
+val validGroupsId = validGroups.asScala.map(_.groupId())
+assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+  s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+  s"expected group id: $customGroupId")
+  }
 )
   }
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index efe7385..2cd13a9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
@@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
 testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
 testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
-val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+val customGroupId = "id-" + Random.nextInt()
+val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
 checkAnswer(df, (1 to 30).map(_.toString).toDF())
+
+val consumerGroups = testUtils.listConsumerGroups()
+v

[spark] branch master updated: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka

2018-12-21 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8e76d66  [SPARK-26267][SS] Retry when detecting incorrect offsets from 
Kafka
8e76d66 is described below

commit 8e76d6621aaddb8b73443b14ea2c6eebe9089893
Author: Shixiong Zhu 
AuthorDate: Fri Dec 21 10:41:25 2018 -0800

[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka

## What changes were proposed in this pull request?

Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), 
Kafka may return the earliest offset when we request the latest offset. This 
will cause Spark to reprocess data.

As per suggestion in KAFKA-7703, we put a position call between poll and 
seekToEnd to block the fetch request triggered by `poll` before calling 
`seekToEnd`.

In addition, to avoid other unknown issues, we also use the previous known 
offsets to audit the latest offsets returned by Kafka. If we find some 
incorrect offsets (a latest offset is less than an offset in `knownOffsets`), 
we will retry at most `maxOffsetFetchAttempts` times.
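
A simplified sketch of the audit-and-retry idea (not the actual `KafkaOffsetReader` code; 
`fetchOnce` and `maxAttempts` stand in for the real fetch logic and the 
`maxOffsetFetchAttempts` setting):

```
import org.apache.kafka.common.TopicPartition

object OffsetAuditSketch {
  // Retry the latest-offset fetch while any returned offset goes backwards relative to
  // what we already know, up to maxAttempts times.
  def fetchLatestWithAudit(
      knownOffsets: Map[TopicPartition, Long],
      maxAttempts: Int)(
      fetchOnce: () => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {

    def looksIncorrect(latest: Map[TopicPartition, Long]): Boolean =
      latest.exists { case (tp, offset) => knownOffsets.get(tp).exists(offset < _) }

    var attempt = 1
    var latest = fetchOnce()
    while (looksIncorrect(latest) && attempt < maxAttempts) {
      attempt += 1
      latest = fetchOnce() // re-fetch; the broker may return sane offsets on a later attempt
    }
    latest
  }
}
```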

## How was this patch tested?

Jenkins

Closes #23324 from zsxwing/SPARK-26267.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../sql/kafka010/KafkaContinuousReadSupport.scala  |  4 +-
 .../sql/kafka010/KafkaMicroBatchReadSupport.scala  | 19 +++--
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  2 +
 .../spark/sql/kafka010/KafkaOffsetReader.scala | 80 --
 .../apache/spark/sql/kafka010/KafkaSource.scala|  5 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 48 +
 6 files changed, 145 insertions(+), 13 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 1753a28..02dfb9c 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
   override def initialOffset(): Offset = {
 val offsets = initialOffsets match {
   case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-  case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+  case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
   case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
 }
 logInfo(s"Initial offsets: $offsets")
@@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(
 
   override def needsReconfiguration(config: ScanConfig): Boolean = {
 val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-offsetReader.fetchLatestOffsets().keySet != knownPartitions
+offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index bb4de67..b4f042e 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 
   override def latestOffset(start: Offset): Offset = {
 val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
 endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
   rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
 }.getOrElse {
@@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 }.toSeq
 logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+val untilOffsets = endPartitionOffsets
+untilOffsets.foreach { case (tp, untilOffset) =>
+  fromOffsets.get(tp).foreach { fromOffset =>
+if (untilOffset < fromOffset) {
+  reportDataLoss(s"Partition $tp's offset was changed from " +

spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

2018-11-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 77c0629cb -> c23b801d3


[SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file so that the 
`metadata` file will never be a partial file.
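
A hedged sketch of the write-then-commit pattern that `CheckpointFileManager.createAtomic` 
provides (a Spark-internal API; the helper name and JSON payload are illustrative, not the 
exact `StreamMetadata` code): the file only becomes visible when `close()` succeeds, so 
readers never observe a partial `metadata` file.

```
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.streaming.CheckpointFileManager

object AtomicMetadataWriteSketch {
  // Illustrative helper: write a small metadata payload so it is either fully visible or absent.
  def writeAtomically(metadataJson: String, file: Path, conf: Configuration): Unit = {
    val fm = CheckpointFileManager.create(file.getParent, conf)
    // overwriteIfPossible = false: fail with FileAlreadyExistsException rather than clobber.
    val out = fm.createAtomic(file, overwriteIfPossible = false)
    try {
      out.write(metadataJson.getBytes(StandardCharsets.UTF_8))
      out.close() // commit: the temp file is renamed into place only now
    } catch {
      case e: Throwable =>
        out.cancel() // discard the partial temp file
        throw e
    }
  }
}
```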

## How was this patch tested?

Jenkins

Closes #23060 from zsxwing/SPARK-26092.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 058c4602b000b24deb764a810ef8b43c41fe63ae)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c23b801d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c23b801d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c23b801d

Branch: refs/heads/branch-2.4
Commit: c23b801d3c87b12e729b98910833b441db05bd45
Parents: 77c0629
Author: Shixiong Zhu 
Authored: Fri Nov 16 15:43:27 2018 -0800
Committer: Shixiong Zhu 
Committed: Fri Nov 16 15:43:44 2018 -0800

--
 .../streaming/CheckpointFileManager.scala   |  2 +-
 .../execution/streaming/StreamExecution.scala   |  1 +
 .../execution/streaming/StreamMetadata.scala| 23 ++--
 3 files changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 606ba25..b3e4240 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -56,7 +56,7 @@ trait CheckpointFileManager {
* @param overwriteIfPossible If true, then the implementations must do a 
best-effort attempt to
*overwrite the file if it already exists. It 
should not throw
*any exception if the file exists. However, if 
false, then the
-   *implementation must not overwrite if the file 
alraedy exists and
+   *implementation must not overwrite if the file 
already exists and
*must throw `FileAlreadyExistsException` in 
that case.
*/
   def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream

http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f6c60c1..de33844 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -87,6 +87,7 @@ abstract class StreamExecution(
   val resolvedCheckpointRoot = {
 val checkpointPath = new Path(checkpointRoot)
 val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+fs.mkdirs(checkpointPath)
 checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toUri.toString
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 0bc54ea..516afbe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{InputStreamReader, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
+import java.util.ConcurrentModificationException
 
 import scala.util.control.NonFatal
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, 
Path}
 import org.json4s.NoTypeHints
 imp

spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

2018-11-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 99cbc51b3 -> 058c4602b


[SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file so that the 
`metadata` file will never be a partial file.

## How was this patch tested?

Jenkins

Closes #23060 from zsxwing/SPARK-26092.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/058c4602
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/058c4602
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/058c4602

Branch: refs/heads/master
Commit: 058c4602b000b24deb764a810ef8b43c41fe63ae
Parents: 99cbc51
Author: Shixiong Zhu 
Authored: Fri Nov 16 15:43:27 2018 -0800
Committer: Shixiong Zhu 
Committed: Fri Nov 16 15:43:27 2018 -0800

--
 .../streaming/CheckpointFileManager.scala   |  2 +-
 .../execution/streaming/StreamExecution.scala   |  1 +
 .../execution/streaming/StreamMetadata.scala| 23 ++--
 3 files changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 606ba25..b3e4240 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -56,7 +56,7 @@ trait CheckpointFileManager {
* @param overwriteIfPossible If true, then the implementations must do a 
best-effort attempt to
*overwrite the file if it already exists. It 
should not throw
*any exception if the file exists. However, if 
false, then the
-   *implementation must not overwrite if the file 
alraedy exists and
+   *implementation must not overwrite if the file 
already exists and
*must throw `FileAlreadyExistsException` in 
that case.
*/
   def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream

http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 631a6eb..89b4f40 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -88,6 +88,7 @@ abstract class StreamExecution(
   val resolvedCheckpointRoot = {
 val checkpointPath = new Path(checkpointRoot)
 val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+fs.mkdirs(checkpointPath)
 checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toUri.toString
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 0bc54ea..516afbe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{InputStreamReader, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
+import java.util.ConcurrentModificationException
 
 import scala.util.control.NonFatal
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, 
Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+imp

[spark] branch branch-2.4 updated: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)

2019-01-07 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new faa4c28  [SPARK-26267][SS] Retry when detecting incorrect offsets from 
Kafka (2.4)
faa4c28 is described below

commit faa4c2823b69c1643d7678ee1cb0b7295c611334
Author: Shixiong Zhu 
AuthorDate: Mon Jan 7 16:53:07 2019 -0800

[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)

## What changes were proposed in this pull request?

Backport #23324 to branch-2.4.

## How was this patch tested?

Jenkins

Closes #23365 from zsxwing/SPARK-26267-2.4.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../spark/sql/kafka010/KafkaContinuousReader.scala |  4 +-
 .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 20 --
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  2 +
 .../spark/sql/kafka010/KafkaOffsetReader.scala | 80 --
 .../apache/spark/sql/kafka010/KafkaSource.scala|  5 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 48 +
 6 files changed, 146 insertions(+), 13 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 8ce56a2..561d501 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -73,7 +73,7 @@ class KafkaContinuousReader(
 offset = start.orElse {
   val offsets = initialOffsets match {
 case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
 case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
   }
   logInfo(s"Initial offsets: $offsets")
@@ -128,7 +128,7 @@ class KafkaContinuousReader(
   }
 
   override def needsReconfiguration(): Boolean = {
-knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+knownPartitions != null && offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 8cc989f..b6c8035 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -93,7 +93,8 @@ private[kafka010] class KafkaMicroBatchReader(
 endPartitionOffsets = Option(end.orElse(null))
 .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
 .getOrElse {
-  val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+  val latestPartitionOffsets =
+kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
   maxOffsetsPerTrigger.map { maxOffsets =>
 rateLimit(maxOffsets, startPartitionOffsets, 
latestPartitionOffsets)
   }.getOrElse {
@@ -132,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReader(
 }.toSeq
 logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+val untilOffsets = endPartitionOffsets
+untilOffsets.foreach { case (tp, untilOffset) =>
+  fromOffsets.get(tp).foreach { fromOffset =>
+if (untilOffset < fromOffset) {
+  reportDataLoss(s"Partition $tp's offset was changed from " +
+s"$fromOffset to $untilOffset, some data may have been missed")
+}
+  }
+}
+
 // Calculate offset ranges
 val offsetRanges = rangeCalculator.getRanges(
-  fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-  untilOffsets = endPartitionOffsets,
+  fromOffsets = fromOffsets,
+  untilOffsets = untilOffsets,
   executorLocations = getSortedExecutorList())
 
 // Reuse Kafka consumers only when all the offset ranges have distinct 
TopicPartitions,
@@ -192,7 +204,7 @@ private[kafka010] class KafkaMicroBatchReader(
 case EarliestOffsetRangeLimit =>
   KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
  

[spark] branch branch-2.4 updated: [SPARK-26586][SS] Fix race condition that causes streams to run with unexpected confs

2019-01-11 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new bbf61eb  [SPARK-26586][SS] Fix race condition that causes streams to 
run with unexpected confs
bbf61eb is described below

commit bbf61eb4222d7b46e71dc91eeedf82d27226fc2c
Author: Mukul Murthy 
AuthorDate: Fri Jan 11 11:46:14 2019 -0800

[SPARK-26586][SS] Fix race condition that causes streams to run with 
unexpected confs

## What changes were proposed in this pull request?

Fix race condition where streams can have unexpected conf values.

New streaming queries should run with isolated SparkSessions so that they 
aren't affected by conf updates after they are started. In StreamExecution, the 
parent SparkSession is cloned and used to run each batch, but this cloning 
happens in a separate thread and may happen after DataStreamWriter.start() 
returns. If a stream is started and a conf key is set immediately after, the 
stream is likely to have the new value.
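
A generic illustration of the race and the fix, using no Spark APIs (the actual change 
simply moves `sparkSession.cloneSession()` from the query thread into a field, as the diff 
below shows): taking the conf snapshot eagerly on the caller's thread means later writes to 
the parent conf cannot leak into the stream.

```
import scala.collection.concurrent.TrieMap

object ConfIsolationDemo {
  def main(args: Array[String]): Unit = {
    val parentConf = TrieMap("testKey1" -> "0")

    // Eager snapshot, taken before "start" returns -- analogous to cloning the session
    // in the StreamExecution constructor instead of inside the query thread.
    val snapshot: Map[String, String] = parentConf.toMap

    // Simulate the user mutating the parent conf right after starting the stream.
    parentConf.update("testKey1", "42")

    println(s"live conf: ${parentConf("testKey1")}") // 42
    println(s"snapshot:  ${snapshot("testKey1")}")   // 0 -- the stream is unaffected
  }
}
```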

## How was this patch tested?

New unit test that fails prior to the production change and passes with it.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

Closes #23513 from mukulmurthy/26586.

Authored-by: Mukul Murthy 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit ae382c94dd10ff494dde4de44e66182bf6dbe8f8)
Signed-off-by: Shixiong Zhu 
---
 .../sql/execution/streaming/StreamExecution.scala  |  5 +++--
 .../test/DataStreamReaderWriterSuite.scala | 24 ++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index de33844..c1aa98a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -176,6 +176,9 @@ abstract class StreamExecution(
   lazy val streamMetrics = new MetricsReporter(
 this, s"spark.streaming.${Option(name).getOrElse(id)}")
 
+  /** Isolated spark session to run the batches with. */
+  private val sparkSessionForStream = sparkSession.cloneSession()
+
   /**
* The thread that runs the micro-batches of this stream. Note that this 
thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: 
interrupting a
@@ -265,8 +268,6 @@ abstract class StreamExecution(
   // force initialization of the logical plan so that the sources can be 
created
   logicalPlan
 
-  // Isolated spark session to run the batches with.
-  val sparkSessionForStream = sparkSession.cloneSession()
   // Adaptive execution can change num shuffle partitions, disallow
   sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
"false")
   // Disable cost-based join optimization as we do not want stateful 
operations to be rearranged
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 8212fb9..569114a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming.test
 
 import java.io.File
+import java.util.ConcurrentModificationException
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -651,4 +652,27 @@ class DataStreamReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
 
 LastOptions.clear()
   }
+
+  test("SPARK-26586: Streams should have isolated confs") {
+import testImplicits._
+val input = MemoryStream[Int]
+input.addData(1 to 10)
+spark.conf.set("testKey1", 0)
+val queries = (1 to 10).map { i =>
+  spark.conf.set("testKey1", i)
+  input.toDF().writeStream
+.foreachBatch { (df: Dataset[Row], id: Long) =>
+  val v = df.sparkSession.conf.get("testKey1").toInt
+  if (i != v) {
+throw new ConcurrentModificationException(s"Stream $i has the wrong conf value $v")
+  }
+}
+.start()
+}
+try {
+  queries.foreach(_.processAllAvailable())
+} finally {
+  queries.foreach(_.stop())
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-26586][SS] Fix race condition that causes streams to run with unexpected confs

2019-01-11 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ae382c9  [SPARK-26586][SS] Fix race condition that causes streams to 
run with unexpected confs
ae382c9 is described below

commit ae382c94dd10ff494dde4de44e66182bf6dbe8f8
Author: Mukul Murthy 
AuthorDate: Fri Jan 11 11:46:14 2019 -0800

[SPARK-26586][SS] Fix race condition that causes streams to run with 
unexpected confs

## What changes were proposed in this pull request?

Fix race condition where streams can have unexpected conf values.

New streaming queries should run with isolated SparkSessions so that they 
aren't affected by conf updates after they are started. In StreamExecution, the 
parent SparkSession is cloned and used to run each batch, but this cloning 
happens in a separate thread and may happen after DataStreamWriter.start() 
returns. If a stream is started and a conf key is set immediately after, the 
stream is likely to have the new value.

## How was this patch tested?

New unit test that fails prior to the production change and passes with it.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

Closes #23513 from mukulmurthy/26586.

Authored-by: Mukul Murthy 
Signed-off-by: Shixiong Zhu 
---
 .../sql/execution/streaming/StreamExecution.scala  |  5 +++--
 .../test/DataStreamReaderWriterSuite.scala | 24 ++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 83824f4..90f7b47 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -181,6 +181,9 @@ abstract class StreamExecution(
   lazy val streamMetrics = new MetricsReporter(
 this, s"spark.streaming.${Option(name).getOrElse(id)}")
 
+  /** Isolated spark session to run the batches with. */
+  private val sparkSessionForStream = sparkSession.cloneSession()
+
   /**
* The thread that runs the micro-batches of this stream. Note that this 
thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: 
interrupting a
@@ -270,8 +273,6 @@ abstract class StreamExecution(
   // force initialization of the logical plan so that the sources can be 
created
   logicalPlan
 
-  // Isolated spark session to run the batches with.
-  val sparkSessionForStream = sparkSession.cloneSession()
   // Adaptive execution can change num shuffle partitions, disallow
   sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
"false")
   // Disable cost-based join optimization as we do not want stateful 
operations to be rearranged
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 4d3a54a..74ea0bf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming.test
 
 import java.io.File
+import java.util.ConcurrentModificationException
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -651,4 +652,27 @@ class DataStreamReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
 
 LastOptions.clear()
   }
+
+  test("SPARK-26586: Streams should have isolated confs") {
+import testImplicits._
+val input = MemoryStream[Int]
+input.addData(1 to 10)
+spark.conf.set("testKey1", 0)
+val queries = (1 to 10).map { i =>
+  spark.conf.set("testKey1", i)
+  input.toDF().writeStream
+.foreachBatch { (df: Dataset[Row], id: Long) =>
+  val v = df.sparkSession.conf.get("testKey1").toInt
+  if (i != v) {
+throw new ConcurrentModificationException(s"Stream $i has the wrong conf value $v")
+  }
+}
+.start()
+}
+try {
+  queries.foreach(_.processAllAvailable())
+} finally {
+  queries.foreach(_.stop())
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever

2019-01-22 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 9814108  [SPARK-26665][CORE] Fix a bug that 
BlockTransferService.fetchBlockSync may hang forever
9814108 is described below

commit 9814108a4f51aeb281f14a8421ac1d735c85
Author: Shixiong Zhu 
AuthorDate: Tue Jan 22 09:00:52 2019 -0800

[SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may 
hang forever

## What changes were proposed in this pull request?

`ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large 
but not enough memory is available. However, when this happens, 
BlockTransferService.fetchBlockSync will currently just hang forever because its 
`BlockFetchingListener.onBlockFetchSuccess` doesn't complete the `Promise`.

This PR catches `Throwable` and uses the error to complete `Promise`.
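
A self-contained sketch of the idea (not the actual `BlockTransferService` code; names are 
illustrative): any `Throwable` thrown while materializing the fetched block must fail the 
`Promise`, otherwise the thread waiting on it blocks forever.

```
import java.nio.ByteBuffer

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FetchSyncSketch {
  def fetchBlockSync(blockSize: Int): ByteBuffer = {
    val result = Promise[ByteBuffer]()

    // Stand-in for the asynchronous onBlockFetchSuccess callback.
    def onBlockFetchSuccess(): Unit = {
      try {
        val ret = ByteBuffer.allocate(blockSize) // may throw OutOfMemoryError for huge blocks
        result.success(ret)
      } catch {
        case e: Throwable => result.failure(e) // without this, the caller below hangs forever
      }
    }

    onBlockFetchSuccess()
    Await.result(result.future, 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    // A negative size makes ByteBuffer.allocate throw, exercising the failure path
    // (the same trick the new unit test uses).
    try fetchBlockSync(-1) catch { case NonFatal(e) => println(s"failed fast: $e") }
  }
}
```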

## How was this patch tested?

Added a unit test. Since I cannot make `ByteBuffer.allocate` throw 
`OutOfMemoryError`, I passed a negative size to make `ByteBuffer.allocate` 
fail. Although the error type is different, it should trigger the same code 
path.

Closes #23590 from zsxwing/SPARK-26665.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 66450bbc1bb4397f06ca9a6ecba4d16c82d711fd)
Signed-off-by: Shixiong Zhu 
---
 .../spark/network/BlockTransferService.scala   |  12 ++-
 .../spark/network/BlockTransferServiceSuite.scala  | 104 +
 2 files changed, 112 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala 
b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index a58c8fa..51ced69 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -107,10 +107,14 @@ abstract class BlockTransferService extends ShuffleClient 
with Closeable with Lo
 case e: EncryptedManagedBuffer =>
   result.success(e)
 case _ =>
-  val ret = ByteBuffer.allocate(data.size.toInt)
-  ret.put(data.nioByteBuffer())
-  ret.flip()
-  result.success(new NioManagedBuffer(ret))
+  try {
+val ret = ByteBuffer.allocate(data.size.toInt)
+ret.put(data.nioByteBuffer())
+ret.flip()
+result.success(new NioManagedBuffer(ret))
+  } catch {
+case e: Throwable => result.failure(e)
+  }
   }
 }
   }, tempFileManager)
diff --git 
a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala 
b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
new file mode 100644
index 000..d7e4b91
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import org.scalatest.concurrent._
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
DownloadFileManager}
+import org.apache.spark.storage.{BlockId, StorageLevel}
+
+class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits {
+
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
+  test("fetchBlockSync should not hang when 
BlockFetchingListener.onBlockFetchSuccess fails") {
+// Create a mocked `BlockTransferService` to call 
`BlockFetchingListener.onBlockFetchSuccess`
+// with a bad `ManagedBuffer` which will trigger an exception in 
`onBlockFetchSuccess`.
+val blockTransferService = new BlockTra

[spark] branch branch-2.3 updated: [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever

2019-01-22 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new b88067b  [SPARK-26665][CORE] Fix a bug that 
BlockTransferService.fetchBlockSync may hang forever
b88067b is described below

commit b88067bd0f7b9466a89ce6458cb7766a24283b13
Author: Shixiong Zhu 
AuthorDate: Tue Jan 22 09:00:52 2019 -0800

[SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may 
hang forever

## What changes were proposed in this pull request?

`ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large 
but not enough memory is available. However, when this happens, 
BlockTransferService.fetchBlockSync will currently just hang forever because its 
`BlockFetchingListener.onBlockFetchSuccess` doesn't complete the `Promise`.

This PR catches `Throwable` and uses the error to complete `Promise`.

## How was this patch tested?

Added a unit test. Since I cannot make `ByteBuffer.allocate` throw 
`OutOfMemoryError`, I passed a negative size to make `ByteBuffer.allocate` 
fail. Although the error type is different, it should trigger the same code 
path.

Closes #23590 from zsxwing/SPARK-26665.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../spark/network/BlockTransferService.scala   |  12 ++-
 .../spark/network/BlockTransferServiceSuite.scala  | 104 +
 2 files changed, 112 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala 
b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index eef8c31..875e4fc 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -105,10 +105,14 @@ abstract class BlockTransferService extends ShuffleClient 
with Closeable with Lo
 case f: FileSegmentManagedBuffer =>
   result.success(f)
 case _ =>
-  val ret = ByteBuffer.allocate(data.size.toInt)
-  ret.put(data.nioByteBuffer())
-  ret.flip()
-  result.success(new NioManagedBuffer(ret))
+  try {
+val ret = ByteBuffer.allocate(data.size.toInt)
+ret.put(data.nioByteBuffer())
+ret.flip()
+result.success(new NioManagedBuffer(ret))
+  } catch {
+case e: Throwable => result.failure(e)
+  }
   }
 }
   }, tempFileManager)
diff --git 
a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala 
b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
new file mode 100644
index 000..d7e4b91
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import org.scalatest.concurrent._
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
DownloadFileManager}
+import org.apache.spark.storage.{BlockId, StorageLevel}
+
+class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits {
+
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
+  test("fetchBlockSync should not hang when 
BlockFetchingListener.onBlockFetchSuccess fails") {
+// Create a mocked `BlockTransferService` to call 
`BlockFetchingListener.onBlockFetchSuccess`
+// with a bad `ManagedBuffer` which will trigger an exception in 
`onBlockFetchSuccess`.
+val blockTransferService = new BlockTransferService {
+  override def init(blockDataManager: BlockDataManager): Unit = {}
+
+  override def c

[spark] branch master updated: [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever

2019-01-22 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 66450bb  [SPARK-26665][CORE] Fix a bug that 
BlockTransferService.fetchBlockSync may hang forever
66450bb is described below

commit 66450bbc1bb4397f06ca9a6ecba4d16c82d711fd
Author: Shixiong Zhu 
AuthorDate: Tue Jan 22 09:00:52 2019 -0800

[SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may 
hang forever

## What changes were proposed in this pull request?

`ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large 
but not enough memory is available. However, when this happens, 
BlockTransferService.fetchBlockSync will currently just hang forever because its 
`BlockFetchingListener.onBlockFetchSuccess` doesn't complete the `Promise`.

This PR catches `Throwable` and uses the error to complete `Promise`.

## How was this patch tested?

Added a unit test. Since I cannot make `ByteBuffer.allocate` throw 
`OutOfMemoryError`, I passed a negative size to make `ByteBuffer.allocate` 
fail. Although the error type is different, it should trigger the same code 
path.

Closes #23590 from zsxwing/SPARK-26665.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../spark/network/BlockTransferService.scala   |  12 ++-
 .../spark/network/BlockTransferServiceSuite.scala  | 104 +
 2 files changed, 112 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala 
b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index a58c8fa..51ced69 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -107,10 +107,14 @@ abstract class BlockTransferService extends ShuffleClient 
with Closeable with Lo
 case e: EncryptedManagedBuffer =>
   result.success(e)
 case _ =>
-  val ret = ByteBuffer.allocate(data.size.toInt)
-  ret.put(data.nioByteBuffer())
-  ret.flip()
-  result.success(new NioManagedBuffer(ret))
+  try {
+val ret = ByteBuffer.allocate(data.size.toInt)
+ret.put(data.nioByteBuffer())
+ret.flip()
+result.success(new NioManagedBuffer(ret))
+  } catch {
+case e: Throwable => result.failure(e)
+  }
   }
 }
   }, tempFileManager)
diff --git 
a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala 
b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
new file mode 100644
index 000..d7e4b91
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import org.scalatest.concurrent._
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
DownloadFileManager}
+import org.apache.spark.storage.{BlockId, StorageLevel}
+
+class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits {
+
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
+  test("fetchBlockSync should not hang when 
BlockFetchingListener.onBlockFetchSuccess fails") {
+// Create a mocked `BlockTransferService` to call 
`BlockFetchingListener.onBlockFetchSuccess`
+// with a bad `ManagedBuffer` which will trigger an exception in 
`onBlockFetchSuccess`.
+val blockTransferService = new BlockTransferService {
+  override def init(blockDataManager: BlockDataManager): Unit = {}
+
+  override def close(): Un

[spark] branch master updated: [SPARK-26824][SS] Fix the checkpoint location and _spark_metadata when it contains special chars

2019-02-20 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 77b99af  [SPARK-26824][SS] Fix the checkpoint location and 
_spark_metadata when it contains special chars
77b99af is described below

commit 77b99af57330cf2e5016a6acc69642d54041b041
Author: Shixiong Zhu 
AuthorDate: Wed Feb 20 15:44:20 2019 -0800

[SPARK-26824][SS] Fix the checkpoint location and _spark_metadata when it 
contains special chars

## What changes were proposed in this pull request?

When a user specifies a checkpoint location or a file sink output using a 
path containing special chars that need to be escaped, the streaming query will 
store its checkpoint and file sink metadata in the wrong place. In this PR, I 
uploaded a checkpoint that was generated by the following code using Spark 
2.4.0 to show this issue:

```
implicit val s = spark.sqlContext
val input = org.apache.spark.sql.execution.streaming.MemoryStream[Int]
input.addData(1, 2, 3)
val q = 
input.toDF.writeStream.format("parquet").option("checkpointLocation", ".../chk 
%#chk").start(".../output %#output")
q.stop()
```
Here is the structure of the directory:
```
sql/core/src/test/resources/structured-streaming/escaped-path-2.4.0
├── chk%252520%252525%252523chk
│   ├── commits
│   │   └── 0
│   ├── metadata
│   └── offsets
│   └── 0
├── output %#output
│   └── part-0-97f675a2-bb82-4201-8245-05f3dae4c372-c000.snappy.parquet
└── output%20%25%23output
└── _spark_metadata
└── 0
```

In this checkpoint, the user specified checkpoint location is `.../chk 
%#chk` but the real path to store the checkpoint is 
`.../chk%252520%252525%252523chk` (this is generated by escaping the original 
path three times). The user specified output path is `.../output %#output` but 
the path to store `_spark_metadata` is 
`.../output%20%25%23output/_spark_metadata` (this is generated by escaping the 
original path once). The data files are still in the correct path (such as 
`.../output %#ou [...]

This checkpoint will be used in unit tests in this PR.

The fix simply removes the improper `Path.toUri` calls.
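
For illustration, a minimal sketch (assuming Hadoop `Path` semantics; not code from this 
patch) of how each extra `Path.toUri` round trip escapes the path once more, which is how 
the checkpoint directory above ended up escaped multiple times; the values in the comments 
are the expected results, not verified output:

```
import org.apache.hadoop.fs.Path

object PathEscapingDemo {
  def main(args: Array[String]): Unit = {
    val userPath = new Path("/tmp/chk %#chk")
    // toUri percent-encodes the special chars once.
    val escapedOnce = userPath.toUri.toString               // e.g. /tmp/chk%20%25%23chk
    // Feeding the escaped string back into Path treats '%' as a literal char and
    // encodes it again on the next toUri call.
    val escapedTwice = new Path(escapedOnce).toUri.toString // e.g. /tmp/chk%2520%2525%2523chk
    println(escapedOnce)
    println(escapedTwice)
  }
}
```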

However, as the user may not read the release notes and may not be aware of this 
checkpoint location change, if they upgrade Spark without moving the checkpoint to 
the new location, their query will just start from scratch. In order not to 
surprise users, this PR also adds a check to **detect the impacted paths and 
throw an error** that includes the migration guide. This check can be turned off 
by an internal SQL conf 
spark.sql.streaming.checkpoint.escapedPathCheck.enabled. Here are ex [...]

- Streaming checkpoint error:
```
Error: we detected a possible problem with the location of your checkpoint 
and you
likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out 
checkpoints for
structured streaming. While this was corrected in Spark 3.0, it appears 
that your
query was started using an earlier version that incorrectly handled the 
checkpoint
path.

Correct Checkpoint Directory: /.../chk %#chk
Incorrect Checkpoint Directory: /.../chk%252520%252525%252523chk

Please move the data from the incorrect directory to the correct one, 
delete the
incorrect directory, and then restart this query. If you believe you are 
receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

- File sink error (`_spark_metadata`):
```
Error: we detected a possible problem with the location of your 
"_spark_metadata"
directory and you likely need to move it before restarting this query.

Earlier version of Spark incorrectly escaped paths when writing out the
"_spark_metadata" directory for structured streaming. While this was 
corrected in
Spark 3.0, it appears that your query was started using an earlier version 
that
incorrectly handled the "_spark_metadata" path.

Correct "_spark_metadata" Directory: /.../output %#output/_spark_metadata
Incorrect "_spark_metadata" Directory: 
/.../output%20%25%23output/_spark_metadata

Please move the data from the incorrect directory to the correct one, 
delete the
incorrect directory, and then restart this query. If you believe you are 
receiving
this message in error, you can disable it with the SQL conf
spark.sql.streaming.checkpoint.escapedPathCheck.enabled.
```

    ## How was this patch tested?

The new un

[spark] branch master updated: [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException

2019-03-09 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6e1c082  [SPARK-27111][SS] Fix a race that a continuous query may fail 
with InterruptedException
6e1c082 is described below

commit 6e1c0827ece1cdc615196e60cb11c76b917b8eeb
Author: Shixiong Zhu 
AuthorDate: Sat Mar 9 14:26:58 2019 -0800

[SPARK-27111][SS] Fix a race that a continuous query may fail with 
InterruptedException

## What changes were proposed in this pull request?

Before a Kafka consumer gets assigned partitions, its offset will 
contain 0 partitions. However, runContinuous will still run and launch a Spark 
job having 0 partitions. In this case, there is a race where an epoch may interrupt 
the query execution thread after `lastExecution.toRdd`, and either 
`epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` or the next 
`runContinuous` call will get interrupted unintentionally.

To handle this case, this PR has the following changes:

- Clean up the resources in `queryExecutionThread.runUninterruptibly`. This 
may increase the waiting time of `stop` but should be minor because the 
operations here are very fast (just sending an RPC message in the same process 
and stopping a very simple thread).
- Clear the interrupted status at the end so that it won't impact the 
`runContinuous` call. We may clear the interrupted status set by `stop`, but it 
doesn't affect the query termination because `runActivatedStream` will check 
`state` and exit accordingly.

I also updated the cleanup code to make sure exceptions thrown from 
`epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the 
cleanup.
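
As a side note on the second change, a tiny self-contained demo (not Spark code) of why 
`Thread.interrupted()` is used: unlike `isInterrupted`, it clears the interrupt flag, so a 
stray interrupt delivered during cleanup does not leak into the next unit of work run on 
the same thread.

```
object InterruptFlagDemo {
  def main(args: Array[String]): Unit = {
    Thread.currentThread().interrupt()            // simulate a late, unwanted interrupt
    println(Thread.currentThread().isInterrupted) // true: the flag is set
    println(Thread.interrupted())                 // true, and this call clears the flag
    println(Thread.currentThread().isInterrupted) // false: later work is unaffected
  }
}
```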

## How was this patch tested?

Jenkins

Closes #24034 from zsxwing/SPARK-27111.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/continuous/ContinuousExecution.scala | 30 +-
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 26b5642..aef556d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -268,13 +268,29 @@ class ContinuousExecution(
 logInfo(s"Query $id ignoring exception from reconfiguring: $t")
 // interrupted by reconfiguration - swallow exception so we can 
restart the query
 } finally {
-  epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
-  SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
-  epochUpdateThread.interrupt()
-  epochUpdateThread.join()
-
-  sparkSession.sparkContext.cancelJobGroup(runId.toString)
+  // The above execution may finish before getting interrupted, for 
example, a Spark job having
+  // 0 partitions will complete immediately. Then the interrupted status 
will sneak here.
+  //
+  // To handle this case, we do the two things here:
+  //
+  // 1. Clean up the resources in 
`queryExecutionThread.runUninterruptibly`. This may increase
+  //the waiting time of `stop` but should be minor because the 
operations here are very fast
+  //(just sending an RPC message in the same process and stopping a 
very simple thread).
+  // 2. Clear the interrupted status at the end so that it won't impact 
the `runContinuous`
+  //call. We may clear the interrupted status set by `stop`, but it 
doesn't affect the query
+  //termination because `runActivatedStream` will check `state` and 
exit accordingly.
+  queryExecutionThread.runUninterruptibly {
+try {
+  epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+} finally {
+  SparkEnv.get.rpcEnv.stop(epochEndpoint)
+  epochUpdateThread.interrupt()
+  epochUpdateThread.join()
+  // The following line must be the last line because it may fail if 
SparkContext is stopped
+  sparkSession.sparkContext.cancelJobGroup(runId.toString)
+}
+  }
+  Thread.interrupted()
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException

2019-03-09 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 53590f2  [SPARK-27111][SS] Fix a race that a continuous query may fail 
with InterruptedException
53590f2 is described below

commit 53590f275a7ebcd015120b576905ce999e50331e
Author: Shixiong Zhu 
AuthorDate: Sat Mar 9 14:26:58 2019 -0800

[SPARK-27111][SS] Fix a race that a continuous query may fail with 
InterruptedException

Before a Kafka consumer is assigned partitions, its offset will contain 0 
partitions. However, runContinuous will still run and launch a Spark job having 
0 partitions. In this case, there is a race in which the epoch update thread may 
interrupt the query execution thread after `lastExecution.toRdd`, so either 
`epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` or the next 
`runContinuous` gets interrupted unintentionally.

To handle this case, this PR has the following changes:

- Clean up the resources in `queryExecutionThread.runUninterruptibly`. This 
may increase the waiting time of `stop` but should be minor because the 
operations here are very fast (just sending an RPC message in the same process 
and stopping a very simple thread).
- Clear the interrupted status at the end so that it won't impact the 
`runContinuous` call. We may clear the interrupted status set by `stop`, but it 
doesn't affect the query termination because `runActivatedStream` will check 
`state` and exit accordingly.

I also updated the cleanup code to make sure that exceptions thrown from 
`epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the 
rest of the cleanup.

Jenkins

Closes #24034 from zsxwing/SPARK-27111.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 6e1c0827ece1cdc615196e60cb11c76b917b8eeb)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/continuous/ContinuousExecution.scala | 32 --
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 2e24fa6..3037c01 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -272,14 +272,30 @@ class ContinuousExecution(
 logInfo(s"Query $id ignoring exception from reconfiguring: $t")
 // interrupted by reconfiguration - swallow exception so we can 
restart the query
 } finally {
-  epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
-  SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
-  epochUpdateThread.interrupt()
-  epochUpdateThread.join()
-
-  stopSources()
-  sparkSession.sparkContext.cancelJobGroup(runId.toString)
+  // The above execution may finish before getting interrupted, for 
example, a Spark job having
+  // 0 partitions will complete immediately. Then the interrupted status 
will sneak here.
+  //
+  // To handle this case, we do the two things here:
+  //
+  // 1. Clean up the resources in 
`queryExecutionThread.runUninterruptibly`. This may increase
+  //the waiting time of `stop` but should be minor because the 
operations here are very fast
+  //(just sending an RPC message in the same process and stopping a 
very simple thread).
+  // 2. Clear the interrupted status at the end so that it won't impact 
the `runContinuous`
+  //call. We may clear the interrupted status set by `stop`, but it 
doesn't affect the query
+  //termination because `runActivatedStream` will check `state` and 
exit accordingly.
+  queryExecutionThread.runUninterruptibly {
+try {
+  epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+} finally {
+  SparkEnv.get.rpcEnv.stop(epochEndpoint)
+  epochUpdateThread.interrupt()
+  epochUpdateThread.join()
+  stopSources()
+  // The following line must be the last line because it may fail if 
SparkContext is stopped
+  sparkSession.sparkContext.cancelJobGroup(runId.toString)
+}
+  }
+  Thread.interrupted()
 }
   }
 





[spark] branch branch-2.3 updated: [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException

2019-03-09 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new 4d1d0a4  [SPARK-27111][SS] Fix a race that a continuous query may fail 
with InterruptedException
4d1d0a4 is described below

commit 4d1d0a41a862c234acb9b8b68e96da7bf079eb8d
Author: Shixiong Zhu 
AuthorDate: Sat Mar 9 14:26:58 2019 -0800

[SPARK-27111][SS] Fix a race that a continuous query may fail with 
InterruptedException

Before a Kafka consumer is assigned partitions, its offset will contain 0 
partitions. However, runContinuous will still run and launch a Spark job having 
0 partitions. In this case, there is a race in which the epoch update thread may 
interrupt the query execution thread after `lastExecution.toRdd`, so either 
`epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` or the next 
`runContinuous` gets interrupted unintentionally.

To handle this case, this PR has the following changes:

- Clean up the resources in `queryExecutionThread.runUninterruptibly`. This 
may increase the waiting time of `stop` but should be minor because the 
operations here are very fast (just sending an RPC message in the same process 
and stopping a very simple thread).
- Clear the interrupted status at the end so that it won't impact the 
`runContinuous` call. We may clear the interrupted status set by `stop`, but it 
doesn't affect the query termination because `runActivatedStream` will check 
`state` and exit accordingly.

I also updated the cleanup code to make sure that exceptions thrown from 
`epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the 
rest of the cleanup.

Jenkins

Closes #24034 from zsxwing/SPARK-27111.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 6e1c0827ece1cdc615196e60cb11c76b917b8eeb)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/continuous/ContinuousExecution.scala | 32 --
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 62adedb..dad7f9b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -270,14 +270,30 @@ class ContinuousExecution(
 logInfo(s"Query $id ignoring exception from reconfiguring: $t")
 // interrupted by reconfiguration - swallow exception so we can 
restart the query
 } finally {
-  epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
-  SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
-  epochUpdateThread.interrupt()
-  epochUpdateThread.join()
-
-  stopSources()
-  sparkSession.sparkContext.cancelJobGroup(runId.toString)
+  // The above execution may finish before getting interrupted, for 
example, a Spark job having
+  // 0 partitions will complete immediately. Then the interrupted status 
will sneak here.
+  //
+  // To handle this case, we do the two things here:
+  //
+  // 1. Clean up the resources in 
`queryExecutionThread.runUninterruptibly`. This may increase
+  //the waiting time of `stop` but should be minor because the 
operations here are very fast
+  //(just sending an RPC message in the same process and stopping a 
very simple thread).
+  // 2. Clear the interrupted status at the end so that it won't impact 
the `runContinuous`
+  //call. We may clear the interrupted status set by `stop`, but it 
doesn't affect the query
+  //termination because `runActivatedStream` will check `state` and 
exit accordingly.
+  queryExecutionThread.runUninterruptibly {
+try {
+  epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+} finally {
+  SparkEnv.get.rpcEnv.stop(epochEndpoint)
+  epochUpdateThread.interrupt()
+  epochUpdateThread.join()
+  stopSources()
+  // The following line must be the last line because it may fail if 
SparkContext is stopped
+  sparkSession.sparkContext.cancelJobGroup(runId.toString)
+}
+  }
+  Thread.interrupted()
 }
   }
 





[spark] branch master updated: [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted

2019-03-22 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 78d546f  [SPARK-27210][SS] Cleanup incomplete output files in 
ManifestFileCommitProtocol if task is aborted
78d546f is described below

commit 78d546fe15aebcbf4b671c44383ddcf82b05b8a7
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Mar 22 11:26:53 2019 -0700

[SPARK-27210][SS] Cleanup incomplete output files in 
ManifestFileCommitProtocol if task is aborted

## What changes were proposed in this pull request?

This patch proposes that ManifestFileCommitProtocol clean up incomplete 
output files at the task level if a task aborts. Please note that this works as 
'best effort', not as a guarantee, matching what we have in 
HadoopMapReduceCommitProtocol.
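
A small sketch of the best-effort idea, using plain java.nio instead of the Hadoop `FileSystem` API the real protocol uses; the file names here are illustrative only:

```scala
import java.nio.file.{Files, Paths}
import scala.util.Try

object BestEffortAbortCleanup {
  // On task abort, try to delete every file the task already created; wrap
  // each delete in Try so one failure doesn't stop the remaining deletes.
  def abortTask(addedFiles: Seq[String]): Unit = {
    addedFiles.foreach { file =>
      Try(Files.deleteIfExists(Paths.get(file)))
    }
  }

  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempFile("part-", ".parquet")
    abortTask(Seq(tmp.toString, "/path/that/does/not/exist"))
    println(s"temp file removed: ${!Files.exists(tmp)}")
  }
}
```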

## How was this patch tested?

Added UT.

Closes #24154 from HeartSaVioR/SPARK-27210.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/ManifestFileCommitProtocol.scala |  7 --
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 29 ++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 92191c8..916bd2d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -114,7 +114,10 @@ class ManifestFileCommitProtocol(jobId: String, path: 
String)
   }
 
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
-// Do nothing
-// TODO: we can also try delete the addedFiles as a best-effort cleanup.
+// best effort cleanup of incomplete files
+if (addedFiles.nonEmpty) {
+  val fs = new 
Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
+  addedFiles.foreach { file => fs.delete(new Path(file), false) }
+}
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 619d118..020ab23 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.nio.file.Files
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -478,4 +481,30 @@ class FileStreamSinkSuite extends StreamTest {
   checkDatasetUnorderly(outputDf, 1, 2, 3)
 }
   }
+
+  testQuietly("cleanup incomplete output for aborted task") {
+withTempDir { tempDir =>
+  val checkpointDir = new File(tempDir, "chk")
+  val outputDir = new File(tempDir, "output")
+  val inputData = MemoryStream[Int]
+  inputData.addData(1, 2, 3)
+  val q = inputData.toDS().map(_ / 0)
+.writeStream
+.option("checkpointLocation", checkpointDir.getCanonicalPath)
+.format("parquet")
+.start(outputDir.getCanonicalPath)
+
+  intercept[StreamingQueryException] {
+try {
+  q.processAllAvailable()
+} finally {
+  q.stop()
+}
+  }
+
+  val outputFiles = Files.walk(outputDir.toPath).iterator().asScala
+.filter(_.toString.endsWith(".parquet"))
+  assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned 
up.")
+}
+  }
 }





[spark-website] branch asf-site updated: Add Jose Torres to committers list

2019-01-28 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new fb1a7b4  Add Jose Torres to committers list
fb1a7b4 is described below

commit fb1a7b407e149e133e35bb506d48cfe034a4d351
Author: Jose Torres 
AuthorDate: Mon Jan 28 15:59:37 2019 -0800

Add Jose Torres to committers list

Author: Jose Torres 

Closes #176 from jose-torres/addjose.
---
 committers.md| 1 +
 site/committers.html | 4 
 2 files changed, 5 insertions(+)

diff --git a/committers.md b/committers.md
index c3daf10..8049106 100644
--- a/committers.md
+++ b/committers.md
@@ -65,6 +65,7 @@ navigation:
 |Saisai Shao|Tencent|
 |Prashant Sharma|IBM|
 |Ram Sriharsha|Databricks|
+|Jose Torres|Databricks|
 |DB Tsai|Apple|
 |Takuya Ueshin|Databricks|
 |Marcelo Vanzin|Cloudera|
diff --git a/site/committers.html b/site/committers.html
index ec5814b..3066b5d 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -431,6 +431,10 @@
   Databricks
 
 
+  Jose Torres
+  Databricks
+
+
   DB Tsai
   Apple
 





[spark] branch branch-2.3 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new a5d22da  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
a5d22da is described below

commit a5d22da1888b8110b490d52d2c36b3fc907254f6
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This makes `avg` become `NaN`, and whatever gets merged with the 
result of `zero.merge(zero)` will still have a `NaN` `avg`. Finally, we call 
`NaN.toLong`, get `0`, and the user sees the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.
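
The arithmetic behind the bug can be reproduced in a few lines of plain Scala; this is only an illustration of the running-average formula, not the patched class:

```scala
object EventTimeStatsNaNDemo {
  def main(args: Array[String]): Unit = {
    // Old behaviour: zero.merge(zero). With both counts at 0, the running
    // average update divides by zero, so avg becomes NaN and stays NaN.
    var count = 0L
    var avg = 0.0
    val thatCount = 0L
    val thatAvg = 0.0
    count += thatCount
    avg += (thatAvg - avg) * thatCount / count        // 0.0 * 0 / 0 => NaN
    println(s"old merge: avg = $avg, avg.toLong = ${avg.toLong}") // NaN, 0

    // Fixed behaviour: treat an empty "that" as a no-op, as in the diff below.
    var fixedCount = 0L
    var fixedAvg = 0.0
    if (thatCount != 0L) {
      fixedCount += thatCount
      fixedAvg += (thatAvg - fixedAvg) * thatCount / fixedCount
    }
    println(s"fixed merge: avg = $fixedAvg, count = $fixedCount") // 0.0, 0
  }
}
```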

    ## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index d6bef9c..a51f086 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -38,9 +38,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -57,7 +57,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 100L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -75,6 +74,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge no

[spark] branch branch-2.2 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
 new 7c7d7f6  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
7c7d7f6 is described below

commit 7c7d7f6a878b02ece881266ee538f3e1443aa8c1
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This makes `avg` become `NaN`, and whatever gets merged with the 
result of `zero.merge(zero)` will still have a `NaN` `avg`. Finally, we call 
`NaN.toLong`, get `0`, and the user sees the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.

    ## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 55e7508..4069633 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 4f19fa0..14a193f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -38,9 +38,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -57,7 +57,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 100L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -75,6 +74,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge no

[spark] branch master updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 03a928c  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
03a928c is described below

commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This makes `avg` become `NaN`, and whatever gets merged with the 
result of `zero.merge(zero)` will still have a `NaN` `avg`. Finally, we call 
`NaN.toLong`, get `0`, and the user sees the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.

    ## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c696204..b79770a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -43,9 +43,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -62,7 +62,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 100L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -80,6 +79,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge non-zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats(max = 10, min = 1

[spark] branch branch-2.4 updated: [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

2019-02-01 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new bd4ce51  [SPARK-26806][SS] EventTimeStats.merge should handle zeros 
correctly
bd4ce51 is described below

commit bd4ce51e699da306bc36db0c7b0303b6e6c3d4df
Author: Shixiong Zhu 
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

[SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

## What changes were proposed in this pull request?

Right now, EventTimeStats.merge doesn't handle `zero.merge(zero)` 
correctly. This makes `avg` become `NaN`, and whatever gets merged with the 
result of `zero.merge(zero)` will still have a `NaN` `avg`. Finally, we call 
`NaN.toLong`, get `0`, and the user sees the following incorrect report:

```
"eventTime" : {
"avg" : "1970-01-01T00:00:00.000Z",
"max" : "2019-01-31T12:57:00.000Z",
"min" : "2019-01-30T18:44:04.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
  }
```

This issue was reported by liancheng.

This PR fixes the above issue.

    ## How was this patch tested?

The new unit tests.

Closes #23718 from zsxwing/merge-zero.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
Signed-off-by: Shixiong Zhu 
---
 .../streaming/EventTimeWatermarkExec.scala | 17 +---
 .../sql/streaming/EventTimeWatermarkSuite.scala| 32 --
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var 
avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-this.max = math.max(this.max, that.max)
-this.min = math.min(this.min, that.min)
-this.count += that.count
-this.avg += (that.avg - this.avg) * that.count / this.count
+if (that.count == 0) {
+  // no-op
+} else if (this.count == 0) {
+  this.max = that.max
+  this.min = that.min
+  this.count = that.count
+  this.avg = that.avg
+} else {
+  this.max = math.max(this.max, that.max)
+  this.min = math.min(this.min, that.min)
+  this.count += that.count
+  this.avg += (that.avg - this.avg) * that.count / this.count
+}
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 026af17..091b9a1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -43,9 +43,9 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
 val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
 stats.add(80L)
 stats.max should be (100)
@@ -62,7 +62,6 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-val epsilon = 10E-6
 val largeValue = 100L // 10B
 // Make sure `largeValue` will cause overflow if we use a Long sum to calc 
avg.
 assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -80,6 +79,33 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
 stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+val stats = EventTimeStats.zero
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+val stats2 = EventTimeStats.zero
+stats.merge(stats2)
+stats.max should be (10L)
+stats.min should be (1L)
+stats.avg should be (5.0 +- epsilon)
+stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge no

[spark] branch branch-2.4 updated: [SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving SparkListenerExecutorMetricsUpdate (backport 2.4)

2019-04-10 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new a8a2ba1  [SPARK-27394][WEBUI] Flush LiveEntity if necessary when 
receiving SparkListenerExecutorMetricsUpdate (backport 2.4)
a8a2ba1 is described below

commit a8a2ba11ac10051423e58920062b50f328b06421
Author: Shixiong Zhu 
AuthorDate: Wed Apr 10 15:17:04 2019 -0700

[SPARK-27394][WEBUI] Flush LiveEntity if necessary when receiving 
SparkListenerExecutorMetricsUpdate (backport 2.4)

## What changes were proposed in this pull request?

This PR backports #24303 to 2.4.

## How was this patch tested?

Jenkins

Closes #24328 from zsxwing/SPARK-27394-2.4.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../apache/spark/status/AppStatusListener.scala| 40 --
 .../scala/org/apache/spark/status/config.scala |  6 
 .../org/apache/spark/ui/UISeleniumSuite.scala  | 35 +--
 docs/configuration.md  |  8 +
 4 files changed, 75 insertions(+), 14 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c4dd47d..cb7ab7f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -58,6 +58,12 @@ private[spark] class AppStatusListener(
   // operations that we can live without when rapidly processing incoming task 
events.
   private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
 
+  /**
+   * Minimum time elapsed before stale UI data is flushed. This avoids UI 
staleness when incoming
+   * task events are not fired frequently.
+   */
+  private val liveUpdateMinFlushPeriod = 
conf.get(LIVE_ENTITY_UPDATE_MIN_FLUSH_PERIOD)
+
   private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
   private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
 
@@ -73,6 +79,9 @@ private[spark] class AppStatusListener(
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
 
+  /** The last time when flushing `LiveEntity`s. This is to avoid flushing too 
frequently. */
+  private var lastFlushTimeNs = System.nanoTime()
+
   kvstore.addTrigger(classOf[ExecutorSummaryWrapper], 
conf.get(MAX_RETAINED_DEAD_EXECUTORS))
 { count => cleanupExecutors(count) }
 
@@ -86,7 +95,8 @@ private[spark] class AppStatusListener(
 
   kvstore.onFlush {
 if (!live) {
-  flush()
+  val now = System.nanoTime()
+  flush(update(_, now))
 }
   }
 
@@ -744,6 +754,15 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// Flush updates if necessary. Executor heartbeat is an event that happens 
periodically. Flush
+// here to ensure the staleness of Spark UI doesn't last more than
+// `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
+if (now - lastFlushTimeNs > liveUpdateMinFlushPeriod) {
+  flush(maybeUpdate(_, now))
+  // Re-get the current system time because `flush` may be slow and `now` 
is stale.
+  lastFlushTimeNs = System.nanoTime()
+}
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
@@ -755,18 +774,17 @@ private[spark] class AppStatusListener(
 }
   }
 
-  /** Flush all live entities' data to the underlying store. */
-  private def flush(): Unit = {
-val now = System.nanoTime()
+  /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush 
them. */
+  private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
 liveStages.values.asScala.foreach { stage =>
-  update(stage, now)
-  stage.executorSummaries.values.foreach(update(_, now))
+  entityFlushFunc(stage)
+  stage.executorSummaries.values.foreach(entityFlushFunc)
 }
-liveJobs.values.foreach(update(_, now))
-liveExecutors.values.foreach(update(_, now))
-liveTasks.values.foreach(update(_, now))
-liveRDDs.values.foreach(update(_, now))
-pools.values.foreach(update(_, now))
+liveJobs.values.foreach(entityFlushFunc)
+liveExecutors.values.foreach(entityFlushFunc)
+liveTasks.values.foreach(entityFlushFunc)
+liveRDDs.values.foreach(entityFlushFunc)
+pools.values.foreach(entityFlushFunc)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/status/config.scala 
b/core/src/main/scala/org/apache/spark/status/config.scala
index 67801b8..87204fd 100644
--- a/core/src/main/scala/org/apache/spark/status/config.scala
+++ b/core/src/main/scala/org/apache/spark/status/config.scala
@@ -31,6 +31,12 @@ private[spark] object config {
 .timeConf(TimeUnit.NANOSECONDS)
 .createWithDefaultString("

[spark] branch master updated: [SPARK-20547][REPL] Throw RemoteClassLoadedError for transient errors in ExecutorClassLoader

2019-05-28 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 04f142d  [SPARK-20547][REPL] Throw RemoteClassLoadedError for 
transient errors in ExecutorClassLoader
04f142d is described below

commit 04f142db9c4f87699053eb3aa777c08aca57b524
Author: Shixiong Zhu 
AuthorDate: Tue May 28 12:56:14 2019 -0700

[SPARK-20547][REPL] Throw RemoteClassLoadedError for transient errors in 
ExecutorClassLoader

## What changes were proposed in this pull request?

`ExecutorClassLoader`'s `findClass` may fail to fetch a class due to 
transient exceptions. For example, when a task is interrupted while 
`ExecutorClassLoader` is fetching a class, you may see an `InterruptedException` 
or `IOException` wrapped by `ClassNotFoundException`, even though the class can 
be loaded. The result of `findClass` is then cached by the JVM, and later, when 
the same class is loaded in the same executor, it will just throw 
NoClassDefFoundError even though the class can be loaded.

I found that the JVM only caches `LinkageError` and `ClassNotFoundException`. 
Hence in this PR, I changed ExecutorClassLoader to throw `RemoteClassLoadedError` 
if we cannot get a response from the driver.
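
A hedged sketch of the wrapping idea, with a hypothetical `fetchClassBytes` standing in for the RPC fetch and a locally defined error type rather than Spark's actual class:

```scala
import scala.util.control.NonFatal

// Locally defined for this sketch; extends Error so it is not negatively
// cached the way ClassNotFoundException is.
class RemoteClassLoadedError(name: String, cause: Throwable)
  extends Error(s"Could not load class $name from the remote driver", cause)

object RemoteClassLoadingSketch {
  // Hypothetical stand-in for opening the RPC channel to the driver.
  def fetchClassBytes(name: String): Array[Byte] =
    throw new java.io.IOException("connection reset") // simulate a transient failure

  def findClassBytes(name: String): Array[Byte] = {
    try fetchClassBytes(name)
    catch {
      // A transient failure is rethrown as an Error instead of being folded
      // into ClassNotFoundException, so it doesn't look like "class missing".
      case NonFatal(e) => throw new RemoteClassLoadedError(name, e)
    }
  }

  def main(args: Array[String]): Unit = {
    try findClassBytes("com.example.GeneratedClass")
    catch { case e: RemoteClassLoadedError => println(e.getMessage) }
  }
}
```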

## How was this patch tested?

New unit tests.

Closes #24683 from zsxwing/SPARK-20547-fix.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../network/server/TransportRequestHandler.java|   2 +
 .../apache/spark/repl/ExecutorClassLoader.scala|  45 ++-
 .../spark/repl/ExecutorClassLoaderSuite.scala  | 145 -
 .../scala/org/apache/spark/repl/ReplSuite.scala|  17 ++-
 .../org/apache/spark/repl/SingletonReplSuite.scala |  16 +++
 5 files changed, 214 insertions(+), 11 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 3e089b4..0792b58 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -140,6 +140,8 @@ public class TransportRequestHandler extends 
MessageHandler {
 streamManager.streamSent(req.streamId);
   });
 } else {
+  // org.apache.spark.repl.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX 
should also be updated
+  // when the following error message is changed.
   respond(new StreamFailure(req.streamId, String.format(
 "Stream '%s' was not found.", req.streamId)));
 }
diff --git 
a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala 
b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 177bce2..0cfd961 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -21,6 +21,8 @@ import java.io.{ByteArrayOutputStream, FileNotFoundException, 
FilterInputStream,
 import java.net.{URI, URL, URLEncoder}
 import java.nio.channels.Channels
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.xbean.asm7._
 import org.apache.xbean.asm7.Opcodes._
@@ -106,7 +108,17 @@ class ExecutorClassLoader(
 parentLoader.loadClass(name)
   } catch {
 case e: ClassNotFoundException =>
-  val classOption = findClassLocally(name)
+  val classOption = try {
+findClassLocally(name)
+  } catch {
+case e: RemoteClassLoaderError =>
+  throw e
+case NonFatal(e) =>
+  // Wrap the error to include the class name
+  // scalastyle:off throwerror
+  throw new RemoteClassLoaderError(name, e)
+  // scalastyle:on throwerror
+  }
   classOption match {
 case None => throw new ClassNotFoundException(name, e)
 case Some(a) => a
@@ -115,14 +127,15 @@ class ExecutorClassLoader(
 }
   }
 
+  // See 
org.apache.spark.network.server.TransportRequestHandler.processStreamRequest.
+  private val STREAM_NOT_FOUND_REGEX = s"Stream '.*' was not found.".r.pattern
+
   private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = 
{
-val channel = env.rpcEnv.openChannel(s"$classUri/$path")
+val channel = env.rpcEnv.openChannel(s"$classUri/${urlEncode(path)}")
 new FilterInputStream(Channels.newInputStream(channel)) {
 
   override def read(): Int = toClassNotFound(super.read())
 
-  override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b))
-
   override def read(b: Array[B

[spark] branch master updated: [SPARK-28574][CORE] Allow to config different sizes for event queues

2019-08-02 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c212c9d  [SPARK-28574][CORE] Allow to config different sizes for event 
queues
c212c9d is described below

commit c212c9d9ed7375cd1ea16c118733edd84037ec0d
Author: yunzoud 
AuthorDate: Fri Aug 2 15:27:33 2019 -0700

[SPARK-28574][CORE] Allow to config different sizes for event queues

## What changes were proposed in this pull request?
Add the configuration spark.scheduler.listenerbus.eventqueue.${name}.capacity 
to allow configuring a different capacity for each event queue.
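
A hedged usage sketch, assuming spark-core is on the classpath; the queue name `eventLog` below is just an example of the `${name}` placeholder:

```scala
import org.apache.spark.SparkConf

object EventQueueCapacityConf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      // Fallback capacity used by queues without a per-queue setting.
      .set("spark.scheduler.listenerbus.eventqueue.capacity", "10000")
      // Per-queue override, here for a queue named "eventLog".
      .set("spark.scheduler.listenerbus.eventqueue.eventLog.capacity", "20000")
    println(conf.get("spark.scheduler.listenerbus.eventqueue.eventLog.capacity"))
  }
}
```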

## How was this patch tested?
Unit test in 
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Closes #25307 from yunzoud/SPARK-28574.

Authored-by: yunzoud 
Signed-off-by: Shixiong Zhu 
---
 .../apache/spark/scheduler/AsyncEventQueue.scala   | 14 +--
 .../apache/spark/scheduler/LiveListenerBus.scala   |  4 
 .../spark/scheduler/SparkListenerSuite.scala   | 28 ++
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala 
b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 7cd2b86..11e2c47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -46,8 +46,18 @@ private class AsyncEventQueue(
 
   // Cap the capacity of the queue so we get an explicit error (rather than an 
OOM exception) if
   // it's perpetually being added to more quickly than it's being drained.
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
-conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+  // The capacity can be configured by 
spark.scheduler.listenerbus.eventqueue.${name}.capacity,
+  // if no such conf is specified, use the value specified in
+  // LISTENER_BUS_EVENT_QUEUE_CAPACITY
+  private[scheduler] def capacity: Int = {
+val queuesize = 
conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity",
+conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+assert(queuesize > 0, s"capacity for event queue $name must be greater 
than 0, " +
+  s"but $queuesize is configured.")
+queuesize
+  }
+
+  private val eventQueue = new 
LinkedBlockingQueue[SparkListenerEvent](capacity)
 
   // Keep the event count separately, so that waitUntilEmpty() can be 
implemented properly;
   // this allows that method to return only when the events in the queue have 
been fully
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index d135190..302ebd3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -236,6 +236,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
 queues.asScala.map(_.name).toSet
   }
 
+  // For testing only.
+  private[scheduler] def getQueueCapacity(name: String): Option[Int] = {
+queues.asScala.find(_.name == name).map(_.capacity)
+  }
 }
 
 private[spark] object LiveListenerBus {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index a7869d3..8903e10 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -532,6 +532,34 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
 }
   }
 
+  test("event queue size can be configued through spark conf") {
+// configure the shared queue size to be 1, event log queue size to be 2,
+// and listner bus event queue size to be 5
+val conf = new SparkConf(false)
+  .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+  .set(s"spark.scheduler.listenerbus.eventqueue.${SHARED_QUEUE}.capacity", 
"1")
+  
.set(s"spark.scheduler.listenerbus.eventqueue.${EVENT_LOG_QUEUE}.capacity", "2")
+
+val bus = new LiveListenerBus(conf)
+val counter1 = new BasicJobCounter()
+val counter2 = new BasicJobCounter()
+val counter3 = new BasicJobCounter()
+
+// add a new shared, status and event queue
+bus.addToSharedQueue(counter1)
+bus.addToStatusQueue(counter2)
+bus.addToEventLogQueue(counter3)
+
+assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, 
EVENT_LOG_QUEUE))
+// check the size of shared queue is 1 as configured
+assert(bus.getQueueCapacity(SHARED_QUEUE) == So

[spark] branch master updated: [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

2019-08-20 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b37c8d5  [SPARK-28650][SS][DOC] Correct explanation of guarantee for 
ForeachWriter
b37c8d5 is described below

commit b37c8d5cea2e31e7821d848e42277f8fb7b68f30
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Tue Aug 20 00:56:53 2019 -0700

[SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

#  What changes were proposed in this pull request?

This patch modifies the explanation of the guarantee for ForeachWriter, as it 
doesn't guarantee the same output for `(partitionId, epochId)`. Refer to the 
description of [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) 
for more details.

Spark itself still guarantees the same output for the same epochId (batch) if 
the preconditions are met: 1) the source always provides the same input records 
for the same offset request, and 2) the query is idempotent overall 
(non-deterministic calculations like now() and random() can break this).

Treating a broken precondition as an exceptional case (the preconditions were 
implicitly required even before), we can still describe the guarantee with 
`epochId`, though it will be harder to leverage: 1) ForeachWriter would need to 
implement a way to track whether all partitions are written successfully for a 
given `epochId`, and 2) there is little chance to leverage this fact, as the 
chance for Spark to successfully write all partitions 
and fail to checkpoint the batch i [...]

Credit to zsxwing on discovering the broken guarantee.
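
For the `foreachBatch` alternative recommended in the diff below, here is a hedged sketch; the local master, the paths, and the overwrite-per-batch layout are placeholder choices, not part of this patch:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchIdempotentSink {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("foreachBatch-demo")
      .getOrCreate()

    // foreachBatch hands us the whole micro-batch plus its batchId, so a
    // replayed batch overwrites its own output instead of appending duplicates.
    val writeBatch: (DataFrame, Long) => Unit = (df, batchId) =>
      df.write.mode("overwrite").parquet(s"/tmp/foreachBatch-demo/batch=$batchId")

    val query = spark.readStream.format("rate").load()
      .writeStream
      .option("checkpointLocation", "/tmp/foreachBatch-demo/checkpoint")
      .foreachBatch(writeBatch)
      .start()

    query.awaitTermination()
  }
}
```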

## How was this patch tested?

This is just a documentation change, both on javadoc and guide doc.

Closes #25407 from HeartSaVioR/SPARK-28650.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 docs/structured-streaming-programming-guide.md | 14 ++
 .../main/scala/org/apache/spark/sql/ForeachWriter.scala| 13 +
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index e07a0e5..b0d3e16 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1843,7 +1843,7 @@ Here are the details of all the sinks in Spark.
 Foreach Sink
 Append, Update, Complete
 None
-Depends on ForeachWriter implementation
+Yes (at-least-once)
 More details in the next 
section
   
   
@@ -2251,13 +2251,11 @@ When the streaming query is started, Spark calls the 
function or the object’s
 
 - The close() method (if it exists) is called if an open() method exists and 
returns successfully (irrespective of the return value), except if the JVM or 
Python process crashes in the middle.
 
-- **Note:** The partitionId and epochId in the open() method can be used to 
deduplicate generated data 
-  when failures cause reprocessing of some input data. This depends on the 
execution mode of the query. 
-  If the streaming query is being executed in the micro-batch mode, then every 
partition represented 
-  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same 
data. 
-  Hence, (partition_id, epoch_id) can be used to deduplicate and/or 
transactionally commit 
-  data and achieve exactly-once guarantees. However, if the streaming query is 
being executed 
-  in the continuous mode, then this guarantee does not hold and therefore 
should not be used for deduplication.
+- **Note:** Spark does not guarantee same output for (partitionId, epochId), 
so deduplication
+  cannot be achieved with (partitionId, epochId). e.g. source provides 
different number of
+  partitions for some reasons, Spark optimization changes number of 
partitions, etc.
+  See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for 
more details.
+  If you need deduplication on output, try out `foreachBatch` instead.
 
  Triggers
 The trigger settings of a streaming query define the timing of streaming data 
processing, whether
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 5c0fe79..a0b0a34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -50,14 +50,11 @@ import org.apache.spark.annotation.Evolving
  *
  * Important points to note:
  * 
- * The `partitionId` and `epochId` can be used to deduplicate generated 
data when failures
- * cause reprocessing of some input data. This depends on the execution 
mode of the query. If
- * the streaming query is being executed in the micro-batch mode, then 
every partition

[spark] branch branch-2.4 updated: [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

2019-08-20 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new aff5e2b  [SPARK-28650][SS][DOC] Correct explanation of guarantee for 
ForeachWriter
aff5e2b is described below

commit aff5e2bdca501fc24fb7d56f966d933c96a37b5b
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Tue Aug 20 00:56:53 2019 -0700

[SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

#  What changes were proposed in this pull request?

This patch modifies the explanation of the guarantee for ForeachWriter, as it 
doesn't guarantee the same output for `(partitionId, epochId)`. Refer to the 
description of [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) 
for more details.

Spark itself still guarantees the same output for the same epochId (batch) if 
the preconditions are met: 1) the source always provides the same input records 
for the same offset request, and 2) the query is idempotent overall 
(non-deterministic calculations like now() and random() can break this).

Treating a broken precondition as an exceptional case (the preconditions were 
implicitly required even before), we can still describe the guarantee with 
`epochId`, though it will be harder to leverage: 1) ForeachWriter would need to 
implement a way to track whether all partitions are written successfully for a 
given `epochId`, and 2) there is little chance to leverage this fact, as the 
chance for Spark to successfully write all partitions 
and fail to checkpoint the batch i [...]

Credit to zsxwing on discovering the broken guarantee.

## How was this patch tested?

This is just a documentation change, both on javadoc and guide doc.

Closes #25407 from HeartSaVioR/SPARK-28650.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit b37c8d5cea2e31e7821d848e42277f8fb7b68f30)
Signed-off-by: Shixiong Zhu 
---
 docs/structured-streaming-programming-guide.md | 14 ++
 .../main/scala/org/apache/spark/sql/ForeachWriter.scala| 13 +
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 2c3348a..fa5664d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1827,7 +1827,7 @@ Here are the details of all the sinks in Spark.
 Foreach Sink
 Append, Update, Complete
 None
-Depends on ForeachWriter implementation
+Yes (at-least-once)
 More details in the next 
section
   
   
@@ -2235,13 +2235,11 @@ When the streaming query is started, Spark calls the 
function or the object’s
 
 - The close() method (if it exists) is called if an open() method exists and 
returns successfully (irrespective of the return value), except if the JVM or 
Python process crashes in the middle.
 
-- **Note:** The partitionId and epochId in the open() method can be used to 
deduplicate generated data 
-  when failures cause reprocessing of some input data. This depends on the 
execution mode of the query. 
-  If the streaming query is being executed in the micro-batch mode, then every 
partition represented 
-  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same 
data. 
-  Hence, (partition_id, epoch_id) can be used to deduplicate and/or 
transactionally commit 
-  data and achieve exactly-once guarantees. However, if the streaming query is 
being executed 
-  in the continuous mode, then this guarantee does not hold and therefore 
should not be used for deduplication.
+- **Note:** Spark does not guarantee same output for (partitionId, epochId), 
so deduplication
+  cannot be achieved with (partitionId, epochId). e.g. source provides 
different number of
+  partitions for some reasons, Spark optimization changes number of 
partitions, etc.
+  See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for 
more details.
+  If you need deduplication on output, try out `foreachBatch` instead.
 
  Triggers
 The trigger settings of a streaming query defines the timing of streaming data 
processing, whether
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 52b8c83..5cf294e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -50,14 +50,11 @@ import org.apache.spark.annotation.InterfaceStability
  *
  * Important points to note:
  * 
- * The `partitionId` and `epochId` can be used to deduplicate generated 
data when failures
- * cause reprocessing of some input data. This depends on the execution 
mode

[spark] branch master updated: [SPARK-3137][CORE] Replace the global TorrentBroadcast lock with fine grained KeyLock

2019-09-03 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8980093  [SPARK-3137][CORE] Replace the global TorrentBroadcast lock 
with fine grained KeyLock
8980093 is described below

commit 89800931aa8b565335e45e1d26ff60402e46c534
Author: Shixiong Zhu 
AuthorDate: Tue Sep 3 14:09:07 2019 -0700

[SPARK-3137][CORE] Replace the global TorrentBroadcast lock with fine 
grained KeyLock

### What changes were proposed in this pull request?

This PR provides a new lock mechanism, `KeyLock`, to lock with a given key.
It also uses this new lock in `TorrentBroadcast` to avoid blocking tasks that
are fetching different broadcast values.

### Why are the changes needed?

`TorrentBroadcast.readObject` uses a global lock so only one task can be 
fetching the blocks at the same time. This is not optimal if we are running 
multiple stages concurrently because they should be able to independently fetch 
their own blocks.
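A minimal sketch of the per-key locking idea (an assumed shape for
illustration, not the actual `KeyLock` added by this commit; in particular,
this version never evicts lock objects for unused keys):

```
import java.util.concurrent.ConcurrentHashMap

// Threads contend only when they use the same key, so tasks fetching
// different broadcast values no longer serialize behind one global lock.
class SimpleKeyLock[K] {
  private val locks = new ConcurrentHashMap[K, AnyRef]()

  def withLock[T](key: K)(body: => T): T = {
    val candidate = new AnyRef
    val existing = locks.putIfAbsent(key, candidate)
    val lock = if (existing != null) existing else candidate
    lock.synchronized {
      body
    }
  }
}

// Usage sketch: only calls that pass the same broadcast id block each other.
// keyLock.withLock(broadcastId) { readBlocks() }
```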

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #25612 from zsxwing/SPARK-3137.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../apache/spark/broadcast/BroadcastManager.scala  |   9 +-
 .../apache/spark/broadcast/TorrentBroadcast.scala  |  20 ++--
 .../main/scala/org/apache/spark/util/KeyLock.scala |  69 
 .../scala/org/apache/spark/util/KeyLockSuite.scala | 118 +
 4 files changed, 207 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala 
b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index ed45043..9fa4745 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.broadcast
 
+import java.util.Collections
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
@@ -55,9 +56,11 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
-  private[broadcast] val cachedValues = {
-new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
-  }
+  private[broadcast] val cachedValues =
+Collections.synchronizedMap(
+  new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
+.asInstanceOf[java.util.Map[Any, Any]]
+)
 
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
 val bid = nextBroadcastId.getAndIncrement()
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index f416be8..1379314 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{KeyLock, Utils}
 import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 /**
@@ -167,7 +167,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, 
id: Long)
   bm.getLocalBytes(pieceId) match {
 case Some(block) =>
   blocks(pid) = block
-  releaseLock(pieceId)
+  releaseBlockManagerLock(pieceId)
 case None =>
   bm.getRemoteBytes(pieceId) match {
 case Some(b) =>
@@ -215,8 +215,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, 
id: Long)
   }
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
-val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
-broadcastCache.synchronized {
+TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) {
+  // As we only lock based on `broadcastId`, whenever using 
`broadcastCache`, we should only
+  // touch `broadcastId`.
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
 
   Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse 
{
 setConf(SparkEnv.get.conf)
@@ -225,7 +227,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, 
id: Long)
   case Some(blockResult) =>
 if (blockResult.data.hasNext) {
   val x = blockResult.data.next().asInstanceOf[T]
-  releaseLock(broadcastId)
+  releaseBlockManagerLock(broadcastId)
 
   if (x != null) {
 broadcastCache.put(broadcastId, x)
@@ -270,7 +272,7 @@ private[

[spark] branch master updated: [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files

2019-08-23 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 406c533  [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager 
leaking crc files
406c533 is described below

commit 406c5331ff8937120af465219c8f443ee00a97fb
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Thu Aug 22 23:10:16 2019 -0700

[SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc 
files

### What changes were proposed in this pull request?

This PR fixes the leak of crc files from CheckpointFileManager when 
FileContextBasedCheckpointFileManager is being used.

Spark hits the Hadoop bug, 
[HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255) which seems 
to be a long-standing issue.

This is because there are two `renameInternal` methods:

```
public void renameInternal(Path src, Path dst)
public void renameInternal(final Path src, final Path dst, boolean 
overwrite)
```

both of which should be overridden to handle all cases, but ChecksumFs only
overrides the two-parameter method. So when the latter (three-parameter)
variant is called, FilterFs.renameInternal(...) is invoked instead, and it
performs the rename with RawLocalFs as the underlying filesystem.

The bug is related to FileContext, so FileSystemBasedCheckpointFileManager 
is not affected.

[SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) took a
workaround for this bug, but
[SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) seems to have
brought a regression.

This PR deletes the crc file as a "best-effort" step when renaming, as
failing to delete a crc file is not critical enough to fail the task.

### Why are the changes needed?

This PR prevents crc files from being left behind even after batches are
purged. Too many files in the same directory often hurts performance, and each
crc file occupies more space on disk than its own size, so it is possible to
occupy a nontrivial amount of space when batches go up to 10+.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Some unit tests are modified to check leakage of crc files.

Closes #25488 from HeartSaVioR/SPARK-28025.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/CheckpointFileManager.scala  | 14 ++
 .../streaming/CheckpointFileManagerSuite.scala | 16 
 .../execution/streaming/HDFSMetadataLogSuite.scala | 30 ++
 3 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index fe6362d..26f42b6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -327,6 +327,8 @@ class FileContextBasedCheckpointFileManager(path: Path, 
hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, 
overwriteIfPossible: Boolean): Unit = {
 import Options.Rename._
 fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+// TODO: this is a workaround of HADOOP-16255 - remove this when 
HADOOP-16255 is resolved
+mayRemoveCrcFile(srcPath)
   }
 
 
@@ -343,5 +345,17 @@ class FileContextBasedCheckpointFileManager(path: Path, 
hadoopConf: Configuratio
 case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + 
ChecksumFs
 case _ => false
   }
+
+  private def mayRemoveCrcFile(path: Path): Unit = {
+try {
+  val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
+  if (exists(checksumFile)) {
+// checksum file exists, deleting it
+delete(checksumFile)
+  }
+} catch {
+  case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
+}
+  }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index c57b40c..79bcd49 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends 
SparkFunSuite with SQLHelper {
   assert(fm.exists(path))
   fm.createAtomic(path, overwriteIfPossible = true).close()  // should not 
throw exception
 
+  // crc file should not be leaked when origin file does

[spark] branch master updated: [SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses

2019-09-05 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 84a4d3a  [SPARK-28976][CORE] Use KeyLock to simplify 
MapOutputTracker.getStatuses
84a4d3a is described below

commit 84a4d3a17ccbf7e0cb75dffbbdc20a26715f7323
Author: Shixiong Zhu 
AuthorDate: Wed Sep 4 23:20:27 2019 -0700

[SPARK-28976][CORE] Use KeyLock to simplify MapOutputTracker.getStatuses

### What changes were proposed in this pull request?

Use `KeyLock` added in #25612 to simplify `MapOutputTracker.getStatuses`.
The refactoring also brings some improvements:
- `InterruptedException` is no longer swallowed.
- When a shuffle block is fetched, we don't need to wake up unrelated
sleeping threads.

### Why are the changes needed?

`MapOutputTracker.getStatuses` is pretty hard to maintain right now because
it has a special lock mechanism that we need to pay attention to whenever
updating this method. As `KeyLock` can hide the complexity of locking behind a
dedicated lock class, it's better to refactor the method to make it easy to
understand and maintain.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #25680 from zsxwing/getStatuses.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 50 +-
 1 file changed, 10 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5c820f5..d878fc5 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -678,8 +678,11 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
   val mapStatuses: Map[Int, Array[MapStatus]] =
 new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
 
-  /** Remembers which map output locations are currently being fetched on an 
executor. */
-  private val fetching = new HashSet[Int]
+  /**
+   * A [[KeyLock]] whose key is a shuffle id to ensure there is only one 
thread fetching
+   * the same shuffle block.
+   */
+  private val fetchingLock = new KeyLock[Int]
 
   // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded 
in the result.
   override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, 
endPartition: Int)
@@ -707,51 +710,18 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 if (statuses == null) {
   logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching 
them")
   val startTimeNs = System.nanoTime()
-  var fetchedStatuses: Array[MapStatus] = null
-  fetching.synchronized {
-// Someone else is fetching it; wait for them to be done
-while (fetching.contains(shuffleId)) {
-  try {
-fetching.wait()
-  } catch {
-case e: InterruptedException =>
-  }
-}
-
-// Either while we waited the fetch happened successfully, or
-// someone fetched it in between the get and the fetching.synchronized.
-fetchedStatuses = mapStatuses.get(shuffleId).orNull
+  fetchingLock.withLock(shuffleId) {
+var fetchedStatuses = mapStatuses.get(shuffleId).orNull
 if (fetchedStatuses == null) {
-  // We have to do the fetch, get others to wait for us.
-  fetching += shuffleId
-}
-  }
-
-  if (fetchedStatuses == null) {
-// We won the race to fetch the statuses; do so
-logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
-// This try-finally prevents hangs due to timeouts:
-try {
+  logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
   val fetchedBytes = 
askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
   fetchedStatuses = 
MapOutputTracker.deserializeMapStatuses(fetchedBytes)
   logInfo("Got the output locations")
   mapStatuses.put(shuffleId, fetchedStatuses)
-} finally {
-  fetching.synchronized {
-fetching -= shuffleId
-fetching.notifyAll()
-  }
 }
-  }
-  logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
-s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} 
ms")
-
-  if (fetchedStatuses != null) {
+logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+  s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} 
ms")

[spark] branch master updated: [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted

2019-09-27 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d72f398  [SPARK-27254][SS] Cleanup complete but invalid output files 
in ManifestFileCommitProtocol if job is aborted
d72f398 is described below

commit d72f39897b00d0bbd7a4db9de281a1256fcf908d
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Sep 27 12:35:26 2019 -0700

[SPARK-27254][SS] Cleanup complete but invalid output files in 
ManifestFileCommitProtocol if job is aborted

## What changes were proposed in this pull request?

SPARK-27210 enables ManifestFileCommitProtocol to clean up incomplete
output files at the task level if a task is aborted.

This patch extends the scope of the cleanup: it proposes that
ManifestFileCommitProtocol also clean up complete but invalid output files at
the job level if the job aborts. Please note that this works as 'best-effort',
not as a guarantee, just like in HadoopMapReduceCommitProtocol.

## How was this patch tested?

Added UT.

Closes #24186 from HeartSaVioR/SPARK-27254.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) 
Co-authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/ManifestFileCommitProtocol.scala | 37 ++-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 74 ++
 2 files changed, 109 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 916bd2d..f6cc811 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
@@ -43,6 +44,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   @transient private var fileLog: FileStreamSinkLog = _
   private var batchId: Long = _
 
+  @transient private var pendingCommitFiles: ArrayBuffer[Path] = _
+
   /**
* Sets up the manifest log output and the batch id for this job.
* Must be called before any other function.
@@ -54,13 +57,21 @@ class ManifestFileCommitProtocol(jobId: String, path: 
String)
 
   override def setupJob(jobContext: JobContext): Unit = {
 require(fileLog != null, "setupManifestOptions must be called before this 
function")
-// Do nothing
+pendingCommitFiles = new ArrayBuffer[Path]
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit = {
 require(fileLog != null, "setupManifestOptions must be called before this 
function")
 val fileStatuses = 
taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
 
+// We shouldn't remove the files if they're written to the metadata:
+// `fileLog.add(batchId, fileStatuses)` could fail AFTER writing files to 
the metadata
+// as well as there could be race
+// so for the safety we clean up the list before calling anything incurs 
exception.
+// The case is uncommon and we do best effort instead of guarantee, so the 
simplicity of
+// logic here would be OK, and safe for dealing with unexpected situations.
+pendingCommitFiles.clear()
+
 if (fileLog.add(batchId, fileStatuses)) {
   logInfo(s"Committed batch $batchId")
 } else {
@@ -70,7 +81,29 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def abortJob(jobContext: JobContext): Unit = {
 require(fileLog != null, "setupManifestOptions must be called before this 
function")
-// Do nothing
+// Best effort cleanup of complete files from failed job.
+// Since the file has UUID in its filename, we are safe to try deleting 
them
+// as the file will not conflict with file with another attempt on the 
same task.
+if (pendingCommitFiles.nonEmpty) {
+  pendingCommitFiles.foreach { path =>
+try {
+  val fs = path.getFileSystem(jobContext.getConfiguration)
+  // this is to make sure the file can be seen from driver as well
+  if (fs.exists(path)) {
+fs.delete(path, false)
+  }
+} catch {
+  case e: IOException =>
+logWarning(s"Fail to remove temporary file $path, continue 
removing next.", e)
+}
+  }
+  pendingCommitFiles.clear()
+}
+  }
+
+  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+pendingCommitFiles ++= taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
+  .map(

[spark] branch master updated: Revert "[SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer"

2019-12-10 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cfd7ca9  Revert "[SPARK-21869][SS] Apply Apache Commons Pool to Kafka 
producer"
cfd7ca9 is described below

commit cfd7ca9a06161f7622b5179a777f965c11892afa
Author: Shixiong Zhu 
AuthorDate: Tue Dec 10 11:21:46 2019 -0800

Revert "[SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer"

This reverts commit 3641c3dd69b2bd2beae028d52356450cc41f69ed.
---
 .../spark/sql/kafka010/CachedKafkaProducer.scala   | 118 +++-
 .../sql/kafka010/InternalKafkaConnectorPool.scala  | 210 -
 .../sql/kafka010/InternalKafkaConsumerPool.scala   | 210 ++---
 .../sql/kafka010/InternalKafkaProducerPool.scala   |  68 ---
 .../spark/sql/kafka010/KafkaDataConsumer.scala |   7 +-
 .../spark/sql/kafka010/KafkaDataWriter.scala   |  34 +---
 .../apache/spark/sql/kafka010/KafkaWriteTask.scala |  20 +-
 .../org/apache/spark/sql/kafka010/package.scala|  34 +---
 .../sql/kafka010/CachedKafkaProducerSuite.scala| 154 ---
 scala => InternalKafkaConsumerPoolSuite.scala} |   8 +-
 .../sql/kafka010/KafkaDataConsumerSuite.scala  |   6 +-
 .../org/apache/spark/sql/kafka010/KafkaTest.scala  |  10 +-
 .../kafka010/KafkaDataConsumerSuite.scala  |   7 +
 13 files changed, 332 insertions(+), 554 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index 907440a..fc177cd 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -18,68 +18,60 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.io.Closeable
+import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit}
 
+import com.google.common.cache._
+import com.google.common.util.concurrent.{ExecutionError, 
UncheckedExecutionException}
+import org.apache.kafka.clients.producer.KafkaProducer
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.kafka.clients.producer.{Callback, KafkaProducer, 
ProducerRecord}
-
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
-import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._
-import org.apache.spark.util.ShutdownHookManager
 
-private[kafka010] class CachedKafkaProducer(val kafkaParams: ju.Map[String, 
Object])
-  extends Closeable with Logging {
+private[kafka010] object CachedKafkaProducer extends Logging {
 
   private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
 
-  private val producer = createProducer()
+  private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)
 
-  private def createProducer(): Producer = {
-val producer: Producer = new Producer(kafkaParams)
-if (log.isDebugEnabled()) {
-  val redactedParamsSeq = 
KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
-  logDebug(s"Created a new instance of kafka producer for 
$redactedParamsSeq.")
+  private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get)
+.map(_.conf.get(PRODUCER_CACHE_TIMEOUT))
+.getOrElse(defaultCacheExpireTimeout)
+
+  private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
+override def load(config: Seq[(String, Object)]): Producer = {
+  createKafkaProducer(config)
 }
-producer
   }
 
-  override def close(): Unit = {
-try {
-  if (log.isInfoEnabled()) {
-val redactedParamsSeq = 
KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
-logInfo(s"Closing the KafkaProducer with params: 
${redactedParamsSeq.mkString("\n")}.")
+  private val removalListener = new RemovalListener[Seq[(String, Object)], 
Producer]() {
+override def onRemoval(
+notification: RemovalNotification[Seq[(String, Object)], Producer]): 
Unit = {
+  val paramsSeq: Seq[(String, Object)] = notification.getKey
+  val producer: Producer = notification.getValue
+  if (log.isDebugEnabled()) {
+val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
+logDebug(s"Evicting kafka producer $producer params: 
$redactedParamsSeq, " +
+  s"due to ${notification.getCause}")
   }
-  producer.close()
-} catch {
-  case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
+  close(paramsSeq, producer)
 }
   }
 
-  def send(record: ProducerRecord[Arra

[spark] branch master updated: [SPARK-29953][SS] Don't clean up source files for FileStreamSource if the files belong to the output of FileStreamSink

2019-12-05 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 25431d7  [SPARK-29953][SS] Don't clean up source files for 
FileStreamSource if the files belong to the output of FileStreamSink
25431d7 is described below

commit 25431d79f7daf2a68298701154eb505c2a4add80
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Thu Dec 5 21:46:28 2019 -0800

[SPARK-29953][SS] Don't clean up source files for FileStreamSource if the 
files belong to the output of FileStreamSink

### What changes were proposed in this pull request?

This patch prevents the cleanup operation in FileStreamSource if the source
files belong to the output of a FileStreamSink. This is needed because the
output of FileStreamSink can be read by multiple Spark queries, and those
queries read the files based on the metadata log, which won't reflect the
cleanup.

To simplify the logic, the patch only handles the case where the source path
(without a glob pattern) refers to the output directory of a FileStreamSink, by
checking whether FileStreamSource leverages the metadata directory to list the
source files.

### Why are the changes needed?

Without this patch, if end users turn on the cleanup option on a path that is
the output of a FileStreamSink, the metadata and the available files may get
out of sync, which may break other queries reading the path.
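For reference, a sketch of the cleanup option this patch guards (the `spark`
session, format, and paths are placeholders; after this patch, pointing such a
query at a FileStreamSink output directory throws
UnsupportedOperationException):

```
// "archive" moves consumed files under sourceArchiveDir; "delete" removes them.
val df = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/archived/here")  // required when archiving
  .load("/data/in")                              // must NOT be a FileStreamSink output
```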

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added UT.

Closes #26590 from HeartSaVioR/SPARK-29953.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 docs/structured-streaming-programming-guide.md |  2 +-
 .../sql/execution/streaming/FileStreamSource.scala | 17 -
 .../sql/streaming/FileStreamSourceSuite.scala  | 83 +-
 3 files changed, 81 insertions(+), 21 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 01679e5..b91b930 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -551,7 +551,7 @@ Here are the details of all the sources in Spark.
 When "archive" is provided, additional option 
sourceArchiveDir must be provided as well. The value of 
"sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater 
than 2). e.g. /archived/here. This will ensure archived files are 
never included as new source files.
 Spark will move source files respecting their own path. For example, 
if the path of source file is /a/b/dataset.txt and the path of 
archive directory is /archived/here, file will be moved to 
/archived/here/a/b/dataset.txt.
 NOTE: Both archiving (via moving) or deleting completed files will 
introduce overhead (slow down) in each micro-batch, so you need to understand 
the cost for each operation in your file system before enabling this option. On 
the other hand, enabling this option will reduce the cost to list source files 
which can be an expensive operation.
-NOTE 2: The source path should not be used from multiple sources or 
queries when enabling this option.
+NOTE 2: The source path should not be used from multiple sources or 
queries when enabling this option. Similarly, you must ensure the source path 
doesn't match to any files in output directory of file stream sink.
 NOTE 3: Both delete and move actions are best effort. Failing to 
delete or move files will not fail the streaming query.
 
 For file-format-specific options, see the related methods in 
DataStreamReader
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 35d486c..f31fb32 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -206,6 +206,17 @@ class FileStreamSource(
   CaseInsensitiveMap(options), None).allFiles()
   }
 
+  private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue 
match {
+case Some(true) =>
+  if (sourceCleaner.isDefined) {
+throw new UnsupportedOperationException("Clean up source files is not 
supported when" +
+  " reading from the output directory of FileStreamSink.")
+  }
+  sourceHasMetadata = Some(true)
+case _ =>
+  sourceHasMetadata = newValue
+  }
+
   /**
* Returns a list of files found, sorted by their timestamp.
*/
@@ -216,7 +227,7 @@ class FileStreamSource(
 

[spark] branch branch-2.4 updated: Revert "[SPARK-29494][SQL] Fix for ArrayOutofBoundsException while converting string to timestamp"

2019-10-18 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new b094774  Revert "[SPARK-29494][SQL] Fix for ArrayOutofBoundsException 
while converting string to timestamp"
b094774 is described below

commit b09477415f8cbff5292066c438f65af15622
Author: Shixiong Zhu 
AuthorDate: Fri Oct 18 15:21:35 2019 -0700

Revert "[SPARK-29494][SQL] Fix for ArrayOutofBoundsException while 
converting string to timestamp"

This reverts commit 4d476ed44a36eadb0b21b88d0f6420d52a80cc9d.
---
 .../scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala| 2 +-
 .../org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala | 6 --
 2 files changed, 1 insertion(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5deb83e..cc3fcb2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -378,7 +378,7 @@ object DateTimeUtils {
 i += 1
   }
 } else {
-  if (i < segments.length && (b == ':' || b == ' ')) {
+  if (b == ':' || b == ' ') {
 segments(i) = currentSegmentValue
 currentSegmentValue = 0
 i += 1
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 4496ec8..abdb916 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -581,12 +581,6 @@ class DateTimeUtilsSuite extends SparkFunSuite {
 }
   }
 
-  test("trailing characters while converting string to timestamp") {
-val s = UTF8String.fromString("2019-10-31T10:59:23Z:::")
-val time = DateTimeUtils.stringToTimestamp(s, defaultZoneId)
-assert(time == None)
-  }
-
   test("truncTimestamp") {
 def testTrunc(
 level: Int,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-30943][SS] Show "batch ID" in tool tip string for Structured Streaming UI graphs

2020-02-25 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 5343059  [SPARK-30943][SS] Show "batch ID" in tool tip string for 
Structured Streaming UI graphs
5343059 is described below

commit 53430594587ad0134eff5cd2b5e06a7a3eec1b99
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Tue Feb 25 15:29:36 2020 -0800

[SPARK-30943][SS] Show "batch ID" in tool tip string for Structured 
Streaming UI graphs

### What changes were proposed in this pull request?

This patch changes the tool tip string in Structured Streaming UI graphs to 
show batch ID (and timestamp as well) instead of only showing timestamp, which 
was a key for DStream but no longer a key for Structured Streaming.

This patch also does some refactoring, as there are some spots of confusion
between the js files for streaming and structured streaming.

Note that this patch doesn't actually change the x axis, because changing it
would require decoupling the graph logic between streaming and structured
streaming. This doesn't change the UX meaningfully, since the x axis only shows
the min and max values, for which we still want to show the "time" as well as
the batch ID.

### Why are the changes needed?

In Structured Streaming, everything is keyed by "batch ID", but the UI only
shows the timestamp - end users have to manually find and correlate the batch
ID and the timestamp, which is clearly a huge pain.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Manually tested. Screenshots:

![Screen Shot 2020-02-25 at 7 22 38 
AM](https://user-images.githubusercontent.com/1317309/75197701-40b2ce80-57a2-11ea-9578-c2eb2d1091de.png)
![Screen Shot 2020-02-25 at 7 22 44 
AM](https://user-images.githubusercontent.com/1317309/75197704-427c9200-57a2-11ea-9439-e0a8303d0860.png)
![Screen Shot 2020-02-25 at 7 22 58 
AM](https://user-images.githubusercontent.com/1317309/75197706-43152880-57a2-11ea-9617-1276c3ba181e.png)
![Screen Shot 2020-02-25 at 7 23 04 
AM](https://user-images.githubusercontent.com/1317309/75197708-43152880-57a2-11ea-9de2-7d37eaf88102.png)
![Screen Shot 2020-02-25 at 7 23 31 
AM](https://user-images.githubusercontent.com/1317309/75197710-43adbf00-57a2-11ea-9ae4-4e292de39c36.png)

Closes #27687 from HeartSaVioR/SPARK-30943.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 9ea6c0a8975a1277abba799b51aca4e2818c23e7)
Signed-off-by: Shixiong Zhu 
---
 .../org/apache/spark/ui/static/streaming-page.js   |  2 +-
 .../spark/ui/static/structured-streaming-page.js   |  4 +--
 .../ui/StreamingQueryStatisticsPage.scala  | 36 ++
 .../apache/spark/streaming/ui/StreamingPage.scala  | 13 +++-
 4 files changed, 45 insertions(+), 10 deletions(-)

diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js 
b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
index 5b75bc3..ed3e65c3 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
@@ -171,7 +171,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, 
unitY, batchInterval) {
 .attr("cy", function(d) { return y(d.y); })
 .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "3";})
 .on('mouseover', function(d) {
-var tip = formatYValue(d.y) + " " + unitY + " at " + 
timeFormat[d.x];
+var tip = formatYValue(d.y) + " " + unitY + " at " + 
timeTipStrings[d.x];
 showBootstrapTooltip(d3.select(this).node(), tip);
 // show the point
 d3.select(this)
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
 
b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
index 70250fd..c92226b 100644
--- 
a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
+++ 
b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
@@ -106,12 +106,12 @@ function drawAreaStack(id, labels, values, minX, maxX, 
minY, maxY) {
 .on('mouseover', function(d) {
 var tip = '';
 var idx = 0;
-var _values = timeToValues[d._x]
+var _values = formattedTimeToValues[d._x];
 _values.forEach(function (k) {
 tip += labels[idx] + ': ' + k + '   ';
 idx += 1;
 });
-tip += " at " + d._x
+tip += " a

[spark] branch master updated: [SPARK-30943][SS] Show "batch ID" in tool tip string for Structured Streaming UI graphs

2020-02-25 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9ea6c0a  [SPARK-30943][SS] Show "batch ID" in tool tip string for 
Structured Streaming UI graphs
9ea6c0a is described below

commit 9ea6c0a8975a1277abba799b51aca4e2818c23e7
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Tue Feb 25 15:29:36 2020 -0800

[SPARK-30943][SS] Show "batch ID" in tool tip string for Structured 
Streaming UI graphs

### What changes were proposed in this pull request?

This patch changes the tool tip string in Structured Streaming UI graphs to 
show batch ID (and timestamp as well) instead of only showing timestamp, which 
was a key for DStream but no longer a key for Structured Streaming.

This patch also does some refactoring, as there are some spots of confusion
between the js files for streaming and structured streaming.

Note that this patch doesn't actually change the x axis, because changing it
would require decoupling the graph logic between streaming and structured
streaming. This doesn't change the UX meaningfully, since the x axis only shows
the min and max values, for which we still want to show the "time" as well as
the batch ID.

### Why are the changes needed?

In Structured Streaming, everything is keyed by "batch ID", but the UI only
shows the timestamp - end users have to manually find and correlate the batch
ID and the timestamp, which is clearly a huge pain.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Manually tested. Screenshots:

![Screen Shot 2020-02-25 at 7 22 38 
AM](https://user-images.githubusercontent.com/1317309/75197701-40b2ce80-57a2-11ea-9578-c2eb2d1091de.png)
![Screen Shot 2020-02-25 at 7 22 44 
AM](https://user-images.githubusercontent.com/1317309/75197704-427c9200-57a2-11ea-9439-e0a8303d0860.png)
![Screen Shot 2020-02-25 at 7 22 58 
AM](https://user-images.githubusercontent.com/1317309/75197706-43152880-57a2-11ea-9617-1276c3ba181e.png)
![Screen Shot 2020-02-25 at 7 23 04 
AM](https://user-images.githubusercontent.com/1317309/75197708-43152880-57a2-11ea-9de2-7d37eaf88102.png)
![Screen Shot 2020-02-25 at 7 23 31 
AM](https://user-images.githubusercontent.com/1317309/75197710-43adbf00-57a2-11ea-9ae4-4e292de39c36.png)

Closes #27687 from HeartSaVioR/SPARK-30943.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../org/apache/spark/ui/static/streaming-page.js   |  2 +-
 .../spark/ui/static/structured-streaming-page.js   |  4 +--
 .../ui/StreamingQueryStatisticsPage.scala  | 36 ++
 .../apache/spark/streaming/ui/StreamingPage.scala  | 13 +++-
 4 files changed, 45 insertions(+), 10 deletions(-)

diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js 
b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
index 5b75bc3..ed3e65c3 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
@@ -171,7 +171,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, 
unitY, batchInterval) {
 .attr("cy", function(d) { return y(d.y); })
 .attr("r", function(d) { return isFailedBatch(d.x) ? "2" : "3";})
 .on('mouseover', function(d) {
-var tip = formatYValue(d.y) + " " + unitY + " at " + 
timeFormat[d.x];
+var tip = formatYValue(d.y) + " " + unitY + " at " + 
timeTipStrings[d.x];
 showBootstrapTooltip(d3.select(this).node(), tip);
 // show the point
 d3.select(this)
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
 
b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
index 70250fd..c92226b 100644
--- 
a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
+++ 
b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
@@ -106,12 +106,12 @@ function drawAreaStack(id, labels, values, minX, maxX, 
minY, maxY) {
 .on('mouseover', function(d) {
 var tip = '';
 var idx = 0;
-var _values = timeToValues[d._x]
+var _values = formattedTimeToValues[d._x];
 _values.forEach(function (k) {
 tip += labels[idx] + ': ' + k + '   ';
 idx += 1;
 });
-tip += " at " + d._x
+tip += " at " + formattedTimeTipStrings[d._x];
 showBootstrapTooltip(d3.select(th

[spark] branch master updated: [SPARK-29543][SS][UI] Structured Streaming Web UI

2020-01-29 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7173786  [SPARK-29543][SS][UI] Structured Streaming Web UI
7173786 is described below

commit 71737861531180bbda9aec8d241b1428fe91cab2
Author: uncleGen 
AuthorDate: Wed Jan 29 13:43:51 2020 -0800

[SPARK-29543][SS][UI] Structured Streaming Web UI

### What changes were proposed in this pull request?

This PR adds two pages to the Web UI for Structured Streaming:
  - "/streamingquery": Streaming Query Page, providing some aggregate
information for running/completed streaming queries.
  - "/streamingquery/statistics": Streaming Query Statistics Page, providing
detailed information for a streaming query, including `Input Rate`,
`Process Rate`, `Input Rows`, `Batch Duration` and `Operation Duration`.

![Screen Shot 2020-01-29 at 1 38 00 
PM](https://user-images.githubusercontent.com/1000778/73399837-cd01cc80-429c-11ea-9d4b-1d200a41b8d5.png)
![Screen Shot 2020-01-29 at 1 39 16 
PM](https://user-images.githubusercontent.com/1000778/73399838-cd01cc80-429c-11ea-8185-4e56db6866bd.png)

### Why are the changes needed?

It helps users better monitor Structured Streaming queries.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

- new added and existing UTs
- manual test

Closes #26201 from uncleGen/SPARK-29543.

Lead-authored-by: uncleGen 
Co-authored-by: Yuanjian Li 
Co-authored-by: Genmao Yu 
Signed-off-by: Shixiong Zhu 
---
 .../org/apache/spark}/ui/static/streaming-page.css |   0
 .../org/apache/spark}/ui/static/streaming-page.js  |   0
 .../spark/ui/static/structured-streaming-page.js   | 171 +
 .../resources/org/apache/spark/ui/static/webui.js  |   2 +
 .../scala/org/apache/spark/ui/GraphUIData.scala| 169 +
 .../main/scala/org/apache/spark/ui/UIUtils.scala   |  91 +++
 .../scala/org/apache/spark/ui/jobs/StagePage.scala |  14 +-
 .../org/apache/spark/ui/jobs/StageTable.scala  |  14 +-
 project/MimaExcludes.scala |   5 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  16 ++
 .../sql/execution/streaming/ProgressReporter.scala |   5 +-
 .../sql/execution/streaming/StreamExecution.scala  |   3 +-
 .../apache/spark/sql/internal/SharedState.scala|  19 +-
 .../sql/streaming/StreamingQueryListener.scala |   4 +-
 .../sql/streaming/StreamingQueryManager.scala  |   6 +-
 .../org/apache/spark/sql/streaming/progress.scala  |   2 +
 .../sql/streaming/ui/StreamingQueryPage.scala  | 147 +++
 .../ui/StreamingQueryStatisticsPage.scala  | 271 +
 .../ui/StreamingQueryStatusListener.scala  | 122 ++
 .../spark/sql/streaming/ui/StreamingQueryTab.scala |  33 +--
 .../apache/spark/sql/streaming/ui/UIUtils.scala|  60 +
 .../streaming/StreamingQueryListenerSuite.scala|  10 +-
 .../StreamingQueryStatusAndProgressSuite.scala |   2 +
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  14 +-
 .../sql/streaming/ui/StreamingQueryPageSuite.scala | 125 ++
 .../ui/StreamingQueryStatusListenerSuite.scala | 101 
 .../spark/sql/streaming/ui/UIUtilsSuite.scala  |  41 
 .../hive/thriftserver/ui/ThriftServerPage.scala|  16 +-
 .../apache/spark/streaming/dstream/DStream.scala   |   4 +-
 .../spark/streaming/scheduler/JobScheduler.scala   |   4 +-
 .../spark/streaming/ui/AllBatchesTable.scala   |   2 +-
 .../org/apache/spark/streaming/ui/BatchPage.scala  |   2 +-
 .../apache/spark/streaming/ui/StreamingPage.scala  | 125 +-
 .../apache/spark/streaming/ui/StreamingTab.scala   |   2 +-
 .../org/apache/spark/streaming/ui/UIUtils.scala|  71 +-
 .../apache/spark/streaming/DStreamScopeSuite.scala |   6 +-
 .../apache/spark/streaming/ui/UIUtilsSuite.scala   |  12 +-
 37 files changed, 1408 insertions(+), 283 deletions(-)

diff --git 
a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css
 b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
similarity index 100%
rename from 
streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
diff --git 
a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
 b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
similarity index 100%
rename from 
streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.

[spark] branch master updated: [SPARK-30656][SS] Support the "minPartitions" option in Kafka batch source and streaming source v1

2020-01-30 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f56ba37  [SPARK-30656][SS] Support the "minPartitions" option in Kafka 
batch source and streaming source v1
f56ba37 is described below

commit f56ba37d8bf618f2bef23d808e0fc5704261b139
Author: Shixiong Zhu 
AuthorDate: Thu Jan 30 18:14:50 2020 -0800

[SPARK-30656][SS] Support the "minPartitions" option in Kafka batch source 
and streaming source v1

### What changes were proposed in this pull request?

- Add `minPartitions` support for Kafka Streaming V1 source.
- Add `minPartitions` support for Kafka batch V1  and V2 source.
- There is lots of refactoring (moving codes to KafkaOffsetReader) to reuse 
codes.

### Why are the changes needed?

Right now, the "minPartitions" option only works in Kafka streaming source 
v2. It would be great that we can support it in batch and streaming source v1 
(v1 is the fallback mode when a user hits a regression in v2) as well.
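A usage sketch of the option (the `spark` session, broker addresses, and
topic are placeholders), now applicable to batch and streaming source v1 as
well:

```
// Ask Spark to split the Kafka input into at least 12 partitions, even if the
// topic has fewer topicPartitions (Spark divvies up large offset ranges).
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribe", "topic1")
  .option("minPartitions", "12")
  .load()
```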

### Does this PR introduce any user-facing change?

Yep. The `minPartitions` option is supported in Kafka batch and streaming
queries for both data source V1 and V2.

### How was this patch tested?

New unit tests are added to test "minPartitions".

Closes #27388 from zsxwing/kafka-min-partitions.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 docs/structured-streaming-kafka-integration.md |   2 +-
 .../org/apache/spark/sql/kafka010/KafkaBatch.scala |  32 +
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  75 +-
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  20 +--
 .../spark/sql/kafka010/KafkaOffsetReader.scala | 156 +
 .../apache/spark/sql/kafka010/KafkaRelation.scala  |  32 +
 .../apache/spark/sql/kafka010/KafkaSource.scala|  64 +
 .../apache/spark/sql/kafka010/KafkaSourceRDD.scala |  21 +--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  29 
 .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 107 +-
 .../sql/kafka010/KafkaOffsetReaderSuite.scala  | 139 ++
 .../spark/sql/kafka010/KafkaRelationSuite.scala|  22 +++
 12 files changed, 448 insertions(+), 251 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md 
b/docs/structured-streaming-kafka-integration.md
index 0820b38..a15 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -469,7 +469,7 @@ The following configurations are optional:
   minPartitions
   int
   none
-  streaming
+  streaming and batch
   Desired minimum number of partitions to read from Kafka.
   By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions 
consuming from Kafka.
   If you set this option to a value greater than your topicPartitions, Spark 
will divvy up large
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
index 3006770..9ad083f 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
@@ -57,36 +57,12 @@ private[kafka010] class KafkaBatch(
   driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
 // Leverage the KafkaReader to obtain the relevant partition offsets
-val (fromPartitionOffsets, untilPartitionOffsets) = {
-  try {
-(kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, 
isStartingOffsets = true),
-  kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, 
isStartingOffsets = false))
-  } finally {
-kafkaOffsetReader.close()
-  }
+val offsetRanges: Seq[KafkaOffsetRange] = try {
+  kafkaOffsetReader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, 
endingOffsets)
+} finally {
+  kafkaOffsetReader.close()
 }
 
-// Obtain topicPartitions in both from and until partition offset, ignoring
-// topic partitions that were added and/or deleted between the two above 
calls.
-if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
-  implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => 
t.topic())
-  val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
-  val untilTopics = 
untilPartitionOffsets.keySet.toList.sorted.mkString(",")
-  throw new IllegalStateException("different topic partitions " +
-s"for starting offsets topics[${fromTopics}] and " +
-s"ending offsets topics[${untilTopics}]")

[spark] branch master updated: [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits

2020-01-31 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 481e521  [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming 
limits
481e521 is described below

commit 481e5211d237173ea0fb7c0b292eb7abd2b8a3fe
Author: Tathagata Das 
AuthorDate: Fri Jan 31 09:26:03 2020 -0800

[SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits

### What changes were proposed in this pull request?

This PR solves two bugs related to streaming limits.

**Bug 1 (SPARK-30658)**: Limit before a streaming aggregate (i.e. 
`df.limit(5).groupBy().count()`) in complete mode was not being planned as a 
stateful streaming limit. The planner rule planned a logical limit with a 
stateful streaming limit plan only if the query is in append mode. As a result, 
instead of allowing max 5 rows across batches, the planned streaming query was 
allowing 5 rows in every batch thus producing incorrect results.

**Solution**: Change the planner rule to plan the logical limit with a 
streaming limit plan even when the query is in complete mode if the logical 
limit has no stateful operator before it.

**Bug 2 (SPARK-30657)**: `LocalLimitExec` does not consume the iterator of
the child plan. So if there is a limit after a stateful operator like streaming
dedup in append mode (e.g. `df.dropDuplicates().limit(5)`), the state changes
of the streaming dedup may not be committed (most stateful ops commit state
changes only after the generated iterator is fully consumed).

**Solution**: Change the planner rule to always use a new 
`StreamingLocalLimitExec` which always fully consumes the iterator. This is the 
safest thing to do. However, this will introduce a performance regression as 
consuming the iterator is extra work. To minimize this performance impact, add 
an additional post-planner optimization rule to replace 
`StreamingLocalLimitExec` with `LocalLimitExec` when there is no stateful 
operator before the limit that could be affected by it.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Updated incorrect unit tests and added new ones.

Closes #27373 from tdas/SPARK-30657.

Authored-by: Tathagata Das 
Signed-off-by: Shixiong Zhu 
---
 .../spark/sql/execution/SparkStrategies.scala  |  38 ---
 .../execution/streaming/IncrementalExecution.scala |  34 ++-
 ...GlobalLimitExec.scala => streamingLimits.scala} |  55 --
 .../apache/spark/sql/streaming/StreamSuite.scala   | 112 -
 4 files changed, 211 insertions(+), 28 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 00ad4e0..bd2684d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -451,21 +451,35 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
* Used to plan the streaming global limit operator for streams in append 
mode.
* We need to check for either a direct Limit or a Limit wrapped in a 
ReturnAnswer operator,
* following the example of the SpecialLimits Strategy above.
-   * Streams with limit in Append mode use the stateful 
StreamingGlobalLimitExec.
-   * Streams with limit in Complete mode use the stateless CollectLimitExec 
operator.
-   * Limit is unsupported for streams in Update mode.
*/
   case class StreamingGlobalLimitStrategy(outputMode: OutputMode) extends 
Strategy {
-override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-  case ReturnAnswer(rootPlan) => rootPlan match {
-case Limit(IntegerLiteral(limit), child)
-if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-  StreamingGlobalLimitExec(limit, LocalLimitExec(limit, 
planLater(child))) :: Nil
-case _ => Nil
+
+private def generatesStreamingAppends(plan: LogicalPlan): Boolean = {
+
+  /** Ensures that this plan does not have a streaming aggregate in it. */
+  def hasNoStreamingAgg: Boolean = {
+plan.collectFirst { case a: Aggregate if a.isStreaming => a }.isEmpty
   }
-  case Limit(IntegerLiteral(limit), child)
-  if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-StreamingGlobalLimitExec(limit, LocalLimitExec(limit, 
planLater(child))) :: Nil
+
+  // The following cases of limits on a streaming plan has to be executed 
with a stateful
+  // streaming plan.
+  // 1. When the query is in append mode (that is, all logical plan 
operate on appended data).
+  // 2. When the plan does not contain any streaming aggregate (that is, 
plan has only
+  //operators that operate on appended data). This must be executed 
with

[spark] branch branch-3.0 updated: [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf

2020-02-02 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new f9b8637  [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` 
configs to StaticSQLConf
f9b8637 is described below

commit f9b86370cb04b72a4f00cbd4d60873960aa2792c
Author: Yuanjian Li 
AuthorDate: Sun Feb 2 23:37:13 2020 -0800

[SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to 
StaticSQLConf

### What changes were proposed in this pull request?
Put the configs below needed by Structured Streaming UI into StaticSQLConf:

- spark.sql.streaming.ui.enabled
- spark.sql.streaming.ui.retainedProgressUpdates
- spark.sql.streaming.ui.retainedQueries

### Why are the changes needed?
Make all SS UI configs consistent with other similar configs in usage and 
naming.
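
Since these are static configs, they must be set before the SparkSession
starts (e.g. at session build time or via `--conf` on spark-submit) and cannot
be changed at runtime. A minimal sketch with placeholder values:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ss-ui-demo")  // placeholder app name
  .config("spark.sql.streaming.ui.enabled", "true")
  .config("spark.sql.streaming.ui.retainedProgressUpdates", "200")
  .config("spark.sql.streaming.ui.retainedQueries", "50")
  .getOrCreate()
```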

### Does this PR introduce any user-facing change?
Yes, add new static config `spark.sql.streaming.ui.retainedProgressUpdates`.

### How was this patch tested?
Existing UT.

Closes #27425 from xuanyuanking/SPARK-29543-follow.

Authored-by: Yuanjian Li 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit a4912cee615314e9578e6ab4eae25f147feacbd5)
Signed-off-by: Shixiong Zhu 
---
 .../org/apache/spark/sql/internal/SQLConf.scala  | 16 
 .../apache/spark/sql/internal/StaticSQLConf.scala| 20 
 .../org/apache/spark/sql/internal/SharedState.scala  | 15 ---
 .../streaming/ui/StreamingQueryStatusListener.scala  | 10 ++
 .../spark/sql/streaming/ui/StreamingQueryTab.scala   |  2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala   |  4 ++--
 6 files changed, 37 insertions(+), 30 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 04572c3..3ad3416 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1150,18 +1150,6 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
-  val STREAMING_UI_ENABLED =
-buildConf("spark.sql.streaming.ui.enabled")
-  .doc("Whether to run the structured streaming UI for the Spark 
application.")
-  .booleanConf
-  .createWithDefault(true)
-
-  val STREAMING_UI_INACTIVE_QUERY_RETENTION =
-buildConf("spark.sql.streaming.ui.numInactiveQueries")
-  .doc("The number of inactive queries to retain for structured streaming 
ui.")
-  .intConf
-  .createWithDefault(100)
-
   val VARIABLE_SUBSTITUTE_ENABLED =
 buildConf("spark.sql.variable.substitute")
   .doc("This enables substitution using syntax like ${var} ${system:var} 
and ${env:var}.")
@@ -2284,10 +2272,6 @@ class SQLConf extends Serializable with Logging {
 
   def isUnsupportedOperationCheckEnabled: Boolean = 
getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
 
-  def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)
-
-  def streamingUIInactiveQueryRetention: Int = 
getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)
-
   def streamingFileCommitProtocolClass: String = 
getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
 
   def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 66ac9ddb..6bc7522 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -176,4 +176,24 @@ object StaticSQLConf {
   .internal()
   .booleanConf
   .createWithDefault(true)
+
+  val STREAMING_UI_ENABLED =
+buildStaticConf("spark.sql.streaming.ui.enabled")
+  .doc("Whether to run the Structured Streaming Web UI for the Spark 
application when the " +
+"Spark Web UI is enabled.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val STREAMING_UI_RETAINED_PROGRESS_UPDATES =
+buildStaticConf("spark.sql.streaming.ui.retainedProgressUpdates")
+  .doc("The number of progress updates to retain for a streaming query for 
Structured " +
+"Streaming UI.")
+  .intConf
+  .createWithDefault(100)
+
+  val STREAMING_UI_RETAINED_QUERIES =
+buildStaticConf("spark.sql.streaming.ui.retainedQueries")
+  .doc("The number of inactive queries to retain for Structured Streaming 
UI.")
+  .intConf
+  .createWithDefault(100)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/s

[spark] branch master updated: [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to StaticSQLConf

2020-02-02 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a4912ce  [SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` 
configs to StaticSQLConf
a4912ce is described below

commit a4912cee615314e9578e6ab4eae25f147feacbd5
Author: Yuanjian Li 
AuthorDate: Sun Feb 2 23:37:13 2020 -0800

[SPARK-29543][SS][FOLLOWUP] Move `spark.sql.streaming.ui.*` configs to 
StaticSQLConf

### What changes were proposed in this pull request?
Put the configs below needed by Structured Streaming UI into StaticSQLConf:

- spark.sql.streaming.ui.enabled
- spark.sql.streaming.ui.retainedProgressUpdates
- spark.sql.streaming.ui.retainedQueries

### Why are the changes needed?
Make all SS UI configs consistent with other similar configs in usage and 
naming.

### Does this PR introduce any user-facing change?
Yes, add new static config `spark.sql.streaming.ui.retainedProgressUpdates`.

### How was this patch tested?
Existing UT.

Closes #27425 from xuanyuanking/SPARK-29543-follow.

Authored-by: Yuanjian Li 
Signed-off-by: Shixiong Zhu 
---
 .../org/apache/spark/sql/internal/SQLConf.scala  | 16 
 .../apache/spark/sql/internal/StaticSQLConf.scala| 20 
 .../org/apache/spark/sql/internal/SharedState.scala  | 15 ---
 .../streaming/ui/StreamingQueryStatusListener.scala  | 10 ++
 .../spark/sql/streaming/ui/StreamingQueryTab.scala   |  2 +-
 .../ui/StreamingQueryStatusListenerSuite.scala   |  4 ++--
 6 files changed, 37 insertions(+), 30 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 04572c3..3ad3416 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1150,18 +1150,6 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
-  val STREAMING_UI_ENABLED =
-buildConf("spark.sql.streaming.ui.enabled")
-  .doc("Whether to run the structured streaming UI for the Spark 
application.")
-  .booleanConf
-  .createWithDefault(true)
-
-  val STREAMING_UI_INACTIVE_QUERY_RETENTION =
-buildConf("spark.sql.streaming.ui.numInactiveQueries")
-  .doc("The number of inactive queries to retain for structured streaming 
ui.")
-  .intConf
-  .createWithDefault(100)
-
   val VARIABLE_SUBSTITUTE_ENABLED =
 buildConf("spark.sql.variable.substitute")
   .doc("This enables substitution using syntax like ${var} ${system:var} 
and ${env:var}.")
@@ -2284,10 +2272,6 @@ class SQLConf extends Serializable with Logging {
 
   def isUnsupportedOperationCheckEnabled: Boolean = 
getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
 
-  def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)
-
-  def streamingUIInactiveQueryRetention: Int = 
getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)
-
   def streamingFileCommitProtocolClass: String = 
getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
 
   def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 66ac9ddb..6bc7522 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -176,4 +176,24 @@ object StaticSQLConf {
   .internal()
   .booleanConf
   .createWithDefault(true)
+
+  val STREAMING_UI_ENABLED =
+buildStaticConf("spark.sql.streaming.ui.enabled")
+  .doc("Whether to run the Structured Streaming Web UI for the Spark 
application when the " +
+"Spark Web UI is enabled.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val STREAMING_UI_RETAINED_PROGRESS_UPDATES =
+buildStaticConf("spark.sql.streaming.ui.retainedProgressUpdates")
+  .doc("The number of progress updates to retain for a streaming query for 
Structured " +
+"Streaming UI.")
+  .intConf
+  .createWithDefault(100)
+
+  val STREAMING_UI_RETAINED_QUERIES =
+buildStaticConf("spark.sql.streaming.ui.retainedQueries")
+  .doc("The number of inactive queries to retain for Structured Streaming 
UI.")
+  .intConf
+  .createWithDefault(100)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
i

[spark] branch branch-3.0 updated: [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver

2020-02-14 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1385fc0  [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy 
environment variable to set in both executor and driver
1385fc0 is described below

commit 1385fc02ce7d28e6570971e1687e74d245a5533f
Author: HyukjinKwon 
AuthorDate: Fri Feb 14 10:18:08 2020 -0800

[SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment 
variable to set in both executor and driver

### What changes were proposed in this pull request?

This PR addresses the comment at
https://github.com/apache/spark/pull/26496#discussion_r379194091 and improves
the migration guide to explicitly note that the legacy environment variable has
to be set in both the executors and the driver.

### Why are the changes needed?

To clarify that this environment variable should be set in both the driver and the executors.

### Does this PR introduce any user-facing change?

Nope.

### How was this patch tested?

I checked it via md editor.
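
As a hedged illustration (not part of this docs change, written in Scala only to show the conf
plumbing): `SparkConf.setExecutorEnv` is one way to get the variable onto the executors, while
the driver process still has to receive the same value through its own environment (for
example, by exporting it in the shell that launches the application in client mode).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Propagate the legacy flag to every executor; equivalent to passing
// --conf spark.executorEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED=true to spark-submit.
val conf = new SparkConf()
  .setExecutorEnv("PYSPARK_ROW_FIELD_SORTING_ENABLED", "true")

// The driver must see the same value; otherwise Row field ordering can differ
// between driver and executors and lead to failures or incorrect answers.
val spark = SparkSession.builder().config(conf).getOrCreate()
```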

Closes #27573 from HyukjinKwon/SPARK-29748.

Authored-by: HyukjinKwon 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit b343757b1bd5d0344b82f36aa4d65ed34f840606)
Signed-off-by: Shixiong Zhu 
---
 docs/pyspark-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md
index 8ea4fec..f7f2038 100644
--- a/docs/pyspark-migration-guide.md
+++ b/docs/pyspark-migration-guide.md
@@ -87,7 +87,7 @@ Please refer [Migration Guide: SQL, Datasets and 
DataFrame](sql-migration-guide.
   - Since Spark 3.0, `Column.getItem` is fixed such that it does not call 
`Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, 
the indexing operator should be used.
 For example, `map_col.getItem(col('id'))` should be replaced with 
`map_col[col('id')]`.
 
-  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions less than 
3.6, the field names will be sorted alphabetically as the only option.
+  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true" for both executors and driver - 
this environment variable must be consistent on all executors and driver; 
otherwise, it may cause failures or incorrect answers. For [...]
 
 ## Upgrading from PySpark 2.3 to 2.4
 





[spark] branch master updated: [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver

2020-02-14 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b343757  [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy 
environment variable to set in both executor and driver
b343757 is described below

commit b343757b1bd5d0344b82f36aa4d65ed34f840606
Author: HyukjinKwon 
AuthorDate: Fri Feb 14 10:18:08 2020 -0800

[SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment 
variable to set in both executor and driver

### What changes were proposed in this pull request?

This PR addresses the comment at
https://github.com/apache/spark/pull/26496#discussion_r379194091 and improves
the migration guide to explicitly note that the legacy environment variable has
to be set in both the executors and the driver.

### Why are the changes needed?

To clarify that this environment variable should be set in both the driver and the executors.

### Does this PR introduce any user-facing change?

Nope.

### How was this patch tested?

I checked it via md editor.

Closes #27573 from HyukjinKwon/SPARK-29748.

Authored-by: HyukjinKwon 
Signed-off-by: Shixiong Zhu 
---
 docs/pyspark-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md
index 8ea4fec..f7f2038 100644
--- a/docs/pyspark-migration-guide.md
+++ b/docs/pyspark-migration-guide.md
@@ -87,7 +87,7 @@ Please refer [Migration Guide: SQL, Datasets and 
DataFrame](sql-migration-guide.
   - Since Spark 3.0, `Column.getItem` is fixed such that it does not call 
`Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, 
the indexing operator should be used.
 For example, `map_col.getItem(col('id'))` should be replaced with 
`map_col[col('id')]`.
 
-  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions less than 
3.6, the field names will be sorted alphabetically as the only option.
+  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true" for both executors and driver - 
this environment variable must be consistent on all executors and driver; 
otherwise, it may cause failures or incorrect answers. For [...]
 
 ## Upgrading from PySpark 2.3 to 2.4
 





[spark] branch 2.0 created (now 0f3d744c)

2020-01-14 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a change to branch 2.0
in repository https://gitbox.apache.org/repos/asf/spark.git.


  at 0f3d744c [MINOR][TESTS] Remove unsupported `header` option in AvroSuite

No new revisions were added by this update.





[spark] branch master updated: [SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID

2020-05-22 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5a258b0  [SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the 
metadata log file when finding the latest batch ID
5a258b0 is described below

commit 5a258b0b67ee7c97a90d8b719c7a2171707c9244
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri May 22 16:46:17 2020 -0700

[SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log 
file when finding the latest batch ID

### What changes were proposed in this pull request?

This patch adds the new method `getLatestBatchId()` to
CompactibleFileStreamLog as a complement to getLatest() that doesn't read the
content of the latest batch metadata log file, and applies it to both
FileStreamSource and FileStreamSink to avoid unnecessary latency when reading
the log file.

### Why are the changes needed?

Once the compacted metadata log file becomes huge, writing outputs for the
compact + 1 batch (the batch right after a compaction) is also affected,
because the compacted metadata log file is read unnecessarily. This latency
can simply be avoided.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

New UT. Also manually tested with a query that has a huge metadata log on the
file stream sink:

> before applying the patch

![Screen Shot 2020-02-21 at 4 20 19 
PM](https://user-images.githubusercontent.com/1317309/75016223-d3ffb180-54cd-11ea-9063-49405943049d.png)

> after applying the patch

![Screen Shot 2020-02-21 at 4 06 18 
PM](https://user-images.githubusercontent.com/1317309/75016220-d235ee00-54cd-11ea-81a7-7c03a43c4db4.png)

The peaks are compact batches - please compare the batch right after each
compact batch, especially the "light brown" area.
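
To make the intent concrete, here is an illustrative sketch (not the actual implementation) of
deriving the latest batch id purely from the numeric file names in the metadata log directory,
without deserializing the possibly huge compacted batch file; the helper name and the directory
argument are made up.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def latestBatchId(metadataLogDir: String): Option[Long] = {
  val path = new Path(metadataLogDir)
  val fs = path.getFileSystem(new Configuration())
  if (!fs.exists(path)) {
    None
  } else {
    val ids = fs.listStatus(path).toSeq
      .map(_.getPath.getName.stripSuffix(".compact")) // compact batch files end with ".compact"
      .flatMap(name => scala.util.Try(name.toLong).toOption)
    if (ids.isEmpty) None else Some(ids.max)
  }
}
```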

Closes #27664 from HeartSaVioR/SPARK-30915.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/CompactibleFileStreamLog.scala   |  2 +-
 .../sql/execution/streaming/FileStreamSink.scala   |  2 +-
 .../execution/streaming/FileStreamSourceLog.scala  |  2 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala  | 23 --
 .../streaming/FileStreamSinkLogSuite.scala | 83 ++
 5 files changed, 102 insertions(+), 10 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 10bcfe6..e8ae0ea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -213,7 +213,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
* Returns all files except the deleted ones.
*/
   def allFiles(): Array[T] = {
-var latestId = getLatest().map(_._1).getOrElse(-1L)
+var latestId = getLatestBatchId().getOrElse(-1L)
 // There is a race condition when `FileStreamSink` is deleting old files 
and `StreamFileIndex`
 // is calling this method. This loop will retry the reading to deal with 
the
 // race condition.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b679f16..3224547 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -142,7 +142,7 @@ class FileStreamSink(
   }
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
+if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
   logInfo(s"Skipping already committed batch $batchId")
 } else {
   val committer = FileCommitProtocol.instantiate(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 7b2ea96..c438877 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -96,7 +96,7 @@ class FileStreamSourceLog(
 val searchKeys = removedBatches.map(_._1)
 val retrievedBatches = if (searchKeys.nonEmpty) {
   logWarning(s"Get batches from removed files, this is unexpected in the 
current code path!!!")
-  val latestBatchId = getLatest().map(

[spark] branch branch-3.0 updated: [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing

2020-06-08 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new b00ac30  [SPARK-31923][CORE] Ignore internal accumulators that use 
unrecognized types rather than crashing
b00ac30 is described below

commit b00ac30dfb621962e5b39c52a3bb09440936a0ff
Author: Shixiong Zhu 
AuthorDate: Mon Jun 8 12:06:17 2020 -0700

[SPARK-31923][CORE] Ignore internal accumulators that use unrecognized 
types rather than crashing

### What changes were proposed in this pull request?

Ignore internal accumulators that use unrecognized types rather than 
crashing so that an event log containing such accumulators can still be 
converted to JSON and logged.

### Why are the changes needed?

A user may use internal accumulators by adding the `internal.metrics.`
prefix to the accumulator name to hide sensitive information from the UI
(accumulators other than internal ones are shown in the Spark UI).

However, `org.apache.spark.util.JsonProtocol.accumValueToJson` assumes an 
internal accumulator has only 3 possible types: `int`, `long`, and 
`java.util.List[(BlockId, BlockStatus)]`. When an internal accumulator uses an 
unexpected type, it will crash.

An event log that contains such an accumulator will be dropped because it
cannot be converted to JSON, and it will cause weird UI issues when rendered in
the Spark History Server. For example, if `SparkListenerTaskEnd` is dropped
because of this issue, the user will see the task as still running even though
it has finished.

It's better to make `accumValueToJson` more robust because the accumulator
name is chosen by the user.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The new unit tests.
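
A hypothetical way to end up in this situation (the accumulator names below are made up): any
accumulator whose name starts with `internal.metrics.` is treated as internal, so its value
type is whatever the user picked, which is why the JSON conversion has to tolerate unexpected
types instead of crashing.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val sc = spark.sparkContext

// A long accumulator serializes fine, but a collection accumulator carrying the same
// prefix could previously break event-log serialization.
val hiddenCount = sc.longAccumulator("internal.metrics.mySecretCounter")
val hiddenTags = sc.collectionAccumulator[String]("internal.metrics.mySecretTags")

sc.parallelize(1 to 10).foreach { i =>
  hiddenCount.add(1)
  hiddenTags.add(s"tag-$i")
}
```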

Closes #28744 from zsxwing/fix-internal-accum.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit b333ed0c4a5733a9c36ad79de1d4c13c6cf3c5d4)
Signed-off-by: Shixiong Zhu 
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 20 ++---
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 48 ++
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f445fd4..d53ca0f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -351,12 +351,22 @@ private[spark] object JsonProtocol {
 case v: Long => JInt(v)
 // We only have 3 kind of internal accumulator types, so if it's not 
int or long, it must be
 // the blocks accumulator, whose type is `java.util.List[(BlockId, 
BlockStatus)]`
-case v =>
-  JArray(v.asInstanceOf[java.util.List[(BlockId, 
BlockStatus)]].asScala.toList.map {
-case (id, status) =>
-  ("Block ID" -> id.toString) ~
-  ("Status" -> blockStatusToJson(status))
+case v: java.util.List[_] =>
+  JArray(v.asScala.toList.flatMap {
+case (id: BlockId, status: BlockStatus) =>
+  Some(
+("Block ID" -> id.toString) ~
+("Status" -> blockStatusToJson(status))
+  )
+case _ =>
+  // Ignore unsupported types. A user may put `METRICS_PREFIX` in 
the name. We should
+  // not crash.
+  None
   })
+case _ =>
+  // Ignore unsupported types. A user may put `METRICS_PREFIX` in the 
name. We should not
+  // crash.
+  JNothing
   }
 } else {
   // For all external accumulators, just use strings
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d1f09d8..5f1c753 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -482,6 +482,54 @@ class JsonProtocolSuite extends SparkFunSuite {
 testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  /** Create an AccumulableInfo and verify we can serialize and deserialize 
it. */
+  private def testAccumulableInfo(
+  name: String,
+  value: Option[Any],
+  expectedValue: Option[Any]): Unit = {
+val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
+val accum = AccumulableInfo(
+  123L,
+  Some(name),
+  update = value,
+  value = value,
+  internal = isInternal,
+  countFailedValues = false)
+val json = JsonProtocol.accumulabl

[spark] branch master updated: [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing

2020-06-08 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b333ed0  [SPARK-31923][CORE] Ignore internal accumulators that use 
unrecognized types rather than crashing
b333ed0 is described below

commit b333ed0c4a5733a9c36ad79de1d4c13c6cf3c5d4
Author: Shixiong Zhu 
AuthorDate: Mon Jun 8 12:06:17 2020 -0700

[SPARK-31923][CORE] Ignore internal accumulators that use unrecognized 
types rather than crashing

### What changes were proposed in this pull request?

Ignore internal accumulators that use unrecognized types rather than 
crashing so that an event log containing such accumulators can still be 
converted to JSON and logged.

### Why are the changes needed?

A user may use internal accumulators by adding the `internal.metrics.`
prefix to the accumulator name to hide sensitive information from the UI
(accumulators other than internal ones are shown in the Spark UI).

However, `org.apache.spark.util.JsonProtocol.accumValueToJson` assumes an 
internal accumulator has only 3 possible types: `int`, `long`, and 
`java.util.List[(BlockId, BlockStatus)]`. When an internal accumulator uses an 
unexpected type, it will crash.

An event log that contains such an accumulator will be dropped because it
cannot be converted to JSON, and it will cause weird UI issues when rendered in
the Spark History Server. For example, if `SparkListenerTaskEnd` is dropped
because of this issue, the user will see the task as still running even though
it has finished.

It's better to make `accumValueToJson` more robust because the accumulator
name is chosen by the user.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The new unit tests.

Closes #28744 from zsxwing/fix-internal-accum.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 20 ++---
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 48 ++
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 844d9b7..1c788a3 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -363,12 +363,22 @@ private[spark] object JsonProtocol {
 case v: Long => JInt(v)
 // We only have 3 kind of internal accumulator types, so if it's not 
int or long, it must be
 // the blocks accumulator, whose type is `java.util.List[(BlockId, 
BlockStatus)]`
-case v =>
-  JArray(v.asInstanceOf[java.util.List[(BlockId, 
BlockStatus)]].asScala.toList.map {
-case (id, status) =>
-  ("Block ID" -> id.toString) ~
-  ("Status" -> blockStatusToJson(status))
+case v: java.util.List[_] =>
+  JArray(v.asScala.toList.flatMap {
+case (id: BlockId, status: BlockStatus) =>
+  Some(
+("Block ID" -> id.toString) ~
+("Status" -> blockStatusToJson(status))
+  )
+case _ =>
+  // Ignore unsupported types. A user may put `METRICS_PREFIX` in 
the name. We should
+  // not crash.
+  None
   })
+case _ =>
+  // Ignore unsupported types. A user may put `METRICS_PREFIX` in the 
name. We should not
+  // crash.
+  JNothing
   }
 } else {
   // For all external accumulators, just use strings
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 248142a..5a4073b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -507,6 +507,54 @@ class JsonProtocolSuite extends SparkFunSuite {
 testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  /** Create an AccumulableInfo and verify we can serialize and deserialize 
it. */
+  private def testAccumulableInfo(
+  name: String,
+  value: Option[Any],
+  expectedValue: Option[Any]): Unit = {
+val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
+val accum = AccumulableInfo(
+  123L,
+  Some(name),
+  update = value,
+  value = value,
+  internal = isInternal,
+  countFailedValues = false)
+val json = JsonProtocol.accumulableInfoToJson(accum)
+val newAccum = JsonProtocol.accumulableInfoFromJson(json)
+assert(newAccum == accum.

[spark] branch branch-2.4 updated: [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing (branch-2.4)

2020-06-08 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 48017cc  [SPARK-31923][CORE] Ignore internal accumulators that use 
unrecognized types rather than crashing (branch-2.4)
48017cc is described below

commit 48017cc36bdf7d84506daeed589e4cbebff269f8
Author: Shixiong Zhu 
AuthorDate: Mon Jun 8 16:52:34 2020 -0700

[SPARK-31923][CORE] Ignore internal accumulators that use unrecognized 
types rather than crashing (branch-2.4)

### What changes were proposed in this pull request?

Backport #28744 to branch-2.4.

### Why are the changes needed?

Low-risk fix for branch-2.4.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests.

Closes #28758 from zsxwing/SPARK-31923-2.4.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 20 ++---
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 47 ++
 2 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 50c6461..0e613ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -326,12 +326,22 @@ private[spark] object JsonProtocol {
 case v: Long => JInt(v)
 // We only have 3 kind of internal accumulator types, so if it's not 
int or long, it must be
 // the blocks accumulator, whose type is `java.util.List[(BlockId, 
BlockStatus)]`
-case v =>
-  JArray(v.asInstanceOf[java.util.List[(BlockId, 
BlockStatus)]].asScala.toList.map {
-case (id, status) =>
-  ("Block ID" -> id.toString) ~
-  ("Status" -> blockStatusToJson(status))
+case v: java.util.List[_] =>
+  JArray(v.asScala.toList.flatMap {
+case (id: BlockId, status: BlockStatus) =>
+  Some(
+("Block ID" -> id.toString) ~
+("Status" -> blockStatusToJson(status))
+  )
+case _ =>
+  // Ignore unsupported types. A user may put `METRICS_PREFIX` in 
the name. We should
+  // not crash.
+  None
   })
+case _ =>
+  // Ignore unsupported types. A user may put `METRICS_PREFIX` in the 
name. We should not
+  // crash.
+  JNothing
   }
 } else {
   // For all external accumulators, just use strings
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 74b72d9..40fb2e3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -436,6 +436,53 @@ class JsonProtocolSuite extends SparkFunSuite {
 testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  /** Create an AccumulableInfo and verify we can serialize and deserialize 
it. */
+  private def testAccumulableInfo(
+  name: String,
+  value: Option[Any],
+  expectedValue: Option[Any]): Unit = {
+val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
+val accum = AccumulableInfo(
+  123L,
+  Some(name),
+  update = value,
+  value = value,
+  internal = isInternal,
+  countFailedValues = false)
+val json = JsonProtocol.accumulableInfoToJson(accum)
+val newAccum = JsonProtocol.accumulableInfoFromJson(json)
+assert(newAccum == accum.copy(update = expectedValue, value = 
expectedValue))
+  }
+
+  test("SPARK-31923: unexpected value type of internal accumulator") {
+// Because a user may use `METRICS_PREFIX` in an accumulator name, we 
should test unexpected
+// types to make sure we don't crash.
+import InternalAccumulator.METRICS_PREFIX
+testAccumulableInfo(
+  METRICS_PREFIX + "fooString",
+  value = Some("foo"),
+  expectedValue = None)
+testAccumulableInfo(
+  METRICS_PREFIX + "fooList",
+  value = Some(java.util.Arrays.asList("string")),
+  expectedValue = Some(java.util.Collections.emptyList())
+)
+val blocks = Seq(
+  (TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
+  (TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
+testAccumulableInfo(
+  METRICS_PREFIX + "fooList",
+  value = Some(java.util.Arrays.asList(
+&

[spark] branch master updated: [SPARK-32896][SS][FOLLOW-UP] Rename the API to `toTable`

2020-12-02 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6fa797e  [SPARK-32896][SS][FOLLOW-UP] Rename the API to `toTable`
6fa797e is described below

commit 6fa797e977412d071dd4dc079053ec64a21b3041
Author: Yuanjian Li 
AuthorDate: Wed Dec 2 17:31:10 2020 -0800

[SPARK-32896][SS][FOLLOW-UP] Rename the API to `toTable`

### What changes were proposed in this pull request?
Following the discussion in
https://github.com/apache/spark/pull/30521#discussion_r531463427, rename the
API to `toTable`.

### Why are the changes needed?
Rename the API for further extension and accuracy.

### Does this PR introduce _any_ user-facing change?
Yes, it's an API change but the new API is not released yet.

### How was this patch tested?
Existing UT.
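
A rough usage sketch of the renamed API (Spark 3.1+); the rate source, table name, and
checkpoint path below are arbitrary choices for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()

val query = spark.readStream
  .format("rate") // built-in testing source
  .load()
  .writeStream
  .option("checkpointLocation", "/tmp/to-table-checkpoint")
  .toTable("rate_events") // was saveAsTable(...) before this rename

Thread.sleep(5000) // let a few micro-batches run
query.stop()
```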

Closes #30571 from xuanyuanking/SPARK-32896-follow.

Authored-by: Yuanjian Li 
Signed-off-by: Shixiong Zhu 
---
 .../main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala| 2 +-
 .../org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index d67e175..9e35997 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -304,7 +304,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 3.1.0
*/
   @throws[TimeoutException]
-  def saveAsTable(tableName: String): StreamingQuery = {
+  def toTable(tableName: String): StreamingQuery = {
 this.source = SOURCE_NAME_TABLE
 this.tableName = tableName
 startInternal(None)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 062b106..bf85043 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -291,7 +291,7 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
 val query = inputDF
   .writeStream
   .option("checkpointLocation", checkpointDir.getAbsolutePath)
-  .saveAsTable(tableIdentifier)
+  .toTable(tableIdentifier)
 
 inputData.addData(newInputs: _*)
 





[spark] branch master updated: [SPARK-32896][SS][FOLLOW-UP] Rename the API to `toTable`

2020-12-02 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 878cc0e  [SPARK-32896][SS][FOLLOW-UP] Rename the API to `toTable`
878cc0e is described below

commit 878cc0e6e95f300a0a58c742654f53a28b30b174
Author: Yuanjian Li 
AuthorDate: Wed Dec 2 17:36:25 2020 -0800

[SPARK-32896][SS][FOLLOW-UP] Rename the API to `toTable`

### What changes were proposed in this pull request?
Following the discussion in
https://github.com/apache/spark/pull/30521#discussion_r531463427, rename the
API to `toTable`.

### Why are the changes needed?
Rename the API for further extension and accuracy.

### Does this PR introduce _any_ user-facing change?
Yes, it's an API change but the new API is not released yet.

### How was this patch tested?
Existing UT.

Closes #30571 from xuanyuanking/SPARK-32896-follow.

Authored-by: Yuanjian Li 
Signed-off-by: Shixiong Zhu 
---
 .../main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala| 2 +-
 .../org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index d67e175..9e35997 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -304,7 +304,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 3.1.0
*/
   @throws[TimeoutException]
-  def saveAsTable(tableName: String): StreamingQuery = {
+  def toTable(tableName: String): StreamingQuery = {
 this.source = SOURCE_NAME_TABLE
 this.tableName = tableName
 startInternal(None)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 062b106..bf85043 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -291,7 +291,7 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
 val query = inputDF
   .writeStream
   .option("checkpointLocation", checkpointDir.getAbsolutePath)
-  .saveAsTable(tableIdentifier)
+  .toTable(tableIdentifier)
 
 inputData.addData(newInputs: _*)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-31953][SS] Add Spark Structured Streaming History Server Support

2020-12-02 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4f96670  [SPARK-31953][SS] Add Spark Structured Streaming History 
Server Support
4f96670 is described below

commit 4f9667035886a67e6c9a4e8fad2efa390e87ca68
Author: uncleGen 
AuthorDate: Wed Dec 2 17:11:51 2020 -0800

[SPARK-31953][SS] Add Spark Structured Streaming History Server Support

### What changes were proposed in this pull request?

Add Spark Structured Streaming History Server Support.

### Why are the changes needed?

Add a streaming query history server plugin.


![image](https://user-images.githubusercontent.com/7402327/84248291-d26cfe80-ab3b-11ea-86d2-98205fa2bcc4.png)

![image](https://user-images.githubusercontent.com/7402327/84248347-e44ea180-ab3b-11ea-81de-eefe207656f2.png)

![image](https://user-images.githubusercontent.com/7402327/84248396-f0d2fa00-ab3b-11ea-9b0d-e410115471b0.png)

- Follow-ups
  - Query duration should not update in history UI.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Update UT.
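
One practical note, sketched below with standard configs (none of them introduced by this
patch): the History Server rebuilds this tab by replaying event logs, so an application only
shows up if it wrote them. The local log directory is an assumption; the History Server's
spark.history.fs.logDirectory should point at the same location.

```scala
import org.apache.spark.sql.SparkSession

// Applications must write event logs for the Structured Streaming tab to appear in SHS.
val spark = SparkSession.builder()
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "file:/tmp/spark-events")
  .getOrCreate()
```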

Closes #28781 from uncleGen/SPARK-31953.

Lead-authored-by: uncleGen 
Co-authored-by: Genmao Yu 
Co-authored-by: Yuanjian Li 
Signed-off-by: Shixiong Zhu 
---
 dev/.rat-excludes  |   1 +
 .../org.apache.spark.status.AppHistoryServerPlugin |   1 +
 .../streaming/StreamingQueryListenerBus.scala  |  26 +++-
 .../ui/StreamingQueryHistoryServerPlugin.scala |  43 ++
 .../execution/ui/StreamingQueryStatusStore.scala   |  53 +++
 .../apache/spark/sql/internal/SharedState.scala|   8 +-
 .../sql/streaming/StreamingQueryManager.scala  |   3 +-
 .../sql/streaming/ui/StreamingQueryPage.scala  |  44 +++---
 .../ui/StreamingQueryStatisticsPage.scala  |  27 ++--
 .../ui/StreamingQueryStatusListener.scala  | 166 +
 .../spark/sql/streaming/ui/StreamingQueryTab.scala |   3 +-
 .../apache/spark/sql/streaming/ui/UIUtils.scala|  12 +-
 .../resources/spark-events/local-1596020211915 | 160 
 .../org/apache/spark/deploy/history/Utils.scala}   |  39 ++---
 .../streaming/ui/StreamingQueryHistorySuite.scala  |  63 
 .../sql/streaming/ui/StreamingQueryPageSuite.scala |  42 +++---
 .../ui/StreamingQueryStatusListenerSuite.scala | 159 
 17 files changed, 673 insertions(+), 177 deletions(-)

diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 7da330d..167cf22 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -123,6 +123,7 @@ SessionHandler.java
 GangliaReporter.java
 application_1578436911597_0052
 config.properties
+local-1596020211915
 app-20200706201101-0003
 py.typed
 _metadata
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
index 0bba2f8..6771eef 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -1 +1,2 @@
 org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin
+org.apache.spark.sql.execution.ui.StreamingQueryHistoryServerPlugin
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 1b8d69f..4b98acd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -31,16 +31,21 @@ import org.apache.spark.util.ListenerBus
  * Spark listener bus, so that it can receive 
[[StreamingQueryListener.Event]]s and dispatch them
  * to StreamingQueryListeners.
  *
- * Note that each bus and its registered listeners are associated with a 
single SparkSession
+ * Note 1: Each bus and its registered listeners are associated with a single 
SparkSession
  * and StreamingQueryManager. So this bus will dispatch events to registered 
listeners for only
  * those queries that were started in the associated SparkSession.
+ *
+ * Note 2: To rebuild Structured Streaming UI in SHS, this bus will be 
registered into
+ * [[org.apache.spark.scheduler.ReplayListenerBus]]. We check 
`sparkListenerBus` defined or not to
+ * determine how to process [[StreamingQueryListener.Event]]. If false, it 
means this bus is used to
+ * replay all streaming query event from eventLog.
  */
-class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+class

[spark] branch master updated: [SPARK-41040][SS] Fix self-union streaming query failure when using readStream.table

2022-11-08 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7074e4fee7e [SPARK-41040][SS] Fix self-union streaming query failure 
when using readStream.table
7074e4fee7e is described below

commit 7074e4fee7e6944013cfaa3c0c2a1458cce8a72d
Author: Shixiong Zhu 
AuthorDate: Tue Nov 8 08:31:24 2022 -0800

[SPARK-41040][SS] Fix self-union streaming query failure when using 
readStream.table

### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/36963 added a check to disallow any 
source setting `CatalogTable` in the batch plan. However, this check is not 
safe to enforce:

- In a self-union query, the batch plan created by the source will be 
shared by multiple nodes in the plan. When we transform the plan, the batch 
plan will be visited multiple times. Hence, the first visit will set the 
`CatalogTable` and the second visit will try to set it again and fail the query.
- A source built by arbitrary developers can set `CatalogTable` in the 
batch plan. We should not fail as it would break an existing source.

This PR fixes the issue by removing the check and setting `CatalogTable` only
if the batch plan doesn't already have one.

### Why are the changes needed?

Fix a bug in master.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The newly added unit test.
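
For reference, a minimal self-union sketch in the spirit of that test (the table name is
arbitrary): before this fix the second visit to the shared batch plan failed the query, and
with the fix it starts and runs normally.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()

spark.range(10).write.format("parquet").saveAsTable("self_union_demo")

val df = spark.readStream.format("parquet").table("self_union_demo")

val q = df.union(df).writeStream.format("noop").start()
q.processAllAvailable()
q.stop()
```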

Closes #38553 from zsxwing/SPARK-41040.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../sql/execution/streaming/MicroBatchExecution.scala  | 18 +++---
 .../sql/streaming/test/DataStreamTableAPISuite.scala   | 13 +
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7ed19b35114..051e45c71e6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -621,10 +621,22 @@ class MicroBatchExecution(
   if (hasFileMetadata) {
 newRelation = newRelation.withMetadataColumns()
   }
-  catalogTable.foreach { table =>
-assert(newRelation.catalogTable.isEmpty,
+  // If the catalog table is not set in the batch plan generated 
by the source, we will
+  // pick up the one from `StreamingExecutionRelation`. Otherwise, 
we will skip this
+  // step. The skipping can happen in the following cases:
+  // - We re-visit the same `StreamingExecutionRelation`. For 
example, self-union will
+  //   share the same `StreamingExecutionRelation` and `transform` 
will visit it twice.
+  //   This is safe to skip.
+  // - A source that sets the catalog table explicitly. We will 
pick up the one provided
+  //   by the source directly to maintain the same behavior.
+  if (newRelation.catalogTable.isEmpty) {
+catalogTable.foreach { table =>
+  newRelation = newRelation.copy(catalogTable = Some(table))
+}
+  } else if (catalogTable.exists(_ ne 
newRelation.catalogTable.get)) {
+// Output a warning if `catalogTable` is provided by the 
source rather than engine
+logWarning(
   s"Source $source should not produce the information of 
catalog table by its own.")
-newRelation = newRelation.copy(catalogTable = Some(table))
   }
   newRelation
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 0d1242fbb19..6bbf2239dbf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -484,6 +484,19 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
 }
   }
 
+  test("SPARK-41040: self-union using readStream.table should not fail") {
+withTable("self_union_table") {
+  spark.range(10).write.format("parquet").saveAsTable("self_union_table")
+  val df = spark.readStream.format("parquet").table("self_union_table")
+  val q = df.union(df).writeStream.format("noop").start()
+  try {
+q.processAll

[spark] branch master updated: [SPARK-41045][SQL] Pre-compute to eliminate ScalaReflection calls after deserializer is created

2022-11-08 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ef402edff91 [SPARK-41045][SQL] Pre-compute to eliminate 
ScalaReflection calls after deserializer is created
ef402edff91 is described below

commit ef402edff91377d37c0c1b8d40921ed7bd9f7160
Author: Shixiong Zhu 
AuthorDate: Tue Nov 8 08:18:50 2022 -0800

[SPARK-41045][SQL] Pre-compute to eliminate ScalaReflection calls after 
deserializer is created

### What changes were proposed in this pull request?

Currently when `ScalaReflection` returns a deserializer, for a few complex 
types, such as array, map, udt, etc, it creates functions that may still touch 
`ScalaReflection` after the deserializer is created.

`ScalaReflection` is a performance bottleneck for multiple threads as it 
holds multiple global locks. We can refactor `ScalaReflection.deserializerFor` 
to pre-compute everything that needs to touch `ScalaReflection` before creating 
the deserializer. After this, once the deserializer is created, it can be 
reused by multiple threads without touching `ScalaReflection.deserializerFor` 
any more and it will be much faster.

### Why are the changes needed?

Optimize `ScalaReflection.deserializerFor` to make deserializers faster 
under multiple threads.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

This is refactoring `deserializerFor` to optimize the code. Existing tests 
should already cover the correctness.
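
To illustrate the idea in isolation (a generic sketch, not Spark's code; the helper name and
case class are made up): everything that needs the synchronized reflection machinery is
resolved eagerly, and the returned closure only captures the pre-computed result, so calling
it from many threads no longer contends on reflection locks.

```scala
import scala.reflect.runtime.universe._

def makeFieldNamesExtractor[T: TypeTag](): () => Seq[String] = {
  // Lock-heavy reflection work happens exactly once, while building the extractor.
  val names: Seq[String] = typeOf[T].members.collect {
    case m: MethodSymbol if m.isCaseAccessor => m.name.toString
  }.toSeq

  // The closure handed back never touches the reflection API again.
  () => names
}

case class User(id: Long, name: String)

val extractor = makeFieldNamesExtractor[User]()
println(extractor()) // e.g. List(name, id); safe to call concurrently
```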

Closes #38556 from zsxwing/scala-ref.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../sql/catalyst/DeserializerBuildHelper.scala |   5 +-
 .../spark/sql/catalyst/JavaTypeInference.scala |   8 +-
 .../spark/sql/catalyst/ScalaReflection.scala   | 157 +++--
 3 files changed, 85 insertions(+), 85 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index 0d3b9977e4f..7051c2d2264 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -49,10 +49,9 @@ object DeserializerBuildHelper {
   dataType: DataType,
   nullable: Boolean,
   walkedTypePath: WalkedTypePath,
-  funcForCreatingDeserializer: (Expression, WalkedTypePath) => 
Expression): Expression = {
+  funcForCreatingDeserializer: Expression => Expression): Expression = {
 val casted = upCastToExpectedType(expr, dataType, walkedTypePath)
-expressionWithNullSafety(funcForCreatingDeserializer(casted, 
walkedTypePath),
-  nullable, walkedTypePath)
+expressionWithNullSafety(funcForCreatingDeserializer(casted), nullable, 
walkedTypePath)
   }
 
   def expressionWithNullSafety(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index dccaf1c4835..827807055ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -218,9 +218,7 @@ object JavaTypeInference {
 
 // Assumes we are deserializing the first column of a row.
 deserializerForWithNullSafetyAndUpcast(GetColumnByOrdinal(0, dataType), 
dataType,
-  nullable = nullable, walkedTypePath, (casted, walkedTypePath) => {
-deserializerFor(typeToken, casted, walkedTypePath)
-  })
+  nullable = nullable, walkedTypePath, deserializerFor(typeToken, _, 
walkedTypePath))
   }
 
   private def deserializerFor(
@@ -280,7 +278,7 @@ object JavaTypeInference {
 dataType,
 nullable = elementNullable,
 newTypePath,
-(casted, typePath) => deserializerFor(typeToken.getComponentType, 
casted, typePath))
+deserializerFor(typeToken.getComponentType, _, newTypePath))
 }
 
 val arrayData = UnresolvedMapObjects(mapFunction, path)
@@ -309,7 +307,7 @@ object JavaTypeInference {
 dataType,
 nullable = elementNullable,
 newTypePath,
-(casted, typePath) => deserializerFor(et, casted, typePath))
+deserializerFor(et, _, newTypePath))
 }
 
 UnresolvedMapObjects(mapFunction, path, customCollectionCls = Some(c))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 12093b9f4b2..d895a0fbe19 100644
--- 
a/s
