[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21807 @mauropalsgraaf Could you fix the PR title? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21699 **[Test build #93824 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93824/testReport)** for PR 21699 at commit [`34535a9`](https://github.com/apache/spark/commit/34535a9cc5ec7a2ba880f7f525feb7dbbc0b0c37). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user sujith71955 commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r206411960 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -303,94 +303,44 @@ case class LoadDataCommand( s"partitioned, but a partition spec was provided.") } } - -val loadPath = +val loadPath = { if (isLocal) { -val uri = Utils.resolveURI(path) -val file = new File(uri.getPath) -val exists = if (file.getAbsolutePath.contains("*")) { - val fileSystem = FileSystems.getDefault - val dir = file.getParentFile.getAbsolutePath - if (dir.contains("*")) { -throw new AnalysisException( - s"LOAD DATA input path allows only filename wildcard: $path") - } - - // Note that special characters such as "*" on Windows are not allowed as a path. - // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path. - val dirPath = fileSystem.getPath(dir) - val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath - val safePathPattern = if (Utils.isWindows) { -// On Windows, the pattern should not start with slashes for absolute file paths. -pathPattern.stripPrefix("/") - } else { -pathPattern - } - val files = new File(dir).listFiles() - if (files == null) { -false - } else { -val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern) -files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath))) - } -} else { - new File(file.getAbsolutePath).exists() -} -if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") -} -uri +val localFS = FileContext.getLocalFSFileContext() +localFS.makeQualified(new Path(path)) } else { -val uri = new URI(path) -val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) { - uri -} else { - // Follow Hive's behavior: - // If no schema or authority is provided with non-local inpath, - // we will use hadoop configuration "fs.defaultFS". - val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS") - val defaultFS = if (defaultFSConf == null) { -new URI("") - } else { -new URI(defaultFSConf) - } - - val scheme = if (uri.getScheme() != null) { --- End diff -- Got your point, Sean. The code is not exactly the same; the key difference is that makeQualified() passes null as the query parameter when constructing the URI, so the string values after '?' are not discarded and the load path stays the same. That is why I used this API. As you suggested, we can extract the logic into a private API, since makeQualified() is LimitedPrivate. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
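A minimal sketch of the difference described above, assuming a spark-shell style Scala session; the path is illustrative and not taken from the PR. `java.net.URI` parses everything after `?` as a query string, while Hadoop's `Path` plus `makeQualified()` keeps `?` as part of the file name, so a filename wildcard survives:

```scala
import java.net.URI
import org.apache.hadoop.fs.{FileContext, Path}

val path = "/data/load_?.csv"  // '?' is meant as a filename wildcard here

// URI parsing splits at '?', so the wildcard and suffix land in the query part
// and are lost when only scheme/authority/path are reused later.
val uri = new URI(path)
println(uri.getPath)           // e.g. /data/load_

// Hadoop's Path keeps '?' in the path component; makeQualified() only fills in
// the missing scheme/authority, so the wildcard is preserved.
val localFs = FileContext.getLocalFSFileContext()
println(localFs.makeQualified(new Path(path)))  // e.g. file:/data/load_?.csv
```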
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21699 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21889 Normally, we change the default to false or revert the whole PR if bugs are found during the RC (release candidate) stage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21928#discussion_r206411041 --- Diff: python/pyspark/serializers.py --- @@ -236,6 +237,11 @@ def create_array(s, t): # TODO: need decode before converting to Arrow in Python 2 return pa.Array.from_pandas(s.apply( lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t) +elif t is not None and pa.types.is_decimal(t) and \ +LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"): --- End diff -- Yeah, but I am not aware of other issues specific to PyArrow versions at the moment. I will definitely consolidate these checks into a single place if I end up fixing more PyArrow-version-specific things. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21926: [SPARK-24972][SQL] PivotFirst could not handle pi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21926 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21928#discussion_r206410816 --- Diff: python/pyspark/serializers.py --- @@ -236,6 +237,11 @@ def create_array(s, t): # TODO: need decode before converting to Arrow in Python 2 return pa.Array.from_pandas(s.apply( lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t) +elif t is not None and pa.types.is_decimal(t) and \ +LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"): +# TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0. +return pa.Array.from_pandas(s.apply( +lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t) --- End diff -- The existing test `test_vectorized_udf_null_decimal` should cover this. It fails without the current change when PyArrow 0.9.0 is used. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21889 We are still targeting this for 2.4, but we need to fix all the identified bugs before merging it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21926 LGTM Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21825 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93823/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21825 **[Test build #93823 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93823/testReport)** for PR 21825 at commit [`cf27272`](https://github.com/apache/spark/commit/cf27272bb3269075534728e4e602ac379f41d40b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21825 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21926: [SPARK-24972][SQL] PivotFirst could not handle pi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21926#discussion_r206406546 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -529,6 +529,10 @@ class Analyzer( || (p.groupByExprsOpt.isDefined && !p.groupByExprsOpt.get.forall(_.resolved)) || !p.pivotColumn.resolved || !p.pivotValues.forall(_.resolved) => p case Pivot(groupByExprsOpt, pivotColumn, pivotValues, aggregates, child) => +if (!RowOrdering.isOrderable(pivotColumn.dataType)) { + throw new AnalysisException( +s"Invalid pivot column '${pivotColumn}'. Pivot columns must be comparable.") --- End diff -- To the other reviewers, this is consistent with the requirements of group-by columns. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
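An illustration of the restriction, as a hedged sketch assuming a spark-shell session where `spark` and its implicits are in scope; the data and column names are made up, not from the PR. A map-typed column is not orderable, so it is already rejected as a group-by column, and the check above applies the same rule to pivot columns:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1, "a", 10), (1, "b", 20), (2, "a", 30)).toDF("id", "key", "value")
  .withColumn("m", map($"key", $"value"))

// Orderable pivot column: works as before.
df.groupBy("id").pivot("key").agg(sum("value")).show()

// Non-orderable group-by column: already fails analysis with a message like
// "... cannot be used as a grouping expression because its data type
// map<string,int> is not an orderable data type."
df.groupBy($"m").agg(sum("value"))

// With the change above, pivoting on the same non-orderable column is likewise
// expected to be rejected at analysis time ("Pivot columns must be comparable.").
df.groupBy($"id").pivot("m").agg(sum("value"))
```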
[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21825 **[Test build #93823 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93823/testReport)** for PR 21825 at commit [`cf27272`](https://github.com/apache/spark/commit/cf27272bb3269075534728e4e602ac379f41d40b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21825 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21825: [SPARK-18188][DOC][FOLLOW-UP]Add `spark.broadcast.checks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21825 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1515/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21926 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21926 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93818/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21926 **[Test build #93818 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93818/testReport)** for PR 21926 at commit [`b41a45c`](https://github.com/apache/spark/commit/b41a45cb22bd3d49e75711950bcbc3d409bc544a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21928#discussion_r206401496 --- Diff: python/pyspark/serializers.py --- @@ -236,6 +237,11 @@ def create_array(s, t): # TODO: need decode before converting to Arrow in Python 2 return pa.Array.from_pandas(s.apply( lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t) +elif t is not None and pa.types.is_decimal(t) and \ +LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"): +# TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0. +return pa.Array.from_pandas(s.apply( +lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t) --- End diff -- add test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21928#discussion_r206401644 --- Diff: python/pyspark/serializers.py --- @@ -236,6 +237,11 @@ def create_array(s, t): # TODO: need decode before converting to Arrow in Python 2 return pa.Array.from_pandas(s.apply( lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t) +elif t is not None and pa.types.is_decimal(t) and \ +LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"): --- End diff -- consider a single place to check pyarrow versions? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21928 yea, it doesn't seem very useful to ping matei on every single PR ;) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206400571 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala --- @@ -119,8 +119,47 @@ object CSVBenchmarks { } } + def countBenchmark(rowsNum: Int): Unit = { +val colsNum = 10 +val benchmark = new Benchmark(s"Count a dataset with $colsNum columns", rowsNum) + +withTempPath { path => + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType)) + val schema = StructType(fields) + + spark.range(rowsNum) +.select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*) +.write +.csv(path.getAbsolutePath) + + val ds = spark.read.schema(schema).csv(path.getAbsolutePath) + + benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ => +ds.select("*").filter((_: Row) => true).count() + } + benchmark.addCase(s"Select 1 column + count()", 3) { _ => +ds.select($"col1").filter((_: Row) => true).count() --- End diff -- does this benchmark result vary if we select `col2` or `col10`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21928 Yea.. this also triggered me to send an email to the mailing list - http://apache-spark-developers-list.1001551.n3.nabble.com/Review-notification-bot-tc24133.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21928 I wonder if we could tune the bot suggestions to more recent contributions/contributors? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r206398377 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,285 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike +with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +left.dataType + } + + @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +val hs = new OpenHashSet[Any] +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val elem = array2.get(i, elementType) +hs.add(elem) + } + i += 1 +} +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + arrayBuffer += null + notFoundNullElement = false +} + } else { +val elem = array1.get(i, elementType) +if (!hs.contains(elem)) { + arrayBuffer += elem + hs.add(elem) +} + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} else { + (array1, array2) => +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +var scannedNullElements = false +var i = 0 +while (i < array1.numElements()) { + var found = false + val elem1 = array1.get(i, elementType) + if (elem1 == null) { +if (!scannedNullElements) { + var j = 0 + while (!found && j < array2.numElements()) { +found = array2.isNullAt(j) +j += 1 + } + // array2 is scanned only once for null element + scannedNullElements = true +} else { + found = true +} + } else { +var j = 0 +while (!found && j < array2.numElements()) { + val elem2 = array2.get(j, elementType) + if (elem2 != null) { +found = ordering.equiv(elem1, elem2) + } + j += 1 +} +if (!found) { + // check whether elem1 is already stored in arrayBuffer + var k = 0 + while (!found && k < arrayBuffer.size) { +val va = arrayBuffer(k) +found = (va != null) && ordering.equiv(va, elem1) +k += 1 + } +} + } + if (!found) { +arrayBuffer += elem1 + } + i += 1 +} +new GenericArrayData(arrayBuffer) + } + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +evalExcept(array1, array2) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val arrayData = classOf[ArrayData].getName +val i = ctx.freshName("i") +val pos = ctx.freshName("pos") +val value = ctx.freshName("value") +val hsValue = ctx.freshName("hsValue") +val size = ctx.freshName("size") +if (elementTypeSupportEquals) { + val ptName = CodeGenerator.primitiveTypeName(elementType) + val unsafeArray = ctx.freshName("unsafeArray") + val (postFix, openHashElementType, hsJavaTypeName, genHsValue, + getter, setter, javaTypeName, primitiveTypeName, arrayDataBuilder) = +elementType match { + case BooleanType | ByteType | ShortType | IntegerType => --- End diff --
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r206397708 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,285 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike +with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +left.dataType + } + + @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +val hs = new OpenHashSet[Any] +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val elem = array2.get(i, elementType) +hs.add(elem) + } + i += 1 +} +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + arrayBuffer += null + notFoundNullElement = false +} + } else { +val elem = array1.get(i, elementType) +if (!hs.contains(elem)) { + arrayBuffer += elem + hs.add(elem) +} + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} else { + (array1, array2) => +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +var scannedNullElements = false +var i = 0 +while (i < array1.numElements()) { + var found = false + val elem1 = array1.get(i, elementType) + if (elem1 == null) { +if (!scannedNullElements) { + var j = 0 + while (!found && j < array2.numElements()) { +found = array2.isNullAt(j) +j += 1 + } + // array2 is scanned only once for null element + scannedNullElements = true +} else { + found = true +} + } else { +var j = 0 +while (!found && j < array2.numElements()) { + val elem2 = array2.get(j, elementType) + if (elem2 != null) { +found = ordering.equiv(elem1, elem2) + } + j += 1 +} +if (!found) { + // check whether elem1 is already stored in arrayBuffer + var k = 0 + while (!found && k < arrayBuffer.size) { +val va = arrayBuffer(k) +found = (va != null) && ordering.equiv(va, elem1) +k += 1 + } +} + } + if (!found) { +arrayBuffer += elem1 + } + i += 1 +} +new GenericArrayData(arrayBuffer) + } + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +evalExcept(array1, array2) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val arrayData = classOf[ArrayData].getName +val i = ctx.freshName("i") +val pos = ctx.freshName("pos") +val value = ctx.freshName("value") +val hsValue = ctx.freshName("hsValue") +val size = ctx.freshName("size") +if (elementTypeSupportEquals) { + val ptName = CodeGenerator.primitiveTypeName(elementType) + val unsafeArray = ctx.freshName("unsafeArray") + val (postFix, openHashElementType, hsJavaTypeName, genHsValue, + getter, setter, javaTypeName, primitiveTypeName, arrayDataBuilder) = +elementType match { + case BooleanType | ByteType | ShortType | IntegerType => --- End diff --
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Hi @gatorsmile. Where do you see us at this point? Do you still want to get this into Spark 2.4? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21807 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93817/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21807 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21807: [SPARK-24536] Validate that limit clause cannot have a n...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21807 **[Test build #93817 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93817/testReport)** for PR 21807 at commit [`60b9af3`](https://github.com/apache/spark/commit/60b9af3a8dbb9fe75f53ceae36e71c273a991db4). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user yucai commented on the issue: https://github.com/apache/spark/pull/19788 @cloud-fan @gatorsmile I am trying the new approach as suggested and I have a question. If we make this a **purely server-side** optimization, the external shuffle service has no idea how the shuffle data is compressed (is it concatenatable?) or serialized (is it relocatable?), so how does it decide whether it can merge the contiguous partitions or not? One possible solution is to read all the contiguous partitions in one shot and then send the data one by one. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21918: [SPARK-24821][Core] Fail fast when submitted job compute...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21918 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21918: [SPARK-24821][Core] Fail fast when submitted job compute...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21918 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93816/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21918: [SPARK-24821][Core] Fail fast when submitted job compute...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21918 **[Test build #93816 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93816/testReport)** for PR 21918 at commit [`b93d212`](https://github.com/apache/spark/commit/b93d21267d6204f25c8fabeec681d1b6e9ebffb6). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206392487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- @HyukjinKwon Nice finding. I missed it while reviewing. By the way, FYI, in #21469 I'm adding a new field with a default value to StateOperatorProgress, like `val customMetrics: ju.Map[String, JLong] = new ju.HashMap()`, and MiMa doesn't complain. https://github.com/apache/spark/pull/21469/files#diff-e09301244e3c6b1a69eda6c4bd2ddb52 @arunmahadevan Maybe `ju.Map[String, JLong]` will also work here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
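A minimal sketch of the suggested shape, not the final API: the class name is hypothetical and the constructor parameter list is trimmed; the point is exposing custom metrics as a Java map with a default value instead of `Option[JValue]`, mirroring what #21469 does for StateOperatorProgress.

```scala
import java.{util => ju}
import java.lang.{Long => JLong}

// Trimmed, illustrative stand-in for SourceProgress: the new field carries a
// default value so existing call sites keep compiling, and custom metrics are
// exposed as a Java map for easy consumption from both Scala and Java.
class SourceProgressSketch(
    val description: String,
    val numInputRows: Long,
    val inputRowsPerSecond: Double,
    val processedRowsPerSecond: Double,
    val customMetrics: ju.Map[String, JLong] = new ju.HashMap[String, JLong]())
  extends Serializable
```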
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user jiangxb1987 closed the pull request at: https://github.com/apache/spark/pull/21494 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/21494 Closing this in favor of #21758 and #21898. Thanks for your comments! I hope they're addressed in the new code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21928 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21928 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93821/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21928 **[Test build #93821 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93821/testReport)** for PR 21928 at commit [`652afd0`](https://github.com/apache/spark/commit/652afd0e6f156330d8b0dc28ee519605ae32e971). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206388466 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("continuous data") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.empty()) +val tasks = reader.planRowInputPartitions() +assert(tasks.size == 2) + +val numRecords = 10 +val data = scala.collection.mutable.ListBuffer[Int]() +val offsets = scala.collection.mutable.ListBuffer[Int]() +import org.scalatest.time.SpanSugar._ +failAfter(5 seconds) { + // inject rows, read and check the data and offsets --- End diff -- Maybe adding more line comments in code block would help understanding the test code easier, like intentionally committing in the middle of range, etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206386593 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Serialization.read[List[Int]](json)) + } + + override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206385714 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Serialization.read[List[Int]](json)) + } + + override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206357959 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { --- End diff -- While the values are good to be placed with companion object, it looks like redundant to have them in both micro-batch and continuous, so might be better to have common object to place this. We may need to find more spots to deduplicate between micro-batch and continuous for socket. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
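A rough sketch of that suggestion; the object name is illustrative, and the constants are copied from the snippet above. Both the micro-batch and the continuous socket sources would reference this one object instead of defining the values twice:

```scala
import java.text.SimpleDateFormat
import java.util.Locale

import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

// Shared constants for the text socket sources (hypothetical placement):
// schemas with and without the timestamp column, plus the timestamp format.
object TextSocketSourceConstants {
  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
  val SCHEMA_TIMESTAMP = StructType(
    StructField("value", StringType)
      :: StructField("timestamp", TimestampType) :: Nil)
  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
}
```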
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206371107 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) --- End diff -- I'd rather make it safer via either one of two approaches: 1. assert partition offsets has all partition ids, 0 ~ numPartitions - 1 2. add partition id in list element of TextSocketOffset as Ra
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206388213 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -300,6 +301,100 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("continuous data") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.empty()) +val tasks = reader.planRowInputPartitions() +assert(tasks.size == 2) + +val numRecords = 10 +val data = scala.collection.mutable.ListBuffer[Int]() +val offsets = scala.collection.mutable.ListBuffer[Int]() +import org.scalatest.time.SpanSugar._ +failAfter(5 seconds) { + // inject rows, read and check the data and offsets + for (i <- 0 until numRecords) { +serverThread.enqueue(i.toString) + } + tasks.asScala.foreach { +case t: TextSocketContinuousInputPartition => + val r = t.createPartitionReader().asInstanceOf[TextSocketContinuousInputPartitionReader] + for (i <- 0 until numRecords / 2) { +r.next() + offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset) +data.append(r.get().getString(0).toInt) +if (i == 2) { + commitOffset(t.partitionId, i + 1) +} + } + assert(offsets.toSeq == Range.inclusive(1, 5)) + assert(data.toSeq == Range(t.partitionId, 10, 2)) + offsets.clear() + data.clear() +case _ => throw new IllegalStateException("Unexpected task type") + } + assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(3, 3)) + reader.commit(TextSocketOffset(List(5, 5))) + assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == List(5, 5)) +} + +def commitOffset(partition: Int, offset: Int): Unit = { + val offsetsToCommit = reader.getStartOffset.asInstanceOf[TextSocketOffset] +.offsets.updated(partition, offset) + reader.commit(TextSocketOffset(offsetsToCommit)) + assert(reader.getStartOffset.asInstanceOf[TextSocketOffset].offsets == offsetsToCommit) +} + } + + test("continuous data - invalid commit") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5 +// ok to commit same offset +reader.setStartOffset(Optional.of(TextSocketOffset(List(5, 5 +assertThrows[IllegalStateException] { + reader.commit(TextSocketOffset(List(6, 6))) +} + } + + test("continuous data with timestamp") { +serverThread = new ServerThread() +serverThread.start() + +val reader = new TextSocketContinuousReader( + new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost", +"includeTimestamp" -> "true", +"port" -> serverThread.port.toString).asJava)) +reader.setStartOffset(Optional.empty()) +val tasks = reader.planRowInputPartitions() +assert(tasks.size == 2) + +val numRecords = 4 +import org.apache.spark.sql.Row --- End diff -- Looks like unused import --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r206384495 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Serialization.read[List[Int]](json)) + } + + override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21103 **[Test build #93822 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93822/testReport)** for PR 21103 at commit [`49b5ab3`](https://github.com/apache/spark/commit/49b5ab371af9783be8f2d6351cf664a769957a4e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21103 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21103 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1514/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21928 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21928 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1513/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21928 **[Test build #93821 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93821/testReport)** for PR 21928 at commit [`652afd0`](https://github.com/apache/spark/commit/652afd0e6f156330d8b0dc28ee519605ae32e971). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user holdensmagicalunicorn commented on the issue: https://github.com/apache/spark/pull/21928 @HyukjinKwon, thanks! I am a bot who has found some folks who might be able to help with the review:@gatorsmile, @JoshRosen and @mateiz --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21928: [SPARK-24976][PYTHON] Allow None for Decimal type conver...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21928 cc @ueshin, @icexelloss and @BryanCutler --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21928: [SPARK-24976][PYTHON] Allow None for Decimal type...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/21928 [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to Arrow 0.9.0) ## What changes were proposed in this pull request? See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). Seems using `from_pandas` to convert decimals fails if encounters a value of `None`: **Arrow 0.8.0** ```python import pyarrow as pa import pandas as pd from decimal import Decimal pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 2)) ``` ``` [ Decimal('3.14'), NA ] ``` **Arrow 0.9.0** ``` Traceback (most recent call last): File "", line 1, in File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` This PR propose to work around this via Decimal NaN: ```python pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), type=pa.decimal128(3, 2)) ``` ``` [ Decimal('3.14'), NA ] ``` ## How was this patch tested? Manually tested: ```bash SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests ``` **Before** ``` Traceback (most recent call last): File "/.../spark/python/pyspark/sql/tests.py", line 4672, in test_vectorized_udf_null_decimal self.assertEquals(df.collect(), res.collect()) File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect sock_info = self._jdf.collectToPython() File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o51.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/pyspark/worker.py", line 320, in main process() File "/.../spark/python/pyspark/worker.py", line 315, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream batch = _create_batch(series, self._timezone) File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch arrs = [create_array(s, t) for s, t in series] File "/.../spark/python/pyspark/serializers.py", line 241, in create_array return pa.Array.from_pandas(s, mask=mask, type=t) File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` **After** ``` Running tests... -- Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ...S. 
-- Ran 37 tests in 21.980s ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/HyukjinKwon/spark SPARK-24976 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21928.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21928 commit 652afd0e6f156330d8b0dc28ee519605ae32e971 Author: hyukjinkwon Date: 2018-07-31T03:37:43Z Allow None for Decimal type conversion (specific to Arrow 0.9.0) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.ap
[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21927 **[Test build #93820 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93820/testReport)** for PR 21927 at commit [`0733bfb`](https://github.com/apache/spark/commit/0733bfb06c8641969a70f59a3f8c5b2e4c7a5eca). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21927 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21927 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1512/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21927: [SPARK-24820][Core] Fail fast when submitted job contain...
Github user holdensmagicalunicorn commented on the issue: https://github.com/apache/spark/pull/21927 @jiangxb1987, thanks! I am a bot who has found some folks who might be able to help with the review:@squito, @mateiz and @rxin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21927: [SPARK-24820][Core] Fail fast when submitted job ...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/21927 [SPARK-24820][Core] Fail fast when submitted job contains PartitionPruningRDD in a barrier stage ## What changes were proposed in this pull request? `PartitionPruningRDD` may launch tasks on only a subset of partitions, so we should check at job submission that we are not launching a barrier stage that contains a PartitionPruningRDD; otherwise we should fail fast. ## How was this patch tested? Added test cases in `BarrierStageOnSubmittedSuite`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark SPARK-24820 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21927.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21927 commit 0733bfb06c8641969a70f59a3f8c5b2e4c7a5eca Author: Xingbo Jiang Date: 2018-07-31T03:27:33Z Fail fast when submitted job contains PartitionPruningRDD in a barrier stage --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
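A rough sketch of the kind of submit-time check described above — a hypothetical helper, not the PR's actual DAGScheduler change, and assuming the `RDD.isBarrier()` API from the barrier-execution work — could traverse the RDD lineage and reject the job early:

```scala
import org.apache.spark.SparkException
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}

// Sketch only: walk the dependency lineage (ignoring stage boundaries for
// simplicity) and fail fast if a barrier RDD depends on a PartitionPruningRDD,
// which may launch tasks on only a subset of partitions.
def checkBarrierStageWithPartitionPruning(rdd: RDD[_]): Unit = {
  def containsPruning(r: RDD[_]): Boolean =
    r.isInstanceOf[PartitionPruningRDD[_]] ||
      r.dependencies.exists(dep => containsPruning(dep.rdd))

  if (rdd.isBarrier() && containsPruning(rdd)) {
    throw new SparkException(
      "A barrier stage cannot contain PartitionPruningRDD, which may launch " +
        "tasks on only some partitions and so can never satisfy the barrier.")
  }
}
```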
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21103 **[Test build #93819 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93819/testReport)** for PR 21103 at commit [`4d943c8`](https://github.com/apache/spark/commit/4d943c842548914ab151a7a15fc9e0f8743f0caf). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21103 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1511/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21103 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/21916#discussion_r206383262 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader} +import java.io.FileNotFoundException +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +class ProcfsBasedSystems extends ProcessTreeMetrics { + val procfsDir = "/proc/" + var isAvailable: Boolean = isItProcfsBased + val pid: Int = computePid() + val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = +scala.collection.mutable.Map[ Int, Set[Int]]() + val PROCFS_STAT_FILE = "stat" + + + def isItProcfsBased: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +true + } + + + def computePid(): Int = { +if (!isAvailable) { + return -1; +} +val cmd = Array("bash", "-c", "echo $PPID") +val length = 10 +var out: Array[Byte] = Array.fill[Byte](length)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +val pid = Integer.parseInt(new String(out, "UTF-8").trim) +return pid; + } + + + def createProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue: Queue[Int] = new Queue[Int]() +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet) + } + else { +ptree += (p -> Set[Int]()) + } +} + } + + + def updateProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue: Queue[Int] = new Queue[Int]() +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { +queue ++= c +val preChildren = ptree.get(p) +preChildren match { + case Some(children) => if (!c.toSet.equals(children)) { +val diff: Set[Int] = children -- c.toSet +ptree.update(p, c.toSet ) +diff.foreach(ptree.remove(_)) + } + case None => ptree.update(p, c.toSet ) +} + } + else { +ptree.update(p, Set[Int]()) + } +} + } + + + /** + * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrive the memory + * info. I tried that but found it not correct during tests, so I used normal string analysis + * instead. 
The computation of RSS and Vmem are based on proc(5): + * http://man7.org/linux/man-pages/man5/proc.5.html + */ + def getProcessInfo(pid: Int): String = { +try { + val pidDir: File = new File(procfsDir, pid.toString) + val fReader = new InputStreamReader( +new FileInputStream( + new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8")) + val in: BufferedReader = new BufferedReader(fReader) + val procInfo = in.readLine --- End diff -- This is what hadoop's ProcfsBasedProcessTree is doing as well. I wasn't able to find a reference, but
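The quoted `getProcessInfo` reads a single line from `/proc/[pid]/stat`. For reference, per proc(5) the virtual memory size and RSS are the 23rd and 24th fields of that line (vsize in bytes, rss in pages), so once the line is in hand the values can be pulled out with a plain split — a hedged sketch, not the PR's exact parsing code:

```scala
import scala.io.Source

// Sketch: pull vsize (bytes) and rss (pages) out of a /proc/[pid]/stat line.
// Everything after the ')' that closes the command name is space-separated,
// so splitting from there survives spaces in the process name.
def memoryInfo(pid: Int): (Long, Long) = {
  val src = Source.fromFile(s"/proc/$pid/stat", "UTF-8")
  try {
    val line = src.getLines().next()
    val afterComm = line.substring(line.lastIndexOf(')') + 2)
    val fields = afterComm.split(" ")
    // afterComm starts at field 3 ("state"), so field 23 (vsize) is at index 20
    // and field 24 (rss) is at index 21.
    (fields(20).toLong, fields(21).toLong)
  } finally {
    src.close()
  }
}
```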
[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17185#discussion_r206382570 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala --- @@ -120,22 +120,54 @@ abstract class LogicalPlan /** * Resolve the given `name` string against the given attribute, returning either 0 or 1 match. --- End diff -- Can you briefly explain the new resolution logic? It feels a little convoluted now, since resolution is more likely to become ambiguous. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17185#discussion_r206381686 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala --- @@ -121,14 +129,14 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn * @param name The name to be associated with the result of computing [[child]]. * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this * alias. Auto-assigned if left blank. - * @param qualifier An optional string that can be used to referred to this attribute in a fully - * qualified way. Consider the examples tableName.name, subQueryAlias.name. - * tableName and subQueryAlias are possible qualifiers. + * @param qualifier An optional Seq of string that can be used to refer to this attribute in a + * fully qualified way. Consider the examples tableName.name, subQueryAlias.name. + * tableName and subQueryAlias are possible qualifiers. * @param explicitMetadata Explicit metadata associated with this alias that overwrites child's. */ case class Alias(child: Expression, name: String)( val exprId: ExprId = NamedExpression.newExprId, -val qualifier: Option[String] = None, +val qualifier: Option[Seq[String]] = None, --- End diff -- Again, I feel using `Seq[String]` directly can simplify a lot of code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17185#discussion_r206381459 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala --- @@ -654,16 +654,19 @@ class SessionCatalog( * * If the relation is a view, we generate a [[View]] operator from the view description, and * wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view. + * [[SubqueryAlias]] will also keep track of the name and database(optional) of the table/view * * @param name The name of the table/view that we look up. */ def lookupRelation(name: TableIdentifier): LogicalPlan = { synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) + // To keep track of the name and database of the table/view + val alias = AliasIdentifier(table, Some(db)) if (db == globalTempViewManager.database) { globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(table, viewDef) + SubqueryAlias(alias, viewDef) --- End diff -- I think we can make `SubqueryAlias` take a `qualifier: Seq[String]` instead of `alias: String`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
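To make the suggestion above concrete: the idea is that the alias node carries the full qualifier (for example `Seq("db1", "t1")`) rather than a single alias string plus an optional database, and resolving a fully qualified column becomes a suffix comparison. A simplified, script-style sketch with made-up names — not the real Catalyst classes:

```scala
// Simplified sketch, not the real Catalyst node: the alias carries the full
// qualifier, e.g. Seq("db1", "t1"), instead of a single alias string.
case class SubqueryAliasSketch[Plan](qualifier: Seq[String], child: Plan)

// Resolving a reference such as Seq("db1", "t1", "col") then reduces to a
// suffix check against the node's qualifier plus the column name.
def resolves(reference: Seq[String], qualifier: Seq[String], column: String): Boolean =
  (qualifier :+ column).endsWith(reference)

val node = SubqueryAliasSketch(Seq("db1", "t1"), "child-plan")
assert(resolves(Seq("db1", "t1", "col"), node.qualifier, "col")) // fully qualified
assert(resolves(Seq("t1", "col"), node.qualifier, "col"))        // table-qualified
assert(resolves(Seq("col"), node.qualifier, "col"))              // unqualified column
```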
[GitHub] spark pull request #17185: [SPARK-19602][SQL] Support column resolution of f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17185#discussion_r206380665 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala --- @@ -262,17 +262,47 @@ abstract class Star extends LeafExpression with NamedExpression { */ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable { - override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = { + /** + * Returns true if the nameParts match the qualifier of the attribute + * + * There are two checks: i) Check if the nameParts match the qualifier fully. + * E.g. SELECT db.t1.* FROM db1.t1 In this case, the nameParts is Seq("db1", "t1") and --- End diff -- what about `SELECT db1.t1.* FROM t1` while the current database is `db1`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21669 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21669 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93807/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21669 **[Test build #93807 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93807/testReport)** for PR 21669 at commit [`0939738`](https://github.com/apache/spark/commit/0939738e2d7b18652055926be3ed7fbba2df3f72). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21923 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93806/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21923 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21923 **[Test build #93806 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93806/testReport)** for PR 21923 at commit [`ba6aa6c`](https://github.com/apache/spark/commit/ba6aa6c829bfcca1b4b3d5a33fe3a7460e7db1f0). * This patch **fails from timeout after a configured wait of \`300m\`**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `public class AbstractExecutorPlugin ` * ` .doc(\"Comma-separated list of class names for \"plugins\" implementing \" +` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....
Github user tedyu commented on the issue: https://github.com/apache/spark/pull/21488 @zsxwing Is there anything I should do for this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21881: [SPARK-24930][SQL] Improve exception information ...
Github user ouyangxiaochen commented on a diff in the pull request: https://github.com/apache/spark/pull/21881#discussion_r206377644 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -337,7 +337,11 @@ case class LoadDataCommand( new File(file.getAbsolutePath).exists() } if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + // If user have no permission to access the given input path, `File.exists()` return false + // , `LOAD DATA input path does not exist` can confuse users. + throw new AnalysisException(s"LOAD DATA input path does not exist: `$path` or current " + --- End diff -- OK, Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21881: [SPARK-24930][SQL] Improve exception information when us...
Github user ouyangxiaochen commented on the issue: https://github.com/apache/spark/pull/21881 @gatorsmile Hi, I am not sure how to reproduce this scenario in a test case. Should I just assert that the exception message contains the key phrase `have no permission to access the input path`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
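One common pattern for this kind of question in the SQL command suites is exactly what is suggested: intercept the `AnalysisException` and assert on a stable fragment of the message. A hedged sketch — the test name, table, and path are made up, and it assumes the usual test helpers (`withTempDir`, `withTable`, `sql`, `intercept`) and a Hive-enabled suite are available:

```scala
// Sketch only: with the amended message, any failed existence/permission check
// should surface the permission hint, so asserting on that fragment is enough.
test("LOAD DATA hints at permission problems for inaccessible input paths") {
  withTempDir { dir =>
    val input = new java.io.File(dir, "no-access/data.txt") // inaccessible path
    withTable("load_t") {
      sql("CREATE TABLE load_t (a INT) USING hive")
      val e = intercept[AnalysisException] {
        sql(s"LOAD DATA LOCAL INPATH '${input.getAbsolutePath}' INTO TABLE load_t")
      }
      // Assert on a key fragment rather than the full, wording-sensitive text.
      assert(e.getMessage.contains("no permission to access the input path"))
    }
  }
}
```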
[GitHub] spark pull request #21887: [SPARK-23633][SQL] Update Pandas UDFs section in ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21887 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21834: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC part...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21834 Thanks for the merge! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21887: [SPARK-23633][SQL] Update Pandas UDFs section in sql-pro...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21887 Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17185 Build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17185 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93810/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21926 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17185 **[Test build #93810 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93810/testReport)** for PR 17185 at commit [`8206dc3`](https://github.com/apache/spark/commit/8206dc3bc2595507ba71e7f50fddeed0c3b16479). * This patch **fails Spark unit tests**. * This patch **does not merge cleanly**. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21926 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1510/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/21916#discussion_r206376041 --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala --- @@ -180,6 +181,34 @@ private[spark] abstract class MemoryManager( onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } + /** + * On heap execution memory currently in use, in bytes. + */ + final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed + + /** + * Off heap execution memory currently in use, in bytes. + */ + final def offHeapExecutionMemoryUsed: Long = offHeapExecutionMemoryPool.memoryUsed + + /** + * On heap storage memory currently in use, in bytes. + */ + final def onHeapStorageMemoryUsed: Long = onHeapStorageMemoryPool.memoryUsed + + /** + * Off heap storage memory currently in use, in bytes. + */ + final def offHeapStorageMemoryUsed: Long = offHeapStorageMemoryPool.memoryUsed + + /** + * If the system isn't procfsBased the process tree metrics' values will be -1, + * meaning not available + */ + final val pTreeInfo: ProcessTreeMetrics = new ProcfsBasedSystems + if (pTreeInfo.isAvailable) { --- End diff -- I will change this as well. It is a final val since other metrics were also final. Will check the lazy val, but probably not much difference since this initiation will be executed just one time anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
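For what it's worth, the only difference under discussion is when the one-time probe runs: an eager `val` pays the cost at construction, a `lazy val` defers it to first access and then caches it, so steady-state behaviour is the same. A generic illustration, not the PR's classes:

```scala
import java.nio.file.{Files, Paths}

class MetricsHolder {
  // Eager: the procfs probe runs once, when the enclosing object is constructed.
  final val procfsAvailableEager: Boolean = Files.exists(Paths.get("/proc/"))

  // Lazy: the same probe runs on first access and is cached afterwards, so the
  // steady-state behaviour is identical; only the startup cost moves.
  final lazy val procfsAvailableLazy: Boolean = Files.exists(Paths.get("/proc/"))
}
```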
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21926 **[Test build #93818 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93818/testReport)** for PR 21926 at commit [`b41a45c`](https://github.com/apache/spark/commit/b41a45cb22bd3d49e75711950bcbc3d409bc544a). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21926 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/21916#discussion_r206375088 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader} +import java.io.FileNotFoundException +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +class ProcfsBasedSystems extends ProcessTreeMetrics { + val procfsDir = "/proc/" + var isAvailable: Boolean = isItProcfsBased + val pid: Int = computePid() + val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = +scala.collection.mutable.Map[ Int, Set[Int]]() + val PROCFS_STAT_FILE = "stat" + + + def isItProcfsBased: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +true + } + + + def computePid(): Int = { +if (!isAvailable) { + return -1; +} +val cmd = Array("bash", "-c", "echo $PPID") +val length = 10 +var out: Array[Byte] = Array.fill[Byte](length)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +val pid = Integer.parseInt(new String(out, "UTF-8").trim) +return pid; + } + + + def createProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue: Queue[Int] = new Queue[Int]() +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet) + } + else { +ptree += (p -> Set[Int]()) + } +} + } + + + def updateProcessTree(): Unit = { --- End diff -- Thanks @ankuriitg for the review. I will apply your comments ASAP. For this one I may do some other improvements before just creating the process tree each time. I understand in this version updating looks more complex than just recreating it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21916: [SPARK-24958][WIP] Report executors' process tree...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/21916#discussion_r206375077 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader} +import java.io.FileNotFoundException +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +class ProcfsBasedSystems extends ProcessTreeMetrics { + val procfsDir = "/proc/" + var isAvailable: Boolean = isItProcfsBased + val pid: Int = computePid() + val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = +scala.collection.mutable.Map[ Int, Set[Int]]() + val PROCFS_STAT_FILE = "stat" + + + def isItProcfsBased: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +true + } + + + def computePid(): Int = { +if (!isAvailable) { + return -1; +} +val cmd = Array("bash", "-c", "echo $PPID") +val length = 10 +var out: Array[Byte] = Array.fill[Byte](length)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +val pid = Integer.parseInt(new String(out, "UTF-8").trim) +return pid; + } + + + def createProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue: Queue[Int] = new Queue[Int]() +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet) + } + else { +ptree += (p -> Set[Int]()) + } +} + } + + + def updateProcessTree(): Unit = { +if (!isAvailable) { + return +} +val queue: Queue[Int] = new Queue[Int]() +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { +queue ++= c +val preChildren = ptree.get(p) +preChildren match { + case Some(children) => if (!c.toSet.equals(children)) { +val diff: Set[Int] = children -- c.toSet +ptree.update(p, c.toSet ) +diff.foreach(ptree.remove(_)) + } + case None => ptree.update(p, c.toSet ) +} + } + else { +ptree.update(p, Set[Int]()) + } +} + } + + + /** + * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrive the memory + * info. I tried that but found it not correct during tests, so I used normal string analysis + * instead. 
The computation of RSS and Vmem are based on proc(5): + * http://man7.org/linux/man-pages/man5/proc.5.html + */ + def getProcessInfo(pid: Int): String = { +try { + val pidDir: File = new File(procfsDir, pid.toString) + val fReader = new InputStreamReader( +new FileInputStream( + new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8")) + val in: BufferedReader = new BufferedReader(fReader) + val procInfo = in.readLine + in.close + fReader.close + return procInfo +} catch { + case f: FileNotFoundException
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206374910 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- Wait, this is an exposed API, right? I guess this is exposed to the Java API too (for instance `query.lastProgress().sources()`)? In that case, we should avoid Scala's Option and `org.json4s.*`. If this is supposed to be hidden here, I think we should find a way to hide it with a package-level access modifier. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
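To illustrate the concern (not the PR's eventual resolution): the public progress classes are consumed from Java via `query.lastProgress().sources()`, so one Java-friendly shape would be a plain JSON string field instead of `Option[JValue]`. A hedged, script-style sketch with simplified fields and made-up names:

```scala
// Illustrative sketch only: expose custom metrics as a plain JSON string so the
// Java-facing API avoids scala.Option and org.json4s types entirely.
class SourceProgressSketch(
    val description: String,
    val numInputRows: Long,
    val customMetricsJson: String) { // empty string when no metrics are reported

  def hasCustomMetrics: Boolean = customMetricsJson.nonEmpty
}

// From Java this reads naturally: progress.customMetricsJson() returns a String.
val p = new SourceProgressSketch("socket", 42L, """{"bufferedRows": 10}""")
assert(p.hasCustomMetrics)
```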
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206374120 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface for reporting custom metrics from streaming sources and sinks + */ +@InterfaceStability.Evolving +public interface CustomMetrics { --- End diff -- Java side should also be 2 spaced indented (see "Code Style Guide" in https://spark.apache.org/contributing.html) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206373655 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +151,46 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => Option[CustomMetrics], + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { --- End diff -- nit: ```scala def extractMetrics( getMetrics: () => Option[CustomMetrics], onInvalidMetrics: (Exception) => Unit): Option[JValue] = { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206373303 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- Default values do not work from the Java API, and MiMa will probably complain about this. I think we should add another constructor instead of a default value to work around this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
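The constructor point can be illustrated in isolation: keeping the old signature as an auxiliary constructor preserves the Java-visible API and binary compatibility, while the primary constructor gains the new field. A hedged, script-style sketch with simplified, made-up fields:

```scala
// Sketch: add the new field without a default value by chaining constructors,
// so the old signature stays visible to Java callers and to MiMa.
class ProgressSketch(
    val numInputRows: Long,
    val processedRowsPerSecond: Double,
    val customMetricsJson: String) {

  // Old signature preserved; delegates to the new primary constructor.
  def this(numInputRows: Long, processedRowsPerSecond: Double) =
    this(numInputRows, processedRowsPerSecond, "")
}

val oldStyle = new ProgressSketch(10L, 2.5)       // existing callers keep compiling
val newStyle = new ProgressSketch(10L, 2.5, "{}") // new callers pass the metrics
```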
[GitHub] spark issue #20028: [SPARK-19053][ML]Supporting multiple evaluation metrics ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20028 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1509/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20028: [SPARK-19053][ML]Supporting multiple evaluation metrics ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20028 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20010 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org