[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...

2018-07-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21807
  
@mauropalsgraaf Could you fix the PR title?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21699
  
**[Test build #93824 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93824/testReport)**
 for PR 21699 at commit 
[`34535a9`](https://github.com/apache/spark/commit/34535a9cc5ec7a2ba880f7f525feb7dbbc0b0c37).


---




[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...

2018-07-30 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20611#discussion_r206411960
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -303,94 +303,44 @@ case class LoadDataCommand(
   s"partitioned, but a partition spec was provided.")
   }
 }
-
-val loadPath =
+val loadPath = {
   if (isLocal) {
-val uri = Utils.resolveURI(path)
-val file = new File(uri.getPath)
-val exists = if (file.getAbsolutePath.contains("*")) {
-  val fileSystem = FileSystems.getDefault
-  val dir = file.getParentFile.getAbsolutePath
-  if (dir.contains("*")) {
-throw new AnalysisException(
-  s"LOAD DATA input path allows only filename wildcard: $path")
-  }
-
-  // Note that special characters such as "*" on Windows are not 
allowed as a path.
-  // Calling `WindowsFileSystem.getPath` throws an exception if 
there are in the path.
-  val dirPath = fileSystem.getPath(dir)
-  val pathPattern = new File(dirPath.toAbsolutePath.toString, 
file.getName).toURI.getPath
-  val safePathPattern = if (Utils.isWindows) {
-// On Windows, the pattern should not start with slashes for 
absolute file paths.
-pathPattern.stripPrefix("/")
-  } else {
-pathPattern
-  }
-  val files = new File(dir).listFiles()
-  if (files == null) {
-false
-  } else {
-val matcher = fileSystem.getPathMatcher("glob:" + 
safePathPattern)
-files.exists(f => 
matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
-  }
-} else {
-  new File(file.getAbsolutePath).exists()
-}
-if (!exists) {
-  throw new AnalysisException(s"LOAD DATA input path does not 
exist: $path")
-}
-uri
+val localFS = FileContext.getLocalFSFileContext()
+localFS.makeQualified(new Path(path))
   } else {
-val uri = new URI(path)
-val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != 
null) {
-  uri
-} else {
-  // Follow Hive's behavior:
-  // If no schema or authority is provided with non-local inpath,
-  // we will use hadoop configuration "fs.defaultFS".
-  val defaultFSConf = 
sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
-  val defaultFS = if (defaultFSConf == null) {
-new URI("")
-  } else {
-new URI(defaultFSConf)
-  }
-
-  val scheme = if (uri.getScheme() != null) {
--- End diff --

Got your point, Sean. The code is not exactly the same: the key difference 
is that makeQualified() passes null as the query parameter when constructing 
the URI, so the string after '?' is not discarded and the load path stays 
the same. That is why I used this API. 
As you suggested, we can extract the logic into a private method, since 
makeQualified() is LimitedPrivate. 
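
The effect of the '?' character can be shown with a small Python sketch. It is only an analogy, not the Hadoop code under review: a generic URI parser treats everything after '?' as a query string, so a wildcard in the file name is lost when only the path component is kept. The host name and file path are hypothetical.

```python
from urllib.parse import urlsplit

# Hypothetical load path that uses '?' as a single-character wildcard.
load_path = "hdfs://namenode:8020/data/part-?.txt"

parts = urlsplit(load_path)

# A generic URI parser treats '?' as the start of the query string,
# so the path component is cut off right before the wildcard.
path_only = parts.path
query = parts.query

# Rebuilding the location from scheme, authority and path drops the wildcard.
rebuilt = f"{parts.scheme}://{parts.netloc}{path_only}"
```

Here `path_only` ends at `part-` and the rest of the file name lands in `query`, which is why an API that keeps the text after '?' as part of the path matters for wildcard load paths.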


---




[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21699
  
retest this please


---




[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...

2018-07-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21889
  
Normally, we change the default to false or revert the whole PR if bugs 
are found during the RC (release candidate) stage. 


---




[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21928#discussion_r206411041
  
--- Diff: python/pyspark/serializers.py ---
@@ -236,6 +237,11 @@ def create_array(s, t):
 # TODO: need decode before converting to Arrow in Python 2
 return pa.Array.from_pandas(s.apply(
 lambda v: v.decode("utf-8") if isinstance(v, str) else v), 
mask=mask, type=t)
+elif t is not None and pa.types.is_decimal(t) and \
+LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < 
LooseVersion("0.10.0"):
--- End diff --

Yeah, but I am not aware of other issues specific to PyArrow versions. 
I will certainly consolidate the checks into a single place if I end up 
fixing more PyArrow-version-specific issues.
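
A single place for such checks could look roughly like the sketch below. The helper names `parse_version` and `needs_decimal_none_workaround` are hypothetical and not part of PySpark; the version range is the one guarded in the diff above.

```python
import re


def parse_version(version):
    """Turn a version string such as '0.9.0' into a comparable tuple (0, 9, 0)."""
    numbers = [int(x) for x in re.findall(r"\d+", version)[:3]]
    # Pad short versions such as '0.9' so that tuples compare consistently.
    numbers += [0] * (3 - len(numbers))
    return tuple(numbers)


def needs_decimal_none_workaround(pyarrow_version):
    """Hypothetical central check: True only for PyArrow 0.9.x (see ARROW-2432)."""
    return (0, 9, 0) <= parse_version(pyarrow_version) < (0, 10, 0)
```

Call sites would then pass `pa.__version__` to the helper instead of repeating the version comparison inline.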


---




[GitHub] spark pull request #21926: [SPARK-24972][SQL] PivotFirst could not handle pi...

2018-07-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21926


---




[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21928#discussion_r206410816
  
--- Diff: python/pyspark/serializers.py ---
@@ -236,6 +237,11 @@ def create_array(s, t):
 # TODO: need decode before converting to Arrow in Python 2
 return pa.Array.from_pandas(s.apply(
 lambda v: v.decode("utf-8") if isinstance(v, str) else v), 
mask=mask, type=t)
+elif t is not None and pa.types.is_decimal(t) and \
+LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < 
LooseVersion("0.10.0"):
+# TODO: see ARROW-2432. Remove when the minimum PyArrow 
version becomes 0.10.0.
+return pa.Array.from_pandas(s.apply(
+lambda v: decimal.Decimal('NaN') if v is None else v), 
mask=mask, type=t)
--- End diff --

The existing test `test_vectorized_udf_null_decimal` should cover this. It 
fails without the current change on PyArrow 0.9.0.
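
The substitution in the diff can be sketched with plain Python lists instead of a pandas Series. This is a simplified illustration under that assumption, and the function names are hypothetical: each `None` is replaced by a `Decimal('NaN')` placeholder, while a separate mask still records which entries are null.

```python
import decimal


def replace_none_with_nan(values):
    """Swap None for Decimal('NaN'), mirroring the lambda in the diff above."""
    return [decimal.Decimal("NaN") if v is None else v for v in values]


def null_mask(values):
    """True where the original value is null; the placeholder is hidden by this mask."""
    return [v is None for v in values]


# Hypothetical column of decimals with one missing value.
column = [decimal.Decimal("1.5"), None, decimal.Decimal("2.0")]
converted = replace_none_with_nan(column)
mask = null_mask(column)
```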


---




[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...

2018-07-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21889
  
We are still targeting this to 2.4, but we need to fix all the identified 
bugs before merging it. 


---




[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21926
  
LGTM Thanks! Merged to master.


---




[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21825
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93823/
Test PASSed.


---




[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21825
  
**[Test build #93823 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93823/testReport)**
 for PR 21825 at commit 
[`cf27272`](https://github.com/apache/spark/commit/cf27272bb3269075534728e4e602ac379f41d40b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21825
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #21926: [SPARK-24972][SQL] PivotFirst could not handle pi...

2018-07-30 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21926#discussion_r206406546
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -529,6 +529,10 @@ class Analyzer(
 || (p.groupByExprsOpt.isDefined && 
!p.groupByExprsOpt.get.forall(_.resolved))
 || !p.pivotColumn.resolved || !p.pivotValues.forall(_.resolved) => 
p
   case Pivot(groupByExprsOpt, pivotColumn, pivotValues, aggregates, 
child) =>
+if (!RowOrdering.isOrderable(pivotColumn.dataType)) {
+  throw new AnalysisException(
+s"Invalid pivot column '${pivotColumn}'. Pivot columns must be 
comparable.")
--- End diff --

To the other reviewers, this is consistent with the requirements of 
group-by columns. 


---




[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21825
  
**[Test build #93823 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93823/testReport)**
 for PR 21825 at commit 
[`cf27272`](https://github.com/apache/spark/commit/cf27272bb3269075534728e4e602ac379f41d40b).


---




[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21825
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21825
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1515/
Test PASSed.


---




[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21926
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21926
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93818/
Test PASSed.


---




[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21926
  
**[Test build #93818 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93818/testReport)**
 for PR 21926 at commit 
[`b41a45c`](https://github.com/apache/spark/commit/b41a45cb22bd3d49e75711950bcbc3d409bc544a).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...

2018-07-30 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21928#discussion_r206401496
  
--- Diff: python/pyspark/serializers.py ---
@@ -236,6 +237,11 @@ def create_array(s, t):
 # TODO: need decode before converting to Arrow in Python 2
 return pa.Array.from_pandas(s.apply(
 lambda v: v.decode("utf-8") if isinstance(v, str) else v), 
mask=mask, type=t)
+elif t is not None and pa.types.is_decimal(t) and \
+LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < 
LooseVersion("0.10.0"):
+# TODO: see ARROW-2432. Remove when the minimum PyArrow 
version becomes 0.10.0.
+return pa.Array.from_pandas(s.apply(
+lambda v: decimal.Decimal('NaN') if v is None else v), 
mask=mask, type=t)
--- End diff --

add test?


---




[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...

2018-07-30 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21928#discussion_r206401644
  
--- Diff: python/pyspark/serializers.py ---
@@ -236,6 +237,11 @@ def create_array(s, t):
 # TODO: need decode before converting to Arrow in Python 2
 return pa.Array.from_pandas(s.apply(
 lambda v: v.decode("utf-8") if isinstance(v, str) else v), 
mask=mask, type=t)
+elif t is not None and pa.types.is_decimal(t) and \
+LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < 
LooseVersion("0.10.0"):
--- End diff --

consider a single place to check pyarrow versions?


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/21928
  
yea, it doesn't seem very useful to ping matei on every single PR ;)



---




[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-07-30 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206400571
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
 ---
@@ -119,8 +119,47 @@ object CSVBenchmarks {
 }
   }
 
+  def countBenchmark(rowsNum: Int): Unit = {
+val colsNum = 10
+val benchmark = new Benchmark(s"Count a dataset with $colsNum 
columns", rowsNum)
+
+withTempPath { path =>
+  val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", 
IntegerType))
+  val schema = StructType(fields)
+
+  spark.range(rowsNum)
+.select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
+.write
+.csv(path.getAbsolutePath)
+
+  val ds = spark.read.schema(schema).csv(path.getAbsolutePath)
+
+  benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+ds.select("*").filter((_: Row) => true).count()
+  }
+  benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+ds.select($"col1").filter((_: Row) => true).count()
--- End diff --

does this benchmark result vary if we select `col2` or `col10`?


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21928
  
Yea.. this also triggered me to send an email to the mailing list - 
http://apache-spark-developers-list.1001551.n3.nabble.com/Review-notification-bot-tc24133.html


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/21928
  
I wonder if we could tune the bot suggestions to more recent 
contributions/contributors?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206398377
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,285 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val elem = array2.get(i, elementType)
+hs.add(elem)
+  }
+  i += 1
+}
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  arrayBuffer += null
+  notFoundNullElement = false
+}
+  } else {
+val elem = array1.get(i, elementType)
+if (!hs.contains(elem)) {
+  arrayBuffer += elem
+  hs.add(elem)
+}
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+} else {
+  (array1, array2) =>
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+var scannedNullElements = false
+var i = 0
+while (i < array1.numElements()) {
+  var found = false
+  val elem1 = array1.get(i, elementType)
+  if (elem1 == null) {
+if (!scannedNullElements) {
+  var j = 0
+  while (!found && j < array2.numElements()) {
+found = array2.isNullAt(j)
+j += 1
+  }
+  // array2 is scanned only once for null element
+  scannedNullElements = true
+} else {
+  found = true
+}
+  } else {
+var j = 0
+while (!found && j < array2.numElements()) {
+  val elem2 = array2.get(j, elementType)
+  if (elem2 != null) {
+found = ordering.equiv(elem1, elem2)
+  }
+  j += 1
+}
+if (!found) {
+  // check whether elem1 is already stored in arrayBuffer
+  var k = 0
+  while (!found && k < arrayBuffer.size) {
+val va = arrayBuffer(k)
+found = (va != null) && ordering.equiv(va, elem1)
+k += 1
+  }
+}
+  }
+  if (!found) {
+arrayBuffer += elem1
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+  }
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+evalExcept(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val arrayData = classOf[ArrayData].getName
+val i = ctx.freshName("i")
+val pos = ctx.freshName("pos")
+val value = ctx.freshName("value")
+val hsValue = ctx.freshName("hsValue")
+val size = ctx.freshName("size")
+if (elementTypeSupportEquals) {
+  val ptName = CodeGenerator.primitiveTypeName(elementType)
+  val unsafeArray = ctx.freshName("unsafeArray")
+  val (postFix, openHashElementType, hsJavaTypeName, genHsValue,
+   getter, setter, javaTypeName, primitiveTypeName, 
arrayDataBuilder) =
+elementType match {
+  case BooleanType | ByteType | ShortType | IntegerType =>
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206397708
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,285 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val elem = array2.get(i, elementType)
+hs.add(elem)
+  }
+  i += 1
+}
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  arrayBuffer += null
+  notFoundNullElement = false
+}
+  } else {
+val elem = array1.get(i, elementType)
+if (!hs.contains(elem)) {
+  arrayBuffer += elem
+  hs.add(elem)
+}
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+} else {
+  (array1, array2) =>
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+var scannedNullElements = false
+var i = 0
+while (i < array1.numElements()) {
+  var found = false
+  val elem1 = array1.get(i, elementType)
+  if (elem1 == null) {
+if (!scannedNullElements) {
+  var j = 0
+  while (!found && j < array2.numElements()) {
+found = array2.isNullAt(j)
+j += 1
+  }
+  // array2 is scanned only once for null element
+  scannedNullElements = true
+} else {
+  found = true
+}
+  } else {
+var j = 0
+while (!found && j < array2.numElements()) {
+  val elem2 = array2.get(j, elementType)
+  if (elem2 != null) {
+found = ordering.equiv(elem1, elem2)
+  }
+  j += 1
+}
+if (!found) {
+  // check whether elem1 is already stored in arrayBuffer
+  var k = 0
+  while (!found && k < arrayBuffer.size) {
+val va = arrayBuffer(k)
+found = (va != null) && ordering.equiv(va, elem1)
+k += 1
+  }
+}
+  }
+  if (!found) {
+arrayBuffer += elem1
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+  }
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+evalExcept(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val arrayData = classOf[ArrayData].getName
+val i = ctx.freshName("i")
+val pos = ctx.freshName("pos")
+val value = ctx.freshName("value")
+val hsValue = ctx.freshName("hsValue")
+val size = ctx.freshName("size")
+if (elementTypeSupportEquals) {
+  val ptName = CodeGenerator.primitiveTypeName(elementType)
+  val unsafeArray = ctx.freshName("unsafeArray")
+  val (postFix, openHashElementType, hsJavaTypeName, genHsValue,
+   getter, setter, javaTypeName, primitiveTypeName, 
arrayDataBuilder) =
+elementType match {
+  case BooleanType | ByteType | ShortType | IntegerType =>
--- End diff --

[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...

2018-07-30 Thread mallman
Github user mallman commented on the issue:

https://github.com/apache/spark/pull/21889
  
Hi @gatorsmile. Where do you see us at this point? Do you still want to get 
this into Spark 2.4?


---




[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21807
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93817/
Test PASSed.


---




[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21807
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21807
  
**[Test build #93817 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93817/testReport)**
 for PR 21807 at commit 
[`60b9af3`](https://github.com/apache/spark/commit/60b9af3a8dbb9fe75f53ceae36e71c273a991db4).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2018-07-30 Thread yucai
Github user yucai commented on the issue:

https://github.com/apache/spark/pull/19788
  
@cloud-fan @gatorsmile I am trying the new method as suggested, and I have a 
question.

If we make it a **purely server-side** optimization, the external shuffle 
service has no idea how the shuffle data is compressed (concatenatable?) or 
serialized (relocatable?). How does it decide whether it can merge the 
contiguous partitions?

One possible solution is to read all contiguous partitions in one shot and 
then send the data one by one. What do you think?
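
The "read in one shot, send one by one" idea can be sketched as below. This is a minimal illustration, not the shuffle service code: it assumes a hypothetical `offsets` list in which `offsets[i]` is the byte offset where partition `i` starts, with one extra trailing entry marking the end of the last partition.

```python
import io


def read_contiguous(data_file, offsets, start, end):
    """Read partitions [start, end) with a single read, then split them.

    The server never has to know how the bytes are compressed or serialized,
    because each partition is still returned as its own unmodified chunk.
    """
    base = offsets[start]
    data_file.seek(base)
    # One I/O call covering every requested partition.
    blob = data_file.read(offsets[end] - base)
    # Slice the buffer back into per-partition chunks.
    return [blob[offsets[p] - base:offsets[p + 1] - base] for p in range(start, end)]


# Hypothetical data file holding four partitions: "aa", "bbb", "c", "dddd".
shuffle_file = io.BytesIO(b"aabbbcdddd")
partition_offsets = [0, 2, 5, 6, 10]
chunks = read_contiguous(shuffle_file, partition_offsets, 1, 3)
```

The trade-off is that the disk access is merged while the network still carries one message per partition.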



---




[GitHub] spark issue #21918: [SPARK-24821][Core] Fail fast when submitted job compute...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21918
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21918: [SPARK-24821][Core] Fail fast when submitted job compute...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21918
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93816/
Test PASSed.


---




[GitHub] spark issue #21918: [SPARK-24821][Core] Fail fast when submitted job compute...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21918
  
**[Test build #93816 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93816/testReport)**
 for PR 21918 at commit 
[`b93d212`](https://github.com/apache/spark/commit/b93d21267d6204f25c8fabeec681d1b6e9ebffb6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206392487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

@HyukjinKwon 
Nice find. I missed it while reviewing.

By the way, in #21469 I'm adding a new field with a default value to 
StateOperatorProgress, like 
`val customMetrics: ju.Map[String, JLong] = new ju.HashMap()`, and MiMa 
doesn't complain.


https://github.com/apache/spark/pull/21469/files#diff-e09301244e3c6b1a69eda6c4bd2ddb52

@arunmahadevan 
Maybe `ju.Map[String, JLong]` will also work here.


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-07-30 Thread jiangxb1987
Github user jiangxb1987 closed the pull request at:

https://github.com/apache/spark/pull/21494


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling

2018-07-30 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/21494
  
Closing this in favor of #21758 and #21898. Thanks for your comments! I hope 
they're addressed in the new code.


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21928
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21928
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93821/
Test PASSed.


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21928
  
**[Test build #93821 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93821/testReport)**
 for PR 21928 at commit 
[`652afd0`](https://github.com/apache/spark/commit/652afd0e6f156330d8b0dc28ee519605ae32e971).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206388466
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("continuous data") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.empty())
+val tasks = reader.planRowInputPartitions()
+assert(tasks.size == 2)
+
+val numRecords = 10
+val data = scala.collection.mutable.ListBuffer[Int]()
+val offsets = scala.collection.mutable.ListBuffer[Int]()
+import org.scalatest.time.SpanSugar._
+failAfter(5 seconds) {
+  // inject rows, read and check the data and offsets
--- End diff --

Adding more line comments in this code block would make the test code easier 
to understand, e.g. noting that it intentionally commits in the middle of the range.


---




[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206386593
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Serialization.read[List[Int]](json))
+  }
+
+  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206385714
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Serialization.read[List[Int]](json))
+  }
+
+  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206357959
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
--- End diff --

While it is fine to keep these values in a companion object, it looks 
redundant to define them in both the micro-batch and continuous readers, so it 
might be better to place them in a common object.

We may need to find more spots to deduplicate between the micro-batch and 
continuous socket sources.


---




[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206371107
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
--- End diff --

I'd rather make it safer via one of two approaches: 

1. assert that the partition offsets cover all partition ids, 0 to numPartitions - 1
2. add the partition id to each list element of TextSocketOffset as 
Ra

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206388213
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("continuous data") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.empty())
+val tasks = reader.planRowInputPartitions()
+assert(tasks.size == 2)
+
+val numRecords = 10
+val data = scala.collection.mutable.ListBuffer[Int]()
+val offsets = scala.collection.mutable.ListBuffer[Int]()
+import org.scalatest.time.SpanSugar._
+failAfter(5 seconds) {
+  // inject rows, read and check the data and offsets
+  for (i <- 0 until numRecords) {
+serverThread.enqueue(i.toString)
+  }
+  tasks.asScala.foreach {
+case t: TextSocketContinuousInputPartition =>
+  val r = 
t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader]
+  for (i <- 0 until numRecords / 2) {
+r.next()
+
offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset)
+data.append(r.get().getString(0).toInt)
+if (i == 2) {
+  commitOffset(t.partitionId, i + 1)
+}
+  }
+  assert(offsets.toSeq == Range.inclusive(1, 5))
+  assert(data.toSeq == Range(t.partitionId, 10, 2))
+  offsets.clear()
+  data.clear()
+case _ => throw new IllegalStateException("Unexpected task type")
+  }
+  assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets 
== List(3, 3))
+  reader.commit(TextSocketOffset(List(5, 5)))
+  assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets 
== List(5, 5))
+}
+
+def commitOffset(partition: Int, offset: Int): Unit = {
+  val offsetsToCommit = 
reader.getStartOffset.asInstanceOf[TextSocketOffset]
+.offsets.updated(partition, offset)
+  reader.commit(TextSocketOffset(offsetsToCommit))
+  assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets 
== offsetsToCommit)
+}
+  }
+
+  test("continuous data - invalid commit") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
+// ok to commit same offset
+reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5))))
+assertThrows[IllegalStateException] {
+  reader.commit(TextSocketOffset(List(6, 6)))
+}
+  }
+
+  test("continuous data with timestamp") {
+serverThread = new ServerThread()
+serverThread.start()
+
+val reader = new TextSocketContinuousReader(
+  new DataSourceOptions(Map("numPartitions" -> "2", "host" -> 
"localhost",
+"includeTimestamp" -> "true",
+"port" -> serverThread.port.toString).asJava))
+reader.setStartOffset(Optional.empty())
+val tasks = reader.planRowInputPartitions()
+assert(tasks.size == 2)
+
+val numRecords = 4
+import org.apache.spark.sql.Row
--- End diff --

Looks like an unused import.


---




[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r206384495
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Serialization.read[List[Int]](json))
+  }
+
+  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {

[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21103
  
**[Test build #93822 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93822/testReport)**
 for PR 21103 at commit 
[`49b5ab3`](https://github.com/apache/spark/commit/49b5ab371af9783be8f2d6351cf664a769957a4e).


---




[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21103
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21103
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1514/
Test PASSed.


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21928
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21928
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1513/
Test PASSed.


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21928
  
**[Test build #93821 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93821/testReport)**
 for PR 21928 at commit 
[`652afd0`](https://github.com/apache/spark/commit/652afd0e6f156330d8b0dc28ee519605ae32e971).


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread holdensmagicalunicorn
Github user holdensmagicalunicorn commented on the issue:

https://github.com/apache/spark/pull/21928
  
@HyukjinKwon, thanks! I am a bot who has found some folks who might be able 
to help with the review: @gatorsmile, @JoshRosen and @mateiz


---




[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21928
  
cc @ueshin, @icexelloss and @BryanCutler 


---




[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...

2018-07-30 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/21928

[SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to 
Arrow 0.9.0)

## What changes were proposed in this pull request?

See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). It seems that 
using `from_pandas` to convert decimals fails if it encounters a value of `None`:

**Arrow 0.8.0**

```python
import pyarrow as pa
import pandas as pd
from decimal import Decimal

pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), 
type=pa.decimal128(3, 2))
```

```

[
  Decimal('3.14'),
  NA
]
```

**Arrow 0.9.0**

```
Traceback (most recent call last):
  File "", line 1, in 
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: 
Got Python object of type NoneType but can only handle these types: 
decimal.Decimal
```

This PR proposes to work around this via Decimal NaN:

```python
pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), 
type=pa.decimal128(3, 2))
```

```

[
  Decimal('3.14'),
  NA
]
```
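
A minimal sketch of the workaround idea, using only the standard library and a plain list in place of the pandas Series. The helper name `none_to_decimal_nan` is hypothetical and is not taken from the patch; it only illustrates that a Decimal NaN is still a `decimal.Decimal` instance, so a converter that rejects `NoneType` would no longer see one.

```python
from decimal import Decimal


def none_to_decimal_nan(values):
    """Return a copy of `values` with every None replaced by Decimal('NaN').

    Hypothetical helper for illustration; the real patch operates on a
    pandas Series before handing it to Arrow.
    """
    return [Decimal("NaN") if v is None else v for v in values]


cleaned = none_to_decimal_nan([Decimal("3.14"), None])

# Every element is now a Decimal, and the former None is a Decimal NaN.
assert all(isinstance(v, Decimal) for v in cleaned)
assert cleaned[1].is_nan()
```

Whether the NaN is then rendered as a null on the Arrow side depends on the Arrow version, as the output above suggests for 0.9.0.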

## How was this patch tested?

Manually tested:

```bash
SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests
```

**Before**

```
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/tests.py", line 4672, in 
test_vectorized_udf_null_decimal
self.assertEquals(df.collect(), res.collect())
  File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect
sock_info = self._jdf.collectToPython()
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", 
line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 
328, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o51.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 
in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 
(TID 7, localhost, executor driver): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/pyspark/worker.py", line 320, in main
process()
  File "/.../spark/python/pyspark/worker.py", line 315, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream
batch = _create_batch(series, self._timezone)
  File "/.../spark/python/pyspark/serializers.py", line 243, in 
_create_batch
arrs = [create_array(s, t) for s, t in series]
  File "/.../spark/python/pyspark/serializers.py", line 241, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
  File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas
  File "array.pxi", line 177, in pyarrow.lib.array
  File "error.pxi", line 77, in pyarrow.lib.check_status
  File "error.pxi", line 77, in pyarrow.lib.check_status
ArrowInvalid: Error converting from Python objects to Decimal: Got Python 
object of type NoneType but can only handle these types: decimal.Decimal
```

**After**

```
Running tests...
--
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
...S.
--
Ran 37 tests in 21.980s
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark SPARK-24976

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21928.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21928


commit 652afd0e6f156330d8b0dc28ee519605ae32e971
Author: hyukjinkwon 
Date:   2018-07-31T03:37:43Z

Allow None for Decimal type conversion (specific to Arrow 0.9.0)




---


[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21927
  
**[Test build #93820 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93820/testReport)**
 for PR 21927 at commit 
[`0733bfb`](https://github.com/apache/spark/commit/0733bfb06c8641969a70f59a3f8c5b2e4c7a5eca).


---




[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21927
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21927
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1512/
Test PASSed.


---




[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...

2018-07-30 Thread holdensmagicalunicorn
Github user holdensmagicalunicorn commented on the issue:

https://github.com/apache/spark/pull/21927
  
@jiangxb1987, thanks! I am a bot who has found some folks who might be able 
to help with the review: @squito, @mateiz and @rxin


---




[GitHub] spark pull request #21927: [SPARK-24820][Core] Fail fast when submitted job ...

2018-07-30 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/21927

[SPARK-24820][Core] Fail fast when submitted job contains 
PartitionPruningRDD in a barrier stage

## What changes were proposed in this pull request?

`PartitionPruningRDD` may launch tasks on only a subset of partitions. We should 
check at job submission that a barrier stage does not contain a 
`PartitionPruningRDD`, and fail fast if it does.
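
A rough sketch of what such a submit-time check could look like, written in Python for brevity. Everything here is hypothetical: `FakeRDD`, `BarrierJobUnsupportedError`, and `check_barrier_stage` are illustrative names, and the sketch walks the whole lineage instead of stopping at stage boundaries as a real scheduler check would need to.

```python
class FakeRDD:
    """Hypothetical stand-in for an RDD node; `parents` models its lineage."""

    def __init__(self, name, parents=(), prunes_partitions=False):
        self.name = name
        self.parents = list(parents)
        self.prunes_partitions = prunes_partitions


class BarrierJobUnsupportedError(Exception):
    """Raised at submit time instead of failing later during execution."""


def check_barrier_stage(final_rdd):
    """Walk the lineage and fail fast if any node prunes partitions."""
    stack, seen = [final_rdd], set()
    while stack:
        rdd = stack.pop()
        if id(rdd) in seen:
            continue
        seen.add(id(rdd))
        if rdd.prunes_partitions:
            raise BarrierJobUnsupportedError(
                f"barrier stage contains partition-pruning RDD: {rdd.name}")
        stack.extend(rdd.parents)
```

The point of checking on submit is that the job is rejected before any task launches, so a barrier stage never starts with only part of its tasks.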

## How was this patch tested?

Add test cases in `BarrierStageOnSubmittedSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark SPARK-24820

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21927.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21927


commit 0733bfb06c8641969a70f59a3f8c5b2e4c7a5eca
Author: Xingbo Jiang 
Date:   2018-07-31T03:27:33Z

Fail fast when submitted job contains PartitionPruningRDD in a barrier stage




---




[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21103
  
**[Test build #93819 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93819/testReport)**
 for PR 21103 at commit 
[`4d943c8`](https://github.com/apache/spark/commit/4d943c842548914ab151a7a15fc9e0f8743f0caf).


---




[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21103
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1511/
Test PASSed.


---




[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21103
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...

2018-07-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21916#discussion_r206383262
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.io.FileNotFoundException
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+class ProcfsBasedSystems  extends ProcessTreeMetrics {
+  val procfsDir = "/proc/"
+  var isAvailable: Boolean = isItProcfsBased
+  val pid: Int = computePid()
+  val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
+scala.collection.mutable.Map[ Int, Set[Int]]()
+  val PROCFS_STAT_FILE = "stat"
+
+
+  def isItProcfsBased: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+true
+  }
+
+
+  def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+val cmd = Array("bash", "-c", "echo $PPID")
+val length = 10
+var out: Array[Byte] = Array.fill[Byte](length)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+return pid;
+  }
+
+
+  def createProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue: Queue[Int] = new Queue[Int]()
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPIds(p)
+  if(!c.isEmpty) {
+queue ++= c
+ptree += (p -> c.toSet)
+  }
+  else {
+ptree += (p -> Set[Int]())
+  }
+}
+  }
+
+
+  def updateProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue: Queue[Int] = new Queue[Int]()
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPIds(p)
+  if(!c.isEmpty) {
+queue ++= c
+val preChildren = ptree.get(p)
+preChildren match {
+  case Some(children) => if (!c.toSet.equals(children)) {
+val diff: Set[Int] = children -- c.toSet
+ptree.update(p, c.toSet )
+diff.foreach(ptree.remove(_))
+  }
+  case None => ptree.update(p, c.toSet )
+}
+  }
+  else {
+ptree.update(p, Set[Int]())
+  }
+}
+  }
+
+
+  /**
+   * Hadoop ProcfsBasedProcessTree class used regex and pattern matching 
to retrive the memory
+   * info. I tried that but found it not correct during tests, so I used 
normal string analysis
+   * instead. The computation of RSS and Vmem are based on proc(5):
+   * http://man7.org/linux/man-pages/man5/proc.5.html
+   */
+  def getProcessInfo(pid: Int): String = {
+try {
+  val pidDir: File = new File(procfsDir, pid.toString)
+  val fReader = new InputStreamReader(
+new FileInputStream(
+  new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8"))
+  val in: BufferedReader = new BufferedReader(fReader)
+  val procInfo = in.readLine
--- End diff --

This is what hadoop's ProcfsBasedProcessTree is doing as well. I wasn't 
able to find a reference, but

[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...

2018-07-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17185#discussion_r206382570
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 ---
@@ -120,22 +120,54 @@ abstract class LogicalPlan
   /**
* Resolve the given `name` string against the given attribute, 
returning either 0 or 1 match.
--- End diff --

Can you briefly explain the new resolution logic? I feel it's a little 
convoluted now, as we are more likely to hit ambiguous matches.


---




[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...

2018-07-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17185#discussion_r206381686
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 ---
@@ -121,14 +129,14 @@ abstract class Attribute extends LeafExpression with 
NamedExpression with NullIn
  * @param name The name to be associated with the result of computing 
[[child]].
  * @param exprId A globally unique id used to check if an 
[[AttributeReference]] refers to this
  *   alias. Auto-assigned if left blank.
- * @param qualifier An optional string that can be used to referred to 
this attribute in a fully
- *   qualified way. Consider the examples tableName.name, 
subQueryAlias.name.
- *   tableName and subQueryAlias are possible qualifiers.
+ * @param qualifier An optional Seq of string that can be used to refer to 
this attribute in a
+ *  fully qualified way. Consider the examples 
tableName.name, subQueryAlias.name.
+ *  tableName and subQueryAlias are possible qualifiers.
  * @param explicitMetadata Explicit metadata associated with this alias 
that overwrites child's.
  */
 case class Alias(child: Expression, name: String)(
 val exprId: ExprId = NamedExpression.newExprId,
-val qualifier: Option[String] = None,
+val qualifier: Option[Seq[String]] = None,
--- End diff --

Again, I feel using `Seq[String]` directly can simplify a lot of code.
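
To illustrate the point in Python (a sketch only; both function names are hypothetical): when the qualifier is an optional sequence, every caller has to unwrap it first, whereas a plain sequence can use the empty sequence to mean "no qualifier".

```python
from typing import Optional, Sequence


def qualified_name_optional(name: str,
                            qualifier: Optional[Sequence[str]] = None) -> str:
    """Optional-of-sequence style: None must be handled before use."""
    parts = list(qualifier) if qualifier is not None else []
    return ".".join(parts + [name])


def qualified_name_plain(name: str, qualifier: Sequence[str] = ()) -> str:
    """Plain-sequence style: an empty sequence already means 'unqualified'."""
    return ".".join([*qualifier, name])
```

Both produce the same strings for the same inputs; the second simply has one fewer case to branch on at every call site.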


---




[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...

2018-07-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17185#discussion_r206381459
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -654,16 +654,19 @@ class SessionCatalog(
*
* If the relation is a view, we generate a [[View]] operator from the 
view description, and
* wrap the logical plan in a [[SubqueryAlias]] which will track the 
name of the view.
+   * [[SubqueryAlias]] will also keep track of the name and 
database(optional) of the table/view
*
* @param name The name of the table/view that we look up.
*/
   def lookupRelation(name: TableIdentifier): LogicalPlan = {
 synchronized {
   val db = formatDatabaseName(name.database.getOrElse(currentDb))
   val table = formatTableName(name.table)
+  // To keep track of the name and database of the table/view
+  val alias = AliasIdentifier(table, Some(db))
   if (db == globalTempViewManager.database) {
 globalTempViewManager.get(table).map { viewDef =>
-  SubqueryAlias(table, viewDef)
+  SubqueryAlias(alias, viewDef)
--- End diff --

I think we can make `SubqueryAlias` take a `qualifier: Seq[String]` instead 
of `alias: String`.


---




[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...

2018-07-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17185#discussion_r206380665
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 ---
@@ -262,17 +262,47 @@ abstract class Star extends LeafExpression with 
NamedExpression {
  */
 case class UnresolvedStar(target: Option[Seq[String]]) extends Star with 
Unevaluable {
 
-  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
+  /**
+   * Returns true if the nameParts match the qualifier of the attribute
+   *
+   * There are two checks: i) Check if the nameParts match the qualifier 
fully.
+   * E.g. SELECT db.t1.* FROM db1.t1   In this case, the nameParts is 
Seq("db1", "t1") and
--- End diff --

What about `SELECT db1.t1.* FROM t1` while the current database is `db1`?
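
For reference, a small Python sketch of how I would expect this case to behave. It assumes (the quoted comment is cut off, so this is a guess) that the two checks are a full match against `[database, table]` and a table-only match, and that an unqualified table gets the current database as its qualifier. The function names are hypothetical.

```python
def resolve_qualifier(table, database=None, current_db="default"):
    """Assumed behavior: an unqualified table inherits the current database."""
    return [database or current_db, table]


def star_target_matches(name_parts, qualifier):
    """True if name_parts equals the full qualifier or just its table part.

    Comparison is case-insensitive here purely as a simplification.
    """
    target = [p.lower() for p in name_parts]
    full = [p.lower() for p in qualifier]
    return target == full or target == full[-1:]


# SELECT db1.t1.* FROM t1, with db1 as the current database
qualifier = resolve_qualifier("t1", current_db="db1")
assert star_target_matches(["db1", "t1"], qualifier)
```

Under these assumptions the query resolves, while `db2.t1.*` against the same relation would not match.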


---




[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21669
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21669
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93807/
Test PASSed.


---




[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21669
  
**[Test build #93807 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93807/testReport)**
 for PR 21669 at commit 
[`0939738`](https://github.com/apache/spark/commit/0939738e2d7b18652055926be3ed7fbba2df3f72).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21923
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93806/
Test FAILed.


---




[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21923
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21923
  
**[Test build #93806 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93806/testReport)**
 for PR 21923 at commit 
[`ba6aa6c`](https://github.com/apache/spark/commit/ba6aa6c829bfcca1b4b3d5a33fe3a7460e7db1f0).
 * This patch **fails from timeout after a configured wait of \`300m\`**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `public class AbstractExecutorPlugin `
  * `  .doc(\"Comma-separated list of class names for \"plugins\" 
implementing \" +`


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-30 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
@zsxwing 
Is there anything I should do for this PR?


---




[GitHub] spark pull request #21881: [SPARK-24930][SQL] Improve exception information ...

2018-07-30 Thread ouyangxiaochen
Github user ouyangxiaochen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21881#discussion_r206377644
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -337,7 +337,11 @@ case class LoadDataCommand(
   new File(file.getAbsolutePath).exists()
 }
 if (!exists) {
-  throw new AnalysisException(s"LOAD DATA input path does not 
exist: $path")
+  // If user have no permission to access the given input path, 
`File.exists()` return false
+  // , `LOAD DATA input path does not exist` can confuse users.
+  throw new AnalysisException(s"LOAD DATA input path does not 
exist: `$path` or current " +
--- End diff --

OK, Thanks!


---




[GitHub] spark issue #21881: [SPARK-24930][SQL] Improve exception information when us...

2018-07-30 Thread ouyangxiaochen
Github user ouyangxiaochen commented on the issue:

https://github.com/apache/spark/pull/21881
  
@gatorsmile Hi, I am not sure how to reproduce this scenario in a test case. 
Should I just assert that the exception message contains the key phrase 
`have no permission to access the input path`?
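
One possible shape for such a test, sketched in Python with stand-in names. `AnalysisError`, `load_data`, and the exact message wording are all assumptions made for illustration; the idea is only that the existence check is injected, so the test can simulate an inaccessible path without touching real file permissions.

```python
import unittest

KEY_PHRASE = "have no permission to access the input path"


class AnalysisError(Exception):
    """Hypothetical stand-in for the analysis exception."""


def load_data(path, exists):
    """Hypothetical loader; `exists` is injected so tests can fake it."""
    if not exists(path):
        # Assumed wording; only the key phrase matters for the assertion.
        raise AnalysisError(
            f"LOAD DATA input path does not exist: `{path}` "
            f"or current user may {KEY_PHRASE}")
    return path


class LoadDataMessageTest(unittest.TestCase):
    def test_message_mentions_permission(self):
        # A predicate that always returns False mimics File.exists()
        # returning false for a path the user cannot read.
        with self.assertRaises(AnalysisError) as ctx:
            load_data("/restricted/data.csv", exists=lambda p: False)
        self.assertIn(KEY_PHRASE, str(ctx.exception))
```

Asserting on a short key phrase rather than the full message keeps the test stable if the surrounding wording changes.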


---




[GitHub] spark pull request #21887: [SPARK-23633][SQL] Update Pandas UDFs section in ...

2018-07-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21887


---




[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...

2018-07-30 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/21834
  
Thanks for the merge! 


---




[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21887
  
Merged to master.


---




[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17185
  
Build finished. Test FAILed.


---




[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17185
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93810/
Test FAILed.


To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21926
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17185
  
**[Test build #93810 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93810/testReport)**
 for PR 17185 at commit 
[`8206dc3`](https://github.com/apache/spark/commit/8206dc3bc2595507ba71e7f50fddeed0c3b16479).
 * This patch **fails Spark unit tests**.
 * This patch **does not merge cleanly**.
 * This patch adds no public classes.


---




[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21926
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1510/
Test PASSed.


---




[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...

2018-07-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21916#discussion_r206376041
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +181,34 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
+
+  /**
+   *  Off heap execution memory currently in use, in bytes.
+   */
+  final def offHeapExecutionMemoryUsed: Long = 
offHeapExecutionMemoryPool.memoryUsed
+
+  /**
+   *  On heap storage memory currently in use, in bytes.
+   */
+  final def onHeapStorageMemoryUsed: Long = 
onHeapStorageMemoryPool.memoryUsed
+
+  /**
+   *  Off heap storage memory currently in use, in bytes.
+   */
+  final def offHeapStorageMemoryUsed: Long = 
offHeapStorageMemoryPool.memoryUsed
+
+  /**
+   *  If the system isn't procfsBased the process tree metrics' values 
will be -1,
+   *  meaning not available
+   */
+  final val pTreeInfo: ProcessTreeMetrics = new ProcfsBasedSystems
+  if (pTreeInfo.isAvailable) {
--- End diff --

I will change this as well. It is a `final val` because the other metrics are also 
final. I will look into `lazy val`, but it probably makes little difference, since 
this initialization runs only once anyway.
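
To make the eager-versus-lazy distinction concrete, here is a small Python analogy (not Scala, and the class names are hypothetical). The eager version runs the probe when the manager is constructed, while the lazy version defers it until the first access and caches the result.

```python
from functools import cached_property


class EagerManager:
    """Analogue of a plain `val`: the probe runs at construction time."""

    def __init__(self, probe):
        self.p_tree_info = probe()


class LazyManager:
    """Analogue of a `lazy val`: the probe runs on first access only."""

    def __init__(self, probe):
        self._probe = probe

    @cached_property
    def p_tree_info(self):
        return self._probe()
```

Either way the probe runs at most once per instance; the difference only matters if some instances never read the value or if the probe is expensive at construction time.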


---




[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21926
  
**[Test build #93818 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93818/testReport)**
 for PR 21926 at commit 
[`b41a45c`](https://github.com/apache/spark/commit/b41a45cb22bd3d49e75711950bcbc3d409bc544a).


---




[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...

2018-07-30 Thread maryannxue
Github user maryannxue commented on the issue:

https://github.com/apache/spark/pull/21926
  
retest this please


---




[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...

2018-07-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21916#discussion_r206375088
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.io.FileNotFoundException
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
+// project.
+class ProcfsBasedSystems  extends ProcessTreeMetrics {
+  val procfsDir = "/proc/"
+  var isAvailable: Boolean = isItProcfsBased
+  val pid: Int = computePid()
+  val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
+scala.collection.mutable.Map[ Int, Set[Int]]()
+  val PROCFS_STAT_FILE = "stat"
+
+
+  def isItProcfsBased: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+true
+  }
+
+
+  def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+val cmd = Array("bash", "-c", "echo $PPID")
+val length = 10
+var out: Array[Byte] = Array.fill[Byte](length)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+return pid;
+  }
+
+
+  def createProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue: Queue[Int] = new Queue[Int]()
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPIds(p)
+  if(!c.isEmpty) {
+queue ++= c
+ptree += (p -> c.toSet)
+  }
+  else {
+ptree += (p -> Set[Int]())
+  }
+}
+  }
+
+
+  def updateProcessTree(): Unit = {
--- End diff --

Thanks @ankuriitg for the review. I will apply your comments ASAP. For this 
one, I may try some other improvements before falling back to just recreating 
the process tree each time. I understand that in this version updating looks 
more complex than just recreating it.
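
For reference, a minimal sketch of the "recreate the tree each time" alternative discussed above. It is not code from the PR: `ProcessTreeSketch`, `buildTree`, and the `childrenOf` parameter (a stand-in for the PR's `getChildPIds`) are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

object ProcessTreeSketch {
  // Rebuilds the whole tree from scratch with a breadth-first walk.
  // `childrenOf` is an assumed lookup standing in for the PR's getChildPIds.
  def buildTree(root: Int, childrenOf: Int => Seq[Int]): Map[Int, Set[Int]] = {
    val tree = mutable.Map.empty[Int, Set[Int]]
    val queue = mutable.Queue(root)
    while (queue.nonEmpty) {
      val p = queue.dequeue()
      val c = childrenOf(p).toSet
      tree(p) = c
      // Only enqueue pids that have not been recorded yet, so a malformed
      // snapshot containing a cycle cannot keep the loop running forever.
      queue ++= c.filterNot(tree.contains)
    }
    tree.toMap
  }
}
```

Because each call returns a fresh immutable map, there is no stale-entry bookkeeping of the kind `updateProcessTree` has to do; the trade-off is that every pid is re-read on each call.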


---




[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...

2018-07-30 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/21916#discussion_r206375077
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.io.FileNotFoundException
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+class ProcfsBasedSystems  extends ProcessTreeMetrics {
+  val procfsDir = "/proc/"
+  var isAvailable: Boolean = isItProcfsBased
+  val pid: Int = computePid()
+  val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
+scala.collection.mutable.Map[ Int, Set[Int]]()
+  val PROCFS_STAT_FILE = "stat"
+
+
+  def isItProcfsBased: Boolean = {
+val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+if (testing) {
+  return true
+}
+try {
+  if (!Files.exists(Paths.get(procfsDir))) {
+return false
+  }
+}
+catch {
+  case f: FileNotFoundException => return false
+}
+true
+  }
+
+
+  def computePid(): Int = {
+if (!isAvailable) {
+  return -1;
+}
+val cmd = Array("bash", "-c", "echo $PPID")
+val length = 10
+var out: Array[Byte] = Array.fill[Byte](length)(0)
+Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+val pid = Integer.parseInt(new String(out, "UTF-8").trim)
+return pid;
+  }
+
+
+  def createProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue: Queue[Int] = new Queue[Int]()
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPIds(p)
+  if(!c.isEmpty) {
+queue ++= c
+ptree += (p -> c.toSet)
+  }
+  else {
+ptree += (p -> Set[Int]())
+  }
+}
+  }
+
+
+  def updateProcessTree(): Unit = {
+if (!isAvailable) {
+  return
+}
+val queue: Queue[Int] = new Queue[Int]()
+queue += pid
+while( !queue.isEmpty ) {
+  val p = queue.dequeue()
+  val c = getChildPIds(p)
+  if(!c.isEmpty) {
+queue ++= c
+val preChildren = ptree.get(p)
+preChildren match {
+  case Some(children) => if (!c.toSet.equals(children)) {
+val diff: Set[Int] = children -- c.toSet
+ptree.update(p, c.toSet )
+diff.foreach(ptree.remove(_))
+  }
+  case None => ptree.update(p, c.toSet )
+}
+  }
+  else {
+ptree.update(p, Set[Int]())
+  }
+}
+  }
+
+
+  /**
+   * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrieve the memory
+   * info. I tried that but found it not correct during tests, so I used normal string analysis
+   * instead. The computation of RSS and Vmem are based on proc(5):
+   * http://man7.org/linux/man-pages/man5/proc.5.html
+   */
+  def getProcessInfo(pid: Int): String = {
+try {
+  val pidDir: File = new File(procfsDir, pid.toString)
+  val fReader = new InputStreamReader(
+new FileInputStream(
+  new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8"))
+  val in: BufferedReader = new BufferedReader(fReader)
+  val procInfo = in.readLine
+  in.close
+  fReader.close
+  return procInfo
+} catch {
+  case f: FileNotFoundException

[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206374910
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

Wait .. this is an exposed API, right? I guess this is exposed to the Java API 
too (for instance `query.lastProgress().sources()`)? In that case, we should 
avoid Scala's Option and `org.json4s.*`. If this is supposed to be hidden here, 
I think we had better find a way to hide it with a package-level access 
modifier.
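
To illustrate the concern, here is a rough sketch of a Java-friendly shape for such a field. It assumes the metrics could be carried as plain JSON text; `JavaFriendlyProgress` and its fields are hypothetical and are not Spark's actual `SourceProgress`.

```scala
// Hypothetical, simplified stand-in for a progress class; not Spark's API.
class JavaFriendlyProgress(
    val description: String,
    val numInputRows: Long,
    // Plain JSON text, or null when no custom metrics were reported.
    // A String is directly usable from Java, unlike Option[JValue].
    val customMetrics: String) extends Serializable
```

From Java this reads as an ordinary getter, `progress.customMetrics()`, returning a `String`, with no Scala or json4s types in the signature.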




---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206374120
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface for reporting custom metrics from streaming sources and sinks
+ */
+@InterfaceStability.Evolving
+public interface CustomMetrics {
--- End diff --

The Java side should also use 2-space indentation (see "Code Style Guide" in 
https://spark.apache.org/contributing.html)


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206373655
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +151,46 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => Option[CustomMetrics],
+  onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
--- End diff --

nit:

```scala
def extractMetrics(
    getMetrics: () => Option[CustomMetrics],
    onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
```



---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206373303
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

A default value does not work in the Java API, and MiMa will probably complain 
about this. I think it would be better to add another constructor instead of a 
default value to work around this.
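
A minimal sketch of the suggested workaround, assuming the new field can be null when absent. `ProgressWithAuxCtor` and its parameter list are hypothetical simplifications, not the real `SourceProgress` signature.

```scala
// Hypothetical, simplified class; illustrates the pattern only.
class ProgressWithAuxCtor(
    val numInputRows: Long,
    val processedRowsPerSecond: Double,
    val customMetrics: String) extends Serializable {

  // Auxiliary constructor that keeps the older two-argument signature,
  // so existing callers (including Java ones) do not need to change.
  def this(numInputRows: Long, processedRowsPerSecond: Double) =
    this(numInputRows, processedRowsPerSecond, null)
}
```

With a Scala default argument, Java callers would still have to pass every parameter explicitly; an explicit overload like the one above appears in the bytecode as a real second constructor.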


---




[GitHub] spark issue #20028: [SPARK-19053][ML]Supporting multiple evaluation metrics ...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20028
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1509/
Test PASSed.


---




[GitHub] spark issue #20028: [SPARK-19053][ML]Supporting multiple evaluation metrics ...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20028
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...

2018-07-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20010
  
Can one of the admins verify this patch?


---



