[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20319 @WeichenXu123 thanks for checking it out. I've resolved the conflicts and the build is green. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20235#discussion_r180124798 --- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala --- @@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul } test("FPGrowth fit and transform with different data types") { -Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach { dt => - val data = dataset.withColumn("items", col("items").cast(ArrayType(dt))) - val model = new FPGrowth().setMinSupport(0.5).fit(data) - val generatedRules = model.setMinConfidence(0.5).associationRules - val expectedRules = spark.createDataFrame(Seq( -(Array("2"), Array("1"), 1.0), -(Array("1"), Array("2"), 0.75) - )).toDF("antecedent", "consequent", "confidence") -.withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) -.withColumn("consequent", col("consequent").cast(ArrayType(dt))) - assert(expectedRules.sort("antecedent").rdd.collect().sameElements( -generatedRules.sort("antecedent").rdd.collect())) - - val transformed = model.transform(data) - val expectedTransformed = spark.createDataFrame(Seq( -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "3"), Array(2)) - )).toDF("id", "items", "prediction") -.withColumn("items", col("items").cast(ArrayType(dt))) -.withColumn("prediction", col("prediction").cast(ArrayType(dt))) - assert(expectedTransformed.collect().toSet.equals( -transformed.collect().toSet)) + class DataTypeWithEncoder[A](val a: DataType) + (implicit val encoder: Encoder[(Int, Array[A], Array[A])]) + + Array( +new DataTypeWithEncoder[Int](IntegerType), +new DataTypeWithEncoder[String](StringType), +new DataTypeWithEncoder[Short](ShortType), +new DataTypeWithEncoder[Long](LongType) +// , new DataTypeWithEncoder[Byte](ByteType) +// TODO: using ByteType produces error, as Array[Byte] is handled as Binary +// cannot resolve 'CAST(`items` AS BINARY)' due to data type mismatch: +// cannot cast array to binary; + ).foreach { dt => { +val data = dataset.withColumn("items", col("items").cast(ArrayType(dt.a))) +val model = new FPGrowth().setMinSupport(0.5).fit(data) +val generatedRules = model.setMinConfidence(0.5).associationRules +val expectedRules = Seq( + (Array("2"), Array("1"), 1.0), + (Array("1"), Array("2"), 0.75) +).toDF("antecedent", "consequent", "confidence") + .withColumn("antecedent", col("antecedent").cast(ArrayType(dt.a))) + .withColumn("consequent", col("consequent").cast(ArrayType(dt.a))) +assert(expectedRules.sort("antecedent").rdd.collect().sameElements( + generatedRules.sort("antecedent").rdd.collect())) + +val expectedTransformed = Seq( + (0, Array("1", "2"), Array.emptyIntArray), --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
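As a side note for readers of the diff above: the point of the `DataTypeWithEncoder` helper is to carry the SQL `DataType` together with the implicit `Encoder` that the streaming test utilities need for that element type. A minimal, self-contained sketch of the same pattern follows (the object name, the `dataType` field name and the println are invented for illustration; this is not code from the PR):

```
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

object DataTypeWithEncoderDemo {
  // Pairs a DataType with the Encoder for the tuple shape used by the test.
  class DataTypeWithEncoder[A](val dataType: DataType)
      (implicit val encoder: Encoder[(Int, Array[A], Array[A])])

  def main(args: Array[String]): Unit = {
    // Outside a SparkSession the tuple encoders can be materialised via Encoders.product.
    implicit val intEnc: Encoder[(Int, Array[Int], Array[Int])] =
      Encoders.product[(Int, Array[Int], Array[Int])]
    implicit val strEnc: Encoder[(Int, Array[String], Array[String])] =
      Encoders.product[(Int, Array[String], Array[String])]

    val dataTypes = Seq(
      new DataTypeWithEncoder[Int](IntegerType),
      new DataTypeWithEncoder[String](StringType))

    // Each element now carries both pieces; dt.encoder can later be passed explicitly,
    // e.g. testTransformer(...)(dt.encoder) as in the diff above.
    dataTypes.foreach { dt =>
      println(s"${dt.dataType.simpleString} -> ${dt.encoder.schema.simpleString}")
    }
  }
}
```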
[GitHub] spark issue #20235: [Spark-22887][ML][TESTS][WIP] ML test for StructuredStre...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20235 @WeichenXu123 Executing `test("FPGrowth fit and transform with different data types")` with the Byte data type produces an `org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`items` AS BINARY)' due to data type mismatch: cannot cast array to binary`. I don't remember all the details, but somewhere (Catalyst?) Array[Byte] gets special treatment as Binary. When `testTransformerOnStreamData` tries to collect the data from the DataFrame to test it as a stream, this special casing causes the above-mentioned exception. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
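To make the root cause easier to see, here is a small self-contained sketch of the two facts behind that exception (written for this thread, not taken from the PR; the object and app names are invented and the quoted message is only approximate): the encoder Spark derives for `Array[Byte]` uses `BinaryType` rather than `ArrayType(ByteType)`, and the analyzer refuses to cast an array column to binary, so routing the casted `items` column through an `Array[Byte]` encoder fails.

```
import org.apache.spark.sql.{AnalysisException, Encoders, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType}

object ByteArrayIsBinaryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    // 1) The encoder derived for Array[Byte] maps it to BinaryType, not ArrayType(ByteType):
    println(Encoders.product[(Int, Array[Byte])].schema)
    // the second field is reported as BinaryType, i.e. the column is expected to be binary

    // 2) Casting an array column to ArrayType(ByteType) works at the DataFrame level...
    val df = Seq((0, Array("1", "2"))).toDF("id", "items")
      .withColumn("items", col("items").cast(ArrayType(ByteType)))

    // ...but anything that then pushes the column through an Array[Byte] encoder ends up
    // requesting CAST(items AS BINARY), which the analyzer rejects:
    try {
      df.select(col("items").cast(BinaryType)).collect()
    } catch {
      case e: AnalysisException =>
        // e.g. cannot resolve 'CAST(`items` AS BINARY)' due to data type mismatch:
        //      cannot cast array<tinyint> to binary
        println(e.getMessage)
    }

    spark.stop()
  }
}
```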
[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20045 ping @jiangxb1987 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20121#discussion_r172122977 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala --- @@ -75,11 +71,9 @@ class MultilayerPerceptronClassifierSuite .setMaxIter(100) .setSolver("l-bfgs") val model = trainer.fit(dataset) -val result = model.transform(dataset) MLTestingUtils.checkCopyAndUids(trainer, model) -val predictionAndLabels = result.select("prediction", "label").collect() -predictionAndLabels.foreach { case Row(p: Double, l: Double) => - assert(p == l) +testTransformer[(Vector, Double)](dataset.toDF(), model, "prediction", "label") { --- End diff -- ` @transient var dataset: Dataset[_] = _` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20045 I think I've addressed all your points @jiangxb1987. Do you think I need any more changes? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166883609 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { +private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class); + +/** + * Content-type for text version 0.0.4. + */ +public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8"; + +private static StringBuilder jsonMessageLogBuilder = new StringBuilder(); + +public static void write004(Writer writer, --- End diff -- No doc --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166883372 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java --- @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +/** + * Export metrics via the Prometheus Pushgateway. + * + * The Prometheus Pushgateway exists to allow ephemeral and + * batch jobs to expose their metrics to Prometheus. + * Since these kinds of jobs may not exist long enough to be scraped, + * they can instead push their metrics to a Pushgateway. + * This class allows pushing the contents of a {@link CollectorRegistry} to + * a Pushgateway. + * + * Example usage: + * + * {@code + * void executeBatchJob() throws Exception { + * CollectorRegistry registry = new CollectorRegistry(); + * Gauge duration = Gauge.build() + * .name("my_batch_job_duration_seconds") + * .help("Duration of my batch job in seconds.") + * .register(registry); + * Gauge.Timer durationTimer = duration.startTimer(); + * try { + * // Your code here. + * + * // This is only added to the registry after success, + * // so that a previous success in the Pushgateway isn't overwritten on failure. + * Gauge lastSuccess = Gauge.build() + * .name("my_batch_job_last_success") + * .help("Last time my batch job succeeded, in unixtime.") + * .register(registry); + * lastSuccess.setToCurrentTime(); + * } finally { + * durationTimer.setDuration(); + * PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091"); + * pg.pushAdd(registry, "my_batch_job"); + * } + * } + * } + * + * + * See https://github.com/prometheus/pushgateway";> + * https://github.com/prometheus/pushgateway + */ +public class PushGatewayWithTimestamp { + +private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class); +private final String address; +private static final int SECONDS_PER_MILLISECOND = 1000; +/** + * Construct a Pushgateway, with the given address. + * + * @param address host:port or ip:port of the Pushgateway. + */ +public PushGatewayWithTimestamp(String address) { +this.address = address; +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and no grouping key. + * + * This uses the PUT HTTP method. 
+ */ +public void push(CollectorRegistry registry, String job) throws IOException { +doRequest(registry, job, null, "PUT", null); +} + +/** + * Pushes all metrics in a Collector, + * replacing all those with the same job and no grouping key. + * + * This is useful for pushing a single Gauge. + * + * This uses the PUT HTTP method. + */ +public void push(Collector collector, String job) throws IOException { +CollectorRegistry registry = new CollectorRegistry(); +collector.register(registry); +push(registry, job); +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and grouping key. + * + * This uses the PUT HTTP method. + */ +pub
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166886232 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { +private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class); + +/** + * Content-type for text version 0.0.4. + */ +public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8"; + +private static StringBuilder jsonMessageLogBuilder = new StringBuilder(); + +public static void write004(Writer writer, +Enumeration mfs)throws IOException { +write004(writer, mfs, null); +} + +/** + * Write out the text version 0.0.4 of the given MetricFamilySamples. + */ +public static void write004(Writer writer,Enumeration mfs, +String timestamp) throws IOException { +/* See http://prometheus.io/docs/instrumenting/exposition_formats/ + * for the output format specification. */ +while(mfs.hasMoreElements()) { +Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement(); --- End diff -- I think `for(Collector.MetricFamilySamples s: Collections.list(mfs)) {` would be nicer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166889139 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { +private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class); + +/** + * Content-type for text version 0.0.4. + */ +public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8"; + +private static StringBuilder jsonMessageLogBuilder = new StringBuilder(); --- End diff -- Usage of this variable is questionable for a couple of reasons: - it just keeps growing, it's never cleared or re-initialized. As a consequence from the second call of write it will have invalid content + it acts as a memory leak. - its usage pattern (`writer.write(blah);appendToJsonMessageLogBuilder("blah")`) is pretty verbose, it should be factored out. - it's not thread safe (and it's not documented) - I don't think accessing it as a static member everywhere is a good design. It should either - be passed around as method parameter - or changed to an instance method. The static write004 could instantiate a new `TextFormatWithTimestamp` and call write on that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166896270 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java --- @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +/** + * Export metrics via the Prometheus Pushgateway. + * + * The Prometheus Pushgateway exists to allow ephemeral and + * batch jobs to expose their metrics to Prometheus. + * Since these kinds of jobs may not exist long enough to be scraped, + * they can instead push their metrics to a Pushgateway. + * This class allows pushing the contents of a {@link CollectorRegistry} to + * a Pushgateway. + * + * Example usage: + * + * {@code + * void executeBatchJob() throws Exception { + * CollectorRegistry registry = new CollectorRegistry(); + * Gauge duration = Gauge.build() + * .name("my_batch_job_duration_seconds") + * .help("Duration of my batch job in seconds.") + * .register(registry); + * Gauge.Timer durationTimer = duration.startTimer(); + * try { + * // Your code here. + * + * // This is only added to the registry after success, + * // so that a previous success in the Pushgateway isn't overwritten on failure. + * Gauge lastSuccess = Gauge.build() + * .name("my_batch_job_last_success") + * .help("Last time my batch job succeeded, in unixtime.") + * .register(registry); + * lastSuccess.setToCurrentTime(); + * } finally { + * durationTimer.setDuration(); + * PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091"); + * pg.pushAdd(registry, "my_batch_job"); + * } + * } + * } + * + * + * See https://github.com/prometheus/pushgateway";> + * https://github.com/prometheus/pushgateway + */ +public class PushGatewayWithTimestamp { + +private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class); +private final String address; +private static final int SECONDS_PER_MILLISECOND = 1000; +/** + * Construct a Pushgateway, with the given address. + * + * @param address host:port or ip:port of the Pushgateway. + */ +public PushGatewayWithTimestamp(String address) { +this.address = address; +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and no grouping key. + * + * This uses the PUT HTTP method. 
+ */ +public void push(CollectorRegistry registry, String job) throws IOException { +doRequest(registry, job, null, "PUT", null); +} + +/** + * Pushes all metrics in a Collector, + * replacing all those with the same job and no grouping key. + * + * This is useful for pushing a single Gauge. + * + * This uses the PUT HTTP method. + */ +public void push(Collector collector, String job) throws IOException { +CollectorRegistry registry = new CollectorRegistry(); +collector.register(registry); +push(registry, job); +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and grouping key. + * + * This uses the PUT HTTP method. + */ +pub
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166892802 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.net.URI +import java.util +import java.util.Properties +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.Try + +import com.codahale.metrics._ +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.dropwizard.DropwizardExports + +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.METRICS_NAMESPACE +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp + + +private[spark] class PrometheusSink( + val property: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager) + extends Sink with Logging { + + protected class Reporter(registry: MetricRegistry) +extends ScheduledReporter( + registry, + "prometheus-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.MILLISECONDS) { + +val defaultSparkConf: SparkConf = new SparkConf(true) + +override def report( + gauges: util.SortedMap[String, Gauge[_]], + counters: util.SortedMap[String, Counter], + histograms: util.SortedMap[String, Histogram], + meters: util.SortedMap[String, Meter], + timers: util.SortedMap[String, Timer]): Unit = { + + // SparkEnv may become available only after metrics sink creation thus retrieving + // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink. 
+ val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf) + + val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE) + val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id") + val executorId: Option[String] = sparkConf.getOption("spark.executor.id") + + logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " + +s"executorId=$executorId") + + val role: String = (sparkAppId, executorId) match { +case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver" +case (Some(_), Some(_)) => "executor" +case _ => "shuffle" + } + + val job: String = role match { +case "driver" => metricsNamespace.getOrElse(sparkAppId.get) +case "executor" => metricsNamespace.getOrElse(sparkAppId.get) +case _ => metricsNamespace.getOrElse("shuffle") + } + logInfo(s"role=$role, job=$job") + + val groupingKey: Map[String, String] = (role, executorId) match { +case ("driver", _) => Map("role" -> role) +case ("executor", Some(id)) => Map ("role" -> role, "number" -> id) +case _ => Map("role" -> role) + } + + --- End diff -- Nit: extra line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166881873 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java --- @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +/** + * Export metrics via the Prometheus Pushgateway. + * + * The Prometheus Pushgateway exists to allow ephemeral and + * batch jobs to expose their metrics to Prometheus. + * Since these kinds of jobs may not exist long enough to be scraped, + * they can instead push their metrics to a Pushgateway. + * This class allows pushing the contents of a {@link CollectorRegistry} to + * a Pushgateway. + * + * Example usage: + * + * {@code + * void executeBatchJob() throws Exception { + * CollectorRegistry registry = new CollectorRegistry(); + * Gauge duration = Gauge.build() + * .name("my_batch_job_duration_seconds") + * .help("Duration of my batch job in seconds.") + * .register(registry); + * Gauge.Timer durationTimer = duration.startTimer(); + * try { + * // Your code here. + * + * // This is only added to the registry after success, + * // so that a previous success in the Pushgateway isn't overwritten on failure. + * Gauge lastSuccess = Gauge.build() + * .name("my_batch_job_last_success") + * .help("Last time my batch job succeeded, in unixtime.") + * .register(registry); + * lastSuccess.setToCurrentTime(); + * } finally { + * durationTimer.setDuration(); + * PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091"); + * pg.pushAdd(registry, "my_batch_job"); + * } + * } + * } + * + * + * See https://github.com/prometheus/pushgateway";> + * https://github.com/prometheus/pushgateway + */ +public class PushGatewayWithTimestamp { + +private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class); +private final String address; +private static final int SECONDS_PER_MILLISECOND = 1000; +/** + * Construct a Pushgateway, with the given address. + * + * @param address host:port or ip:port of the Pushgateway. + */ +public PushGatewayWithTimestamp(String address) { +this.address = address; +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and no grouping key. + * + * This uses the PUT HTTP method. 
+ */ +public void push(CollectorRegistry registry, String job) throws IOException { +doRequest(registry, job, null, "PUT", null); +} + +/** + * Pushes all metrics in a Collector, + * replacing all those with the same job and no grouping key. + * + * This is useful for pushing a single Gauge. + * + * This uses the PUT HTTP method. + */ +public void push(Collector collector, String job) throws IOException { +CollectorRegistry registry = new CollectorRegistry(); +collector.register(registry); +push(registry, job); +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and grouping key. + * + * This uses the PUT HTTP method. + */ +pub
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166883368 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { --- End diff -- No doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166886527 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { +private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class); + +/** + * Content-type for text version 0.0.4. + */ +public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8"; + +private static StringBuilder jsonMessageLogBuilder = new StringBuilder(); + +public static void write004(Writer writer, +Enumeration mfs)throws IOException { +write004(writer, mfs, null); +} + +/** + * Write out the text version 0.0.4 of the given MetricFamilySamples. + */ +public static void write004(Writer writer,Enumeration mfs, +String timestamp) throws IOException { +/* See http://prometheus.io/docs/instrumenting/exposition_formats/ + * for the output format specification. */ +while(mfs.hasMoreElements()) { +Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement(); --- End diff -- Also, method body is not indented well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166896081 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java --- @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +/** + * Export metrics via the Prometheus Pushgateway. + * + * The Prometheus Pushgateway exists to allow ephemeral and + * batch jobs to expose their metrics to Prometheus. + * Since these kinds of jobs may not exist long enough to be scraped, + * they can instead push their metrics to a Pushgateway. + * This class allows pushing the contents of a {@link CollectorRegistry} to + * a Pushgateway. + * + * Example usage: + * + * {@code + * void executeBatchJob() throws Exception { + * CollectorRegistry registry = new CollectorRegistry(); + * Gauge duration = Gauge.build() + * .name("my_batch_job_duration_seconds") + * .help("Duration of my batch job in seconds.") + * .register(registry); + * Gauge.Timer durationTimer = duration.startTimer(); + * try { + * // Your code here. + * + * // This is only added to the registry after success, + * // so that a previous success in the Pushgateway isn't overwritten on failure. + * Gauge lastSuccess = Gauge.build() + * .name("my_batch_job_last_success") + * .help("Last time my batch job succeeded, in unixtime.") + * .register(registry); + * lastSuccess.setToCurrentTime(); + * } finally { + * durationTimer.setDuration(); + * PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091"); + * pg.pushAdd(registry, "my_batch_job"); + * } + * } + * } + * + * + * See https://github.com/prometheus/pushgateway";> + * https://github.com/prometheus/pushgateway + */ +public class PushGatewayWithTimestamp { + +private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class); +private final String address; +private static final int SECONDS_PER_MILLISECOND = 1000; +/** + * Construct a Pushgateway, with the given address. + * + * @param address host:port or ip:port of the Pushgateway. + */ +public PushGatewayWithTimestamp(String address) { +this.address = address; +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and no grouping key. + * + * This uses the PUT HTTP method. 
+ */ +public void push(CollectorRegistry registry, String job) throws IOException { +doRequest(registry, job, null, "PUT", null); +} + +/** + * Pushes all metrics in a Collector, + * replacing all those with the same job and no grouping key. + * + * This is useful for pushing a single Gauge. + * + * This uses the PUT HTTP method. + */ +public void push(Collector collector, String job) throws IOException { +CollectorRegistry registry = new CollectorRegistry(); +collector.register(registry); +push(registry, job); +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and grouping key. + * + * This uses the PUT HTTP method. + */ +pub
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166886839 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { +private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class); + +/** + * Content-type for text version 0.0.4. + */ +public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8"; + +private static StringBuilder jsonMessageLogBuilder = new StringBuilder(); + +public static void write004(Writer writer, +Enumeration mfs)throws IOException { +write004(writer, mfs, null); +} + +/** + * Write out the text version 0.0.4 of the given MetricFamilySamples. + */ +public static void write004(Writer writer,Enumeration mfs, +String timestamp) throws IOException { +/* See http://prometheus.io/docs/instrumenting/exposition_formats/ + * for the output format specification. 
*/ +while(mfs.hasMoreElements()) { +Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement(); + +logger.debug("Metrics data"); +logger.debug(metricFamilySamples.toString()); +logger.debug("Logging metrics as a json format:"); + + +writer.write("# HELP "); +appendToJsonMessageLogBuilder("# HELP "); +writer.write(metricFamilySamples.name); +appendToJsonMessageLogBuilder(metricFamilySamples.name); +writer.write(' '); +appendToJsonMessageLogBuilder(' '); +writeEscapedHelp(writer, metricFamilySamples.help); +writer.write('\n'); +appendToJsonMessageLogBuilder('\n'); + +writer.write("# TYPE "); +appendToJsonMessageLogBuilder("# TYPE "); +writer.write(metricFamilySamples.name); +appendToJsonMessageLogBuilder(metricFamilySamples.name); +writer.write(' '); +appendToJsonMessageLogBuilder(' '); +writer.write(typeString(metricFamilySamples.type)); + appendToJsonMessageLogBuilder(typeString(metricFamilySamples.type)); +writer.write('\n'); +appendToJsonMessageLogBuilder('\n'); + +for (Collector.MetricFamilySamples.Sample sample: metricFamilySamples.samples) { +writer.write(sample.name); +appendToJsonMessageLogBuilder(sample.name); +if (sample.labelNames.size() > 0) { +writer.write('{'); +appendToJsonMessageLogBuilder('{'); +for (int i = 0; i < sample.labelNames.size(); ++i) { +writer.write(sample.labelNames.get(i)); + appendToJsonMessageLogBuilder(sample.labelNames.get(i)); +writer.write("=\""); +appendToJsonMessageLogBuilder("=\""); +writeEscapedLabelValue(writer, sample.labelValues.get(i)); +writer.write("\","); +appendToJsonMessageLogBuilder("\","); +} +writer.write('}'); +appendToJsonMessageLogBuilder('}'); +} +writer.write(' '); +appendToJsonMessageLogBuilder(' '); +writer.write(Collector.doubleToGoString(sample.value)); + appendToJsonMes
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166894472 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.net.URI +import java.util +import java.util.Properties +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.Try + +import com.codahale.metrics._ +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.dropwizard.DropwizardExports + +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.METRICS_NAMESPACE +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp + + +private[spark] class PrometheusSink( + val property: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager) + extends Sink with Logging { + + protected class Reporter(registry: MetricRegistry) +extends ScheduledReporter( + registry, + "prometheus-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.MILLISECONDS) { + +val defaultSparkConf: SparkConf = new SparkConf(true) + +override def report( + gauges: util.SortedMap[String, Gauge[_]], + counters: util.SortedMap[String, Counter], + histograms: util.SortedMap[String, Histogram], + meters: util.SortedMap[String, Meter], + timers: util.SortedMap[String, Timer]): Unit = { + + // SparkEnv may become available only after metrics sink creation thus retrieving + // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink. 
+ val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf) + + val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE) + val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id") + val executorId: Option[String] = sparkConf.getOption("spark.executor.id") + + logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " + +s"executorId=$executorId") + + val role: String = (sparkAppId, executorId) match { +case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver" +case (Some(_), Some(_)) => "executor" +case _ => "shuffle" + } + + val job: String = role match { +case "driver" => metricsNamespace.getOrElse(sparkAppId.get) +case "executor" => metricsNamespace.getOrElse(sparkAppId.get) +case _ => metricsNamespace.getOrElse("shuffle") + } + logInfo(s"role=$role, job=$job") + + val groupingKey: Map[String, String] = (role, executorId) match { +case ("driver", _) => Map("role" -> role) +case ("executor", Some(id)) => Map ("role" -> role, "number" -> id) +case _ => Map("role" -> role) + } + + + pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava, +s"${System.currentTimeMillis}") + +} + + } + + val DEFAULT_PUSH_PERIOD: Int = 10 + val DEFAULT_PUSH_PERIOD_UNIT: TimeUnit = TimeUnit.SECONDS + val DEFAULT_PUSHGATEWAY_ADDRE
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r166894977 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.net.URI +import java.util +import java.util.Properties +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.Try + +import com.codahale.metrics._ +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.dropwizard.DropwizardExports + +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.METRICS_NAMESPACE +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp + + +private[spark] class PrometheusSink( + val property: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager) --- End diff -- `securityMgr` is never used --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20362#discussion_r165745445 --- Diff: mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala --- @@ -599,8 +598,11 @@ class ALSSuite (ex, act) => ex.userFactors.first().getSeq[Float](1) === act.userFactors.first.getSeq[Float](1) } { (ex, act, _) => - ex.transform(_: DataFrame).select("prediction").first.getDouble(0) ~== -act.transform(_: DataFrame).select("prediction").first.getDouble(0) absTol 1e-6 + testTransformerByGlobalCheckFunc[Float](_: DataFrame, act, "prediction") { +case actRows: Seq[Row] => + ex.transform(_: DataFrame).select("prediction").first.getDouble(0) ~== +actRows(0).getDouble(0) absTol 1e-6 + } --- End diff -- I think this code does not check anything. `testTransformerByGlobalCheckFunc[Float](_: DataFrame, act, "prediction")` is just a partial application of `testTransformerByGlobalCheckFunc`. However, `checkNumericTypesALS` expects `check2: (ALSModel, ALSModel, DataFrame) => Unit`. It's happy to call the provided function, discard the partially applied function and use `()` instead, so it will typecheck. As a consequence, the function doing the assert is never called, so the `~===` assertion never happens. You can check it say by asking for the 100th column of the first row - it will not produce an error. This problem is not a result of your change, the original code had the same issue. It could probably be simplified a bit but I think the original intent was to do a check like this: ``` { (ex, act, df) => ex.transform(df).selectExpr("cast(prediction as double)").first.getDouble(0) ~== act.transform(df).selectExpr("cast(prediction as double)").first.getDouble(0) absTol 1e-6 } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
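Because the value-discarding behaviour described above is easy to miss, here is a toy, self-contained example (invented names, not the ALS suite itself) showing the same trap: a lambda whose body is a partially applied function still type-checks against a `Unit`-returning signature, but the inner assertion is never executed.

```
object ValueDiscardDemo {
  // Stands in for checkNumericTypesALS: it expects a check returning Unit.
  def runCheck(check: Int => Unit): Unit = check(41)

  // Stands in for testTransformerByGlobalCheckFunc: it would perform the real assertion.
  def assertEven(n: Int): Unit = assert(n % 2 == 0, s"$n is odd")

  def main(args: Array[String]): Unit = {
    // The body `assertEven(_: Int)` is a partially applied function (Int => Unit).
    // Because the enclosing lambda must return Unit, that function value is simply
    // discarded, so assertEven never runs and this "check" passes even though 41 is odd.
    runCheck { _ => assertEven(_: Int) }

    // The straightforward form actually runs the assertion and fails:
    // runCheck { n => assertEven(n) }   // java.lang.AssertionError: 41 is odd

    println("no assertion was evaluated above")
  }
}
```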
[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20362#discussion_r165304423 --- Diff: mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala --- @@ -566,6 +565,7 @@ class ALSSuite test("read/write") { val spark = this.spark import spark.implicits._ + --- End diff -- nit: new line is not needed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20362#discussion_r165308129 --- Diff: mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala --- @@ -628,18 +635,24 @@ class ALSSuite } withClue("transform should fail when ids exceed integer range. ") { val model = als.fit(df) - assert(intercept[SparkException] { -model.transform(df.select(df("user_big").as("user"), df("item"))).first - }.getMessage.contains(msg)) - assert(intercept[SparkException] { -model.transform(df.select(df("user_small").as("user"), df("item"))).first - }.getMessage.contains(msg)) - assert(intercept[SparkException] { -model.transform(df.select(df("item_big").as("item"), df("user"))).first - }.getMessage.contains(msg)) - assert(intercept[SparkException] { -model.transform(df.select(df("item_small").as("item"), df("user"))).first - }.getMessage.contains(msg)) + def testTransformIdExceedsIntRange[A : Encoder](dataFrame: DataFrame): Unit = { +assert(intercept[SparkException] { + model.transform(dataFrame).first +}.getMessage.contains(msg)) +assert(intercept[StreamingQueryException] { + testTransformer[A](dataFrame, model, "prediction") { +case _ => --- End diff -- No need for a partial function here, you can simplify it to `{ _ => }`. I would also add a small comment to make it explicit that we intentionally do not check anything. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20362#discussion_r165305989 --- Diff: mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala --- @@ -599,8 +599,15 @@ class ALSSuite (ex, act) => ex.userFactors.first().getSeq[Float](1) === act.userFactors.first.getSeq[Float](1) } { (ex, act, _) => - ex.transform(_: DataFrame).select("prediction").first.getDouble(0) ~== -act.transform(_: DataFrame).select("prediction").first.getDouble(0) absTol 1e-6 + testTransformerByGlobalCheckFunc[Float](_: DataFrame, ex, "prediction") { +case exRows: Seq[Row] => --- End diff -- I think it's ok to keep ex.transform here. This way the code will be a bit simpler. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20362#discussion_r165312057 --- Diff: mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala --- @@ -662,28 +676,32 @@ class ALSSuite val knownItem = data.select(max("item")).as[Int].first() val unknownItem = knownItem + 20 val test = Seq( - (unknownUser, unknownItem), - (knownUser, unknownItem), - (unknownUser, knownItem), - (knownUser, knownItem) -).toDF("user", "item") + (unknownUser, unknownItem, true), + (knownUser, unknownItem, true), + (unknownUser, knownItem, true), + (knownUser, knownItem, false) +).toDF("user", "item", "expectedIsNaN") val als = new ALS().setMaxIter(1).setRank(1) // default is 'nan' val defaultModel = als.fit(data) -val defaultPredictions = defaultModel.transform(test).select("prediction").as[Float].collect() -assert(defaultPredictions.length == 4) -assert(defaultPredictions.slice(0, 3).forall(_.isNaN)) -assert(!defaultPredictions.last.isNaN) +var defaultPredictionNotNaN = Float.NaN --- End diff -- I would get rid of this variable. In `testTransformer` it just adds overhead, `assert(!defaultPredictionNotNaN.isNaN)` asserts something that was already checked in testTransformer, so it's only use is in `testTransformerByGlobalCheckFunc`. Producing it is a bit convoluted, it's not easy to understand why it's needed. I would make it clearer by doing a plain old transform using the `test` DF (or a smaller one containing only the knownUser, knownItem pair) and selecting the value. An alternative solution could be to use real expected values in the `test` DF instead of "isNan" flags. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20362#discussion_r165312157 --- Diff: mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala --- @@ -693,7 +711,9 @@ class ALSSuite val data = ratings.toDF val model = new ALS().fit(data) Seq("nan", "NaN", "Nan", "drop", "DROP", "Drop").foreach { s => - model.setColdStartStrategy(s).transform(data) + testTransformer[Rating[Int]](data, model.setColdStartStrategy(s), "prediction") { +case _ => --- End diff -- Just like above, no need for partial function. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20362#discussion_r165308205 --- Diff: mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala --- @@ -653,6 +666,7 @@ class ALSSuite test("ALS cold start user/item prediction strategy") { val spark = this.spark import spark.implicits._ + --- End diff -- nit: no need for empty line here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20045 Do you think I need to cover any other cases, @jiangxb1987? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20235#discussion_r164427189 --- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala --- @@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul } test("FPGrowth fit and transform with different data types") { -Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach { dt => - val data = dataset.withColumn("items", col("items").cast(ArrayType(dt))) - val model = new FPGrowth().setMinSupport(0.5).fit(data) - val generatedRules = model.setMinConfidence(0.5).associationRules - val expectedRules = spark.createDataFrame(Seq( -(Array("2"), Array("1"), 1.0), -(Array("1"), Array("2"), 0.75) - )).toDF("antecedent", "consequent", "confidence") -.withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) -.withColumn("consequent", col("consequent").cast(ArrayType(dt))) - assert(expectedRules.sort("antecedent").rdd.collect().sameElements( -generatedRules.sort("antecedent").rdd.collect())) - - val transformed = model.transform(data) - val expectedTransformed = spark.createDataFrame(Seq( -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "3"), Array(2)) - )).toDF("id", "items", "prediction") -.withColumn("items", col("items").cast(ArrayType(dt))) -.withColumn("prediction", col("prediction").cast(ArrayType(dt))) - assert(expectedTransformed.collect().toSet.equals( -transformed.collect().toSet)) + class DataTypeWithEncoder[A](val a: DataType) + (implicit val encoder: Encoder[(Int, Array[A], Array[A])]) + + Array( +new DataTypeWithEncoder[Int](IntegerType), +new DataTypeWithEncoder[String](StringType), +new DataTypeWithEncoder[Short](ShortType), +new DataTypeWithEncoder[Long](LongType) +// , new DataTypeWithEncoder[Byte](ByteType) +// TODO: using ByteType produces error, as Array[Byte] is handled as Binary +// cannot resolve 'CAST(`items` AS BINARY)' due to data type mismatch: +// cannot cast array to binary; + ).foreach { dt => { +val data = dataset.withColumn("items", col("items").cast(ArrayType(dt.a))) +val model = new FPGrowth().setMinSupport(0.5).fit(data) +val generatedRules = model.setMinConfidence(0.5).associationRules +val expectedRules = Seq( + (Array("2"), Array("1"), 1.0), + (Array("1"), Array("2"), 0.75) +).toDF("antecedent", "consequent", "confidence") + .withColumn("antecedent", col("antecedent").cast(ArrayType(dt.a))) + .withColumn("consequent", col("consequent").cast(ArrayType(dt.a))) +assert(expectedRules.sort("antecedent").rdd.collect().sameElements( + generatedRules.sort("antecedent").rdd.collect())) + +val expectedTransformed = Seq( + (0, Array("1", "2"), Array.emptyIntArray), + (0, Array("1", "2"), Array.emptyIntArray), + (0, Array("1", "2"), Array.emptyIntArray), + (0, Array("1", "3"), Array(2)) +).toDF("id", "items", "expected") + .withColumn("items", col("items").cast(ArrayType(dt.a))) + .withColumn("expected", col("expected").cast(ArrayType(dt.a))) + +testTransformer(expectedTransformed, model, + "expected", "prediction") { + case Row(expected, prediction) => assert(expected === prediction, +s"Expected $expected but found $prediction for data type $dt") +}(dt.encoder) + } } } test("FPGrowth getFreqItems") { val model = new FPGrowth().setMinSupport(0.7).fit(dataset) -val expectedFreq = spark.createDataFrame(Seq( +val expectedFreq = Seq( (Ar
[GitHub] spark issue #20330: [SPARK-23121][core] Fix for ui becoming unaccessible for...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20330 Thanks for your help @vanzin, @gengliangwang, @jiangxb1987 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20046: [SPARK-22362][SQL] Add unit test for Window Aggregate Fu...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20046 @jiangxb1987 how does your request to cover the SQL interface relate to SPARK-23160? I assume it is to be covered in that issue; is that correct? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20330#discussion_r162792383 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala --- @@ -1002,4 +1000,12 @@ private object ApiHelper { } } + def lastStageNameAndDescription(store: AppStatusStore, job: JobData): (String, String) = { +store.asOption(store.lastStageAttempt(job.stageIds.max)) match { + case Some(lastStageAttempt) => +(lastStageAttempt.name, lastStageAttempt.description.getOrElse(job.name)) + case None => ("", "") --- End diff -- Fixed, thanks for catching. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20330#discussion_r162759866 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -427,23 +435,21 @@ private[ui] class JobDataSource( val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val submissionTime = jobData.submissionTime val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") -val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max) -val lastStageDescription = lastStageAttempt.description.getOrElse("") +val (lastStageName, lastStageDescription) = lastStageNameAndDescription(store, jobData) -val formattedJobDescription = - UIUtils.makeDescription(lastStageDescription, basePath, plainText = false) +val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false) --- End diff -- I've moved this logic to `lastStageNameAndDescription`, so it's uniform. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20319 @jkbradley could you check out this change, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20235: [Spark-22887][ML][TESTS][WIP] ML test for StructuredStre...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20235 @jkbradley could you check out this change, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20330#discussion_r162712767 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -427,23 +435,21 @@ private[ui] class JobDataSource( val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val submissionTime = jobData.submissionTime val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") -val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max) -val lastStageDescription = lastStageAttempt.description.getOrElse("") +val (lastStageName, lastStageDescription) = lastStageNameAndDescription(store, jobData) -val formattedJobDescription = - UIUtils.makeDescription(lastStageDescription, basePath, plainText = false) +val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false) --- End diff -- `lastStageDescription` may be empty, but it will not cause problems, `makeDescription` will handle it properly, just like in the version before lastStageAttempt was used: ``` val jobDescription = UIUtils.makeDescription(jobData.description.getOrElse(""), basePath, plainText = false) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20330#discussion_r162622469 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -65,10 +68,13 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We }.map { job => val jobId = job.jobId val status = job.status - val jobDescription = store.lastStageAttempt(job.stageIds.max).description - val displayJobDescription = jobDescription -.map(UIUtils.makeDescription(_, "", plainText = true).text) -.getOrElse("") + val (_, lastStageDescription) = lastStageNameAndDescription(store, job) + val displayJobDescription = +if (lastStageDescription.isEmpty) { + job.name --- End diff -- Using job.name instead of "" to behave more like the pre-2.3 version: https://github.com/smurakozi/spark/blob/772e4648d95bda3353723337723543c741ea8476/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala#L70 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20330: [SPARK-23121][core] Fix for ui becoming unaccessible for...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20330 cc @jiangxb1987, @srowen, @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20330: [SPARK-23121][core] Fix for ui becoming unaccessible for...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20330 @guoxiaolongzte could you please check if this change fixes the issue you have observed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...
GitHub user smurakozi opened a pull request: https://github.com/apache/spark/pull/20330 [SPARK-23121][core] Fix for ui becoming unaccessible for long running streaming apps ## What changes were proposed in this pull request? The allJobs and the job pages attempt to use stage attempt and DAG visualization from the store, but for long running jobs they are not guaranteed to be retained, leading to exceptions when these pages are rendered. To fix it `store.lastStageAttempt(stageId)` and `store.operationGraphForJob(jobId)` are wrapped in `store.asOption` and default values are used if the info is missing. ## How was this patch tested? Manual testing of the UI, also using the test command reported in SPARK-23121: ./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount ./examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark You can merge this pull request into a Git repository by running: $ git pull https://github.com/smurakozi/spark SPARK-23121 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20330.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20330 commit 94d50b42d6bf233afd398049c95386920c21c252 Author: Sandor Murakozi Date: 2018-01-19T10:59:36Z Fixed issue caused by the store cleaning up old stages commit d60ae4f39337b91118324064c6a3dc58a3fc2832 Author: Sandor Murakozi Date: 2018-01-19T11:33:27Z JobPage doesn't break if operationGraphForJob is not in the store for a jobid commit 832378d25245126c285e794fadcaea019b70a78a Author: Sandor Murakozi Date: 2018-01-19T11:34:59Z lastStageNameAndDescription uses store.lastStageAttempt commit 6525ef4eda0bf65bbbcb842495341afc8c5971ad Author: Sandor Murakozi Date: 2018-01-19T12:15:33Z Changed message in case of missing DAG visualization info --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
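A minimal, self-contained sketch of the fallback pattern described in this PR text; the real `AppStatusStore` API is not reproduced here, and the helper names below are illustrative. The idea is that a lookup which can fail because the entry was already evicted is turned into an `Option`, and the caller substitutes a default instead of letting the exception reach the page renderer:
```
object MissingStoreEntryFallback {
  // Turn a lookup that may throw (entry cleaned up) into an Option.
  def asOption[T](fn: => T): Option[T] =
    try Some(fn) catch { case _: NoSuchElementException => None }

  // Default to an empty name when the last stage is no longer retained.
  def lastStageName(lookup: Int => String, stageIds: Seq[Int]): String =
    asOption(lookup(stageIds.max)).getOrElse("")

  def main(args: Array[String]): Unit = {
    val retained = Map(3 -> "count at WordCount.scala:42")
    println(lastStageName(retained.apply, Seq(1, 2, 3))) // entry still present
    println(lastStageName(retained.apply, Seq(4, 5)))    // evicted, falls back to ""
  }
}
```
The review diffs earlier in this digest show the same idea applied through `store.asOption(store.lastStageAttempt(...))`.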
[GitHub] spark pull request #20287: [SPARK-23121][WEB-UI] When the Spark Streaming ap...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20287#discussion_r162566289 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -427,17 +430,24 @@ private[ui] class JobDataSource( val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val submissionTime = jobData.submissionTime val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") -val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max) -val lastStageDescription = lastStageAttempt.description.getOrElse("") - +var lastStageDescription = "" --- End diff -- Instead of catching the exception the logic should be modified to be prepared for a missing stageAttempt. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20287: [SPARK-23121][WEB-UI] When the Spark Streaming ap...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20287#discussion_r162566562 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala --- @@ -335,9 +335,12 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, store.executorList(false), appStartTime) - -content ++= UIUtils.showDagVizForJob( - jobId, store.operationGraphForJob(jobId)) +try { + content ++= UIUtils.showDagVizForJob( +jobId, store.operationGraphForJob(jobId)) +} catch { + case e => None +} --- End diff -- Same here. We should avoid the situation where the exception is thrown in the first place; catching it and doing nothing just hides the problem. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20319: [SPARK-22884][ML][TESTS] ML test for StructuredSt...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20319#discussion_r162460705 --- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/Encoders.scala --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder + +private[clustering] object Encoders { + implicit val vectorEncoder = ExpressionEncoder[Vector]() --- End diff -- Is there a better solution to provide an implicit Vector encoder for testTransformer? Is it ok here, or is there a better place for it? e.g. `org.apache.spark.mllib.util.MLlibTestSparkContext.testImplicits` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20319: [SPARK-22884][ML][TESTS] ML test for StructuredSt...
GitHub user smurakozi opened a pull request: https://github.com/apache/spark/pull/20319 [SPARK-22884][ML][TESTS] ML test for StructuredStreaming: spark.ml.clustering ## What changes were proposed in this pull request? Converting clustering tests to also check code with structured streaming, using the ML testing infrastructure implemented in SPARK-22882. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/smurakozi/spark SPARK-22884 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20319.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20319 commit 97c96b6ac7f65762cf9f125965e8d2a3cba72f60 Author: Sandor Murakozi Date: 2018-01-18T19:03:33Z Converted all clustering tests to check streaming commit b6e06e8e280f97560a342e287072f0b49e85bb79 Author: Sandor Murakozi Date: 2018-01-18T20:12:46Z formatting, nits --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20045 @gatorsmile @hvanhovell @jiangxb1987, could you have a look, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20121#discussion_r162052224 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala --- @@ -29,15 +29,14 @@ import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix, Vector, Vectors} import org.apache.spark.ml.optim.aggregator.LogisticAggregator import org.apache.spark.ml.param.{ParamMap, ParamsSuite} -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext --- End diff -- nit: unused import, could be removed, just like SparkFunSuite a couple lines above --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20121#discussion_r162053757 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala --- @@ -75,11 +71,9 @@ class MultilayerPerceptronClassifierSuite .setMaxIter(100) .setSolver("l-bfgs") val model = trainer.fit(dataset) -val result = model.transform(dataset) MLTestingUtils.checkCopyAndUids(trainer, model) -val predictionAndLabels = result.select("prediction", "label").collect() -predictionAndLabels.foreach { case Row(p: Double, l: Double) => - assert(p == l) +testTransformer[(Vector, Double)](dataset.toDF(), model, "prediction", "label") { --- End diff -- `dataSet` is always a `dataFrame` in this suite. If it was declared as such there would be no need to always call `toDF()` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20121#discussion_r162037613 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala --- @@ -169,59 +171,28 @@ class GBTClassifierSuite extends SparkFunSuite with MLlibTestSparkContext val blas = BLAS.getInstance() val validationDataset = validationData.toDF(labelCol, featuresCol) -val results = gbtModel.transform(validationDataset) -// check that raw prediction is tree predictions dot tree weights -results.select(rawPredictionCol, featuresCol).collect().foreach { - case Row(raw: Vector, features: Vector) => +testTransformer[(Double, Vector)](validationDataset, gbtModel, + "rawPrediction", "features", "probability", "prediction") { + case Row(raw: Vector, features: Vector, prob: Vector, pred: Double) => assert(raw.size === 2) +// check that raw prediction is tree predictions dot tree weights val treePredictions = gbtModel.trees.map(_.rootNode.predictImpl(features).prediction) val prediction = blas.ddot(gbtModel.numTrees, treePredictions, 1, gbtModel.treeWeights, 1) assert(raw ~== Vectors.dense(-prediction, prediction) relTol eps) -} -// Compare rawPrediction with probability -results.select(rawPredictionCol, probabilityCol).collect().foreach { - case Row(raw: Vector, prob: Vector) => -assert(raw.size === 2) +// Compare rawPrediction with probability assert(prob.size === 2) // Note: we should check other loss types for classification if they are added val predFromRaw = raw.toDense.values.map(value => LogLoss.computeProbability(value)) assert(prob(0) ~== predFromRaw(0) relTol eps) assert(prob(1) ~== predFromRaw(1) relTol eps) assert(prob(0) + prob(1) ~== 1.0 absTol absEps) -} -// Compare prediction with probability -results.select(predictionCol, probabilityCol).collect().foreach { - case Row(pred: Double, prob: Vector) => +// Compare prediction with probability val predFromProb = prob.toArray.zipWithIndex.maxBy(_._1)._2 assert(pred == predFromProb) } -// force it to use raw2prediction -gbtModel.setRawPredictionCol(rawPredictionCol).setProbabilityCol("") -val resultsUsingRaw2Predict = - gbtModel.transform(validationDataset).select(predictionCol).as[Double].collect() - resultsUsingRaw2Predict.zip(results.select(predictionCol).as[Double].collect()).foreach { - case (pred1, pred2) => assert(pred1 === pred2) -} - -// force it to use probability2prediction -gbtModel.setRawPredictionCol("").setProbabilityCol(probabilityCol) -val resultsUsingProb2Predict = - gbtModel.transform(validationDataset).select(predictionCol).as[Double].collect() - resultsUsingProb2Predict.zip(results.select(predictionCol).as[Double].collect()).foreach { - case (pred1, pred2) => assert(pred1 === pred2) -} - -// force it to use predict --- End diff -- Why were these transformations and checks removed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20121#discussion_r162058792 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala --- @@ -17,14 +17,13 @@ package org.apache.spark.ml.classification -import org.apache.spark.SparkFunSuite import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.classification.LogisticRegressionSuite._ import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.feature.StringIndexer -import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.{ParamMap, ParamsSuite} -import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, MLTestingUtils} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, MLTest, MLTestingUtils} --- End diff -- nit: import org.apache.spark.mllib.util.MLlibTestSparkContext is unused at line 32 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20121#discussion_r162051926 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala --- @@ -17,22 +17,18 @@ package org.apache.spark.ml.classification -import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LogisticRegressionSuite._ import org.apache.spark.ml.linalg.{Vector, Vectors} -import org.apache.spark.ml.util.DefaultReadWriteTest -import org.apache.spark.ml.util.MLTestingUtils +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS import org.apache.spark.mllib.evaluation.MulticlassMetrics import org.apache.spark.mllib.linalg.{Vectors => OldVectors} import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint} -import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.functions._ --- End diff -- nit: unused import, could be removed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20286: [SPARK-23119][SS] Minor fixes to V2 streaming APIs
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20286 I think this change is OK, except the nits zsxwing already noted. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20243#discussion_r161763793 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, String]) +extends DataSourceV2Writer with Logging { + // Number of rows to display, by default 20 rows + private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20) + + // Truncate the displayed data if it is too long, by default it is true + private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true) --- End diff -- Same simplification possibility here, if DataSoureV2Options is used --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20243#discussion_r161760765 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources --- End diff -- I think this one can be deleted too: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala#L453 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20243#discussion_r161763865 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.ByteArrayOutputStream + +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.streaming.StreamTest + +class ConsoleWriterSuite extends StreamTest { + import testImplicits._ + + test("console") { +val input = MemoryStream[Int] + +val captured = new ByteArrayOutputStream() +Console.withOut(captured) { + val query = input.toDF().writeStream.format("console").start() + try { +input.addData(1, 2, 3) +query.processAllAvailable() +input.addData(4, 5, 6) +query.processAllAvailable() +input.addData() +query.processAllAvailable() + } finally { +query.stop() + } +} + +assert(captured.toString() == + """--- +|Batch: 0 +|--- +|+-+ +||value| +|+-+ +||1| +||2| +||3| +|+-+ +| +|--- +|Batch: 1 +|--- +|+-+ +||value| +|+-+ +||4| +||5| +||6| +|+-+ +| +|--- +|Batch: 2 +|--- +|+-+ +||value| +|+-+ +|+-+ +| +|""".stripMargin) + } +} --- End diff -- We could have a test to check numrows, something like this: ``` test("console with numRows") { val input = MemoryStream[Int] val captured = new ByteArrayOutputStream() Console.withOut(captured) { val query = input.toDF().writeStream.format("console").option("NUMROWS", 2).start() try { input.addData(1, 2, 3) query.processAllAvailable() } finally { query.stop() } } assert(captured.toString() == """--- |Batch: 0 |--- |+-+ ||value| |+-+ ||1| ||2| |+-+ |only showing top 2 rows | |""".stripMargin) } test("console with truncation") { val input = MemoryStream[String] val captured = new ByteArrayOutputStream() Console.withOut(captured) { val query = input.toDF().writeStream.format("console").option("TRUNCATE", true).start() try { input.addData("123456789012345678901234567890") query.processAllAvailable() } finally { query.stop() } } assert(captured.toString() == """--- |Batch: 0 |--- |++ || value| |++ ||12345678901234567...| |++ | |""".stripMargin) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20243#discussion_r161749201 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, String]) +extends DataSourceV2Writer with Logging { + // Number of rows to display, by default 20 rows + private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20) --- End diff -- ConsoleRelation creates this map from a DataSoureV2Options, it contains lowercased keys. Using DataSoureV2Options or asking for "numrows" would both work, but with DataSoureV2Options `options.get("numRows").map(_.toInt).getOrElse(20)` could also be simplified to `options.getInt("numRows", 20)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
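To make the suggestion above concrete, here is a self-contained sketch contrasting the two styles; a stand-in `Options` class is used instead of the real DataSourceV2Options, and the lower-cased keys mirror what the comment describes:
```
final class Options(kvs: Map[String, String]) {
  // Keys are stored lower-cased, so lookups are effectively case-insensitive.
  private val lower = kvs.map { case (k, v) => k.toLowerCase -> v }
  def get(key: String): Option[String] = lower.get(key.toLowerCase)
  def getInt(key: String, default: Int): Int = get(key).map(_.toInt).getOrElse(default)
}

object ConsoleWriterOptionsSketch {
  def main(args: Array[String]): Unit = {
    val options = new Options(Map("NUMROWS" -> "5"))
    // Map-based style from the diff:
    val numRowsFromMap = options.get("numRows").map(_.toInt).getOrElse(20)
    // Typed-accessor style suggested in the review:
    val numRowsTyped = options.getInt("numRows", 20)
    println((numRowsFromMap, numRowsTyped)) // (5,5)
  }
}
```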
[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20251#discussion_r161293689 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -429,20 +429,40 @@ private[ui] class JobDataSource( val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val submissionTime = jobData.submissionTime val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") -val jobDescription = UIUtils.makeDescription(jobData.description.getOrElse(""), - basePath, plainText = false) + +val lastStageAttempt = { + val stageAttempts = jobData.stageIds.flatMap(store.stageData(_)) --- End diff -- Do we want the data of the last stage, or of the last stage attempt? In the latter case we should indeed load only the last stage instead of fetching all of them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20251#discussion_r161276636 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -429,20 +429,40 @@ private[ui] class JobDataSource( val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val submissionTime = jobData.submissionTime val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") -val jobDescription = UIUtils.makeDescription(jobData.description.getOrElse(""), - basePath, plainText = false) + +val lastStage = { + val stageAttempts = jobData.stageIds.flatMap(store.stageData(_)) + if (!stageAttempts.isEmpty) { +val lastId = stageAttempts.map(_.stageId).max +stageAttempts.find(_.stageId == lastId) + } else { +None + }} + +val jobDescription = jobData.description + .getOrElse(lastStage.flatMap(_.description). + getOrElse(jobData.name)) + +val lastStageName = lastStage.map(_.name).getOrElse(jobData.name) + +val lastStageDescription = lastStage.flatMap(_.description) + .getOrElse(jobData.description + .getOrElse(jobData.name)) --- End diff -- No need to break, but for me, it shows the structure and logic of the code better this way. I've formatted the calculation of the two descriptions symmetric and I'm also happy to remove unnecessary line breaks if you recommend it. Thanks for the review and your comments @srowen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20251#discussion_r161273733 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -429,20 +429,40 @@ private[ui] class JobDataSource( val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val submissionTime = jobData.submissionTime val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") -val jobDescription = UIUtils.makeDescription(jobData.description.getOrElse(""), - basePath, plainText = false) + +val lastStage = { + val stageAttempts = jobData.stageIds.flatMap(store.stageData(_)) --- End diff -- stageData has this signature: `def stageData(stageId: Int, details: Boolean = false)`. Without `(_)`, scalac does not apply the default value of `details`; it complains that a `(Int, Boolean) => ...` cannot be used on a `Seq[Int]`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
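A self-contained illustration of the eta-expansion issue described above (a generic example, not the actual `AppStatusStore` API): a method whose second parameter has a default value does not eta-expand to a `Function1`, so it cannot be passed to `flatMap` directly, whereas `lookup(_)` builds a one-argument lambda in which the default is applied:
```
object DefaultArgEtaExpansion {
  def lookup(id: Int, details: Boolean = false): Option[String] =
    if (details) Some(s"stage $id (detailed)") else Some(s"stage $id")

  def main(args: Array[String]): Unit = {
    val ids = Seq(1, 2, 3)
    // ids.flatMap(lookup)         // does not compile: lookup is (Int, Boolean) => Option[String]
    val ok = ids.flatMap(lookup(_)) // compiles: the default for `details` is used
    println(ok)
  }
}
```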
[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20251#discussion_r161273851 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala --- @@ -429,20 +429,40 @@ private[ui] class JobDataSource( val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val submissionTime = jobData.submissionTime val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown") -val jobDescription = UIUtils.makeDescription(jobData.description.getOrElse(""), - basePath, plainText = false) + +val lastStage = { + val stageAttempts = jobData.stageIds.flatMap(store.stageData(_)) + if (!stageAttempts.isEmpty) { +val lastId = stageAttempts.map(_.stageId).max +stageAttempts.find(_.stageId == lastId) --- End diff -- Indeed, nice catch, thx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20251: [Spark-23051][core] Fix for broken job description in Sp...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20251 Screenshots: [In 2.2](https://issues.apache.org/jira/secure/attachment/12905889/in-2.2.png) [In 2.3 before fix](https://issues.apache.org/jira/secure/attachment/12905887/Spark-23051-before.png) [In 2.3 after the fix](https://issues.apache.org/jira/secure/attachment/12905888/Spark-23051-after.png) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...
GitHub user smurakozi opened a pull request: https://github.com/apache/spark/pull/20251 [Spark-23051][core] Fix for broken job description in Spark UI ## What changes were proposed in this pull request? In 2.2, Spark UI displayed the stage description if the job description was not set. This functionality was broken, the GUI has shown no description in this case. In addition, the code uses jobName and jobDescription instead of stageName and stageDescription when JobTableRowData is created. In this PR the logic producing values for the job rows was modified to find the latest stage attempt for the job and use that as a fallback if job description was missing. StageName and stageDescription are also set using values from stage and jobName/description is used only as a fallback. ## How was this patch tested? Manual testing of the UI, using the code in the bug report. You can merge this pull request into a Git repository by running: $ git pull https://github.com/smurakozi/spark SPARK-23051 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20251.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20251 commit 0412c4b959b42cb0b41c669593e689430da59ec2 Author: Sandor Murakozi Date: 2018-01-12T15:10:24Z Fix for SPARK-23051 commit 937e0fd579a7f133f860eb6f572f50ad7d7fdf89 Author: Sandor Murakozi Date: 2018-01-12T15:25:44Z fixed logic determining jobDescription commit df41000d99eef3b609ece4b072c5aee0650c21f1 Author: Sandor Murakozi Date: 2018-01-12T15:51:06Z Formatted code --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
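A self-contained sketch of the fallback order described in this PR text, reduced to plain `Option`s (the real `JobData` and stage types are not reproduced): the job's own description wins, then the last stage's description, and the job name is the final fallback.
```
object DescriptionFallback {
  def describe(jobDescription: Option[String],
               lastStageDescription: Option[String],
               jobName: String): String =
    jobDescription.orElse(lastStageDescription).getOrElse(jobName)

  def main(args: Array[String]): Unit = {
    println(describe(None, Some("count at WordCount.scala:42"), "job 3")) // stage description wins
    println(describe(None, None, "job 3"))                                // falls back to the job name
  }
}
```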
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161009520 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala --- @@ -110,7 +131,13 @@ class TaskSetBlacklistSuite extends SparkFunSuite { .set(config.MAX_TASK_ATTEMPTS_PER_NODE, 3) .set(config.MAX_FAILURES_PER_EXEC_STAGE, 2) .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3) -val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock()) +val clock = new ManualClock + +val attemptId = 0 +val taskSetBlacklist = new TaskSetBlacklist( + listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, clock = clock) + +clock.setTime(0) --- End diff -- You should set the clock to a new value before each call of taskSetBlacklist.updateBlacklistForFailedTask, so you can verify that the events posted on the listener bus carry the correct time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
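A generic, self-contained illustration of the point above; this is neither Spark's `ManualClock` nor the blacklist API, and both classes below are stand-ins. If the clock is never advanced between calls, every recorded event carries the same timestamp, so an assertion on the event time proves very little:
```
object ManualClockIllustration {
  final class ManualClock(private var now: Long = 0L) {
    def setTime(t: Long): Unit = now = t
    def getTimeMillis(): Long = now
  }

  final class Recorder(clock: ManualClock) {
    val events = scala.collection.mutable.Buffer.empty[(Long, String)]
    def record(what: String): Unit = events += ((clock.getTimeMillis(), what))
  }

  def main(args: Array[String]): Unit = {
    val clock = new ManualClock
    val recorder = new Recorder(clock)
    clock.setTime(0)
    recorder.record("failure 0")
    clock.setTime(10)
    recorder.record("failure 1")
    println(recorder.events) // two events with distinct, verifiable timestamps
  }
}
```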
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161001141 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -211,6 +211,11 @@ private[spark] class AppStatusListener( updateBlackListStatus(event.executorId, true) } + override def onExecutorBlacklistedForStage( +event: SparkListenerExecutorBlacklistedForStage): Unit = { +updateBlackListStatusForStage(event.executorId, event.stageId, event.stageAttemptId) + } + --- End diff -- Consider covering this functionality (updating the status) in AppStatusListenerSuite. We already have a check for blacklisting an executor; we should have the same for a stage. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161009908 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala --- @@ -157,13 +187,19 @@ class TaskSetBlacklistSuite extends SparkFunSuite { // lead to any node blacklisting val conf = new SparkConf().setAppName("test").setMaster("local") .set(config.BLACKLIST_ENABLED.key, "true") -val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new SystemClock()) +val clock = new ManualClock + +val attemptId = 0 +val taskSetBlacklist = new TaskSetBlacklist( + listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, clock = clock) +clock.setTime(0) taskSetBlacklist.updateBlacklistForFailedTask( "hostA", exec = "1", index = 0, failureReason = "testing") taskSetBlacklist.updateBlacklistForFailedTask( "hostA", exec = "1", index = 1, failureReason = "testing") assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1")) assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA")) + verify(listenerBusMock).post(SparkListenerExecutorBlacklistedForStage(0, "1", 2, 0, attemptId)) taskSetBlacklist.updateBlacklistForFailedTask( --- End diff -- Set time to new value before this call. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20203#discussion_r161005667 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -223,6 +228,15 @@ private[spark] class AppStatusListener( updateNodeBlackList(event.hostId, false) } + def updateBlackListStatusForStage(executorId: String, stageId: Int, stageAttemptId: Int): Unit = { +Option(liveStages.get((stageId, stageAttemptId))).foreach { stage => + val now = System.nanoTime() + val esummary = stage.executorSummary(executorId) + esummary.isBlacklisted = true + maybeUpdate(esummary, now) +} + } + --- End diff -- LiveEntities periodically write an immutable view of the entity to the store. LiveExecutor and LiveExecutorStageSummary were modified in this PR to maintain blacklisted status and to write it to the state store. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20235#discussion_r160980529 --- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala --- @@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul } test("FPGrowth fit and transform with different data types") { -Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach { dt => - val data = dataset.withColumn("items", col("items").cast(ArrayType(dt))) - val model = new FPGrowth().setMinSupport(0.5).fit(data) - val generatedRules = model.setMinConfidence(0.5).associationRules - val expectedRules = spark.createDataFrame(Seq( -(Array("2"), Array("1"), 1.0), -(Array("1"), Array("2"), 0.75) - )).toDF("antecedent", "consequent", "confidence") -.withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) -.withColumn("consequent", col("consequent").cast(ArrayType(dt))) - assert(expectedRules.sort("antecedent").rdd.collect().sameElements( -generatedRules.sort("antecedent").rdd.collect())) - - val transformed = model.transform(data) - val expectedTransformed = spark.createDataFrame(Seq( -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "3"), Array(2)) - )).toDF("id", "items", "prediction") -.withColumn("items", col("items").cast(ArrayType(dt))) -.withColumn("prediction", col("prediction").cast(ArrayType(dt))) - assert(expectedTransformed.collect().toSet.equals( -transformed.collect().toSet)) + class DataTypeWithEncoder[A](val a: DataType) + (implicit val encoder: Encoder[(Int, Array[A], Array[A])]) --- End diff -- Done, thx. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20235#discussion_r160969767 --- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala --- @@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul } test("FPGrowth fit and transform with different data types") { -Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach { dt => - val data = dataset.withColumn("items", col("items").cast(ArrayType(dt))) - val model = new FPGrowth().setMinSupport(0.5).fit(data) - val generatedRules = model.setMinConfidence(0.5).associationRules - val expectedRules = spark.createDataFrame(Seq( -(Array("2"), Array("1"), 1.0), -(Array("1"), Array("2"), 0.75) - )).toDF("antecedent", "consequent", "confidence") -.withColumn("antecedent", col("antecedent").cast(ArrayType(dt))) -.withColumn("consequent", col("consequent").cast(ArrayType(dt))) - assert(expectedRules.sort("antecedent").rdd.collect().sameElements( -generatedRules.sort("antecedent").rdd.collect())) - - val transformed = model.transform(data) - val expectedTransformed = spark.createDataFrame(Seq( -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "2"), Array.emptyIntArray), -(0, Array("1", "3"), Array(2)) - )).toDF("id", "items", "prediction") -.withColumn("items", col("items").cast(ArrayType(dt))) -.withColumn("prediction", col("prediction").cast(ArrayType(dt))) - assert(expectedTransformed.collect().toSet.equals( -transformed.collect().toSet)) + class DataTypeWithEncoder[A](val a: DataType) + (implicit val encoder: Encoder[(Int, Array[A], Array[A])]) --- End diff -- This class is needed for two purposes: 1. to connect data types with their corresponding DataType. Note: this information is already available in AtomicType as InternalType, but it's not accessible. Using it from this test doesn't justify making it public. 2. to get the proper encoder to the testTransformer method. As the datatypes are put into an array dt is inferred to be their parent type, and implicit search is able to find the encoders only for concrete types. For a similar reason, we need to use the type of the final encoder. If we have only the encoder for A implicit search will not be able to construct Array[A], as we have implicit encoders for Array[Int], Array[Short]... but not for generic A, having an encoder. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
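A generic, self-contained illustration of the capture pattern explained above, with `Encoder` replaced by a stand-in type class and none of the ML test APIs involved: resolving the implicit at construction time lets each element of the collection carry its own instance, which would otherwise be unrecoverable once the element type is widened to a common parent.
```
trait Codec[A] { def name: String }
object Codec {
  implicit val intCodec: Codec[Int] = new Codec[Int] { def name = "int" }
  implicit val strCodec: Codec[String] = new Codec[String] { def name = "string" }
}

// Captures the implicit instance for A at construction time.
class WithCodec[A](val label: String)(implicit val codec: Codec[A])

object CapturePattern {
  def main(args: Array[String]): Unit = {
    val all = Seq(new WithCodec[Int]("IntegerType"), new WithCodec[String]("StringType"))
    // The element type is widened to WithCodec[_], yet each entry still knows its own codec.
    all.foreach(w => println(s"${w.label} -> ${w.codec.name}"))
  }
}
```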
[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...
GitHub user smurakozi opened a pull request: https://github.com/apache/spark/pull/20235 [Spark-22887][ML][TESTS][WIP] ML test for StructuredStreaming: spark.ml.fpm ## What changes were proposed in this pull request? Converting FPGrowth tests to also check code with structured streaming, using the ML testing infrastructure implemented in SPARK-22882. Note: this is a WIP, test with Array[Byte] is not yet working due to some datatype issues (Array[Byte] vs Binary). ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/smurakozi/spark SPARK-22887 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20235.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20235 commit 331129556003bcf6e4bab6559e80e46ac0858706 Author: Sandor Murakozi Date: 2018-01-05T12:41:53Z test 'FPGrowthModel setMinConfidence should affect rules generation and transform' is converted to use testTransformer commit 93aff2c999eee4a88f7f4a3c32d6c7b601a918ac Author: Sandor Murakozi Date: 2018-01-08T13:14:38Z Test 'FPGrowth fit and transform with different data types' works with streaming, except for Byte commit 8b0b00070a21bd47537a7c3ad580e2af38a481bd Author: Sandor Murakozi Date: 2018-01-11T11:28:46Z All tests use testTransformer. Test with Array[Byte] is missing. commit af61845ab6acfa82c4411bce3ab4a20afebd0aa3 Author: Sandor Murakozi Date: 2018-01-11T11:49:27Z Unintentional changes in 93aff2c999 are reverted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/20045 @gatorsmile @hvanhovell @jiangxb1987, could you have a look, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20046: Adding unit tests for missing aggregate functions
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20046#discussion_r158326073 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala --- @@ -154,6 +154,217 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { Row(2.0d), Row(2.0d))) } + test("corr, covar_pop, stddev_pop functions in specific window") { +val df = Seq( + ("a", "p1", 10.0, 20.0), + ("b", "p1", 20.0, 10.0), + ("c", "p2", 20.0, 20.0), + ("d", "p2", 20.0, 20.0), + ("e", "p3", 0.0, 0.0), + ("f", "p3", 6.0, 12.0), + ("g", "p3", 6.0, 12.0), + ("h", "p3", 8.0, 16.0), + ("i", "p4", 5.0, 5.0)).toDF("key", "partitionId", "value1", "value2") +checkAnswer( + df.select( +$"key", +corr("value1", "value2").over(Window.partitionBy("partitionId") + .orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +covar_pop("value1", "value2") + .over(Window.partitionBy("partitionId") +.orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +var_pop("value1") + .over(Window.partitionBy("partitionId") +.orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +stddev_pop("value1") + .over(Window.partitionBy("partitionId") +.orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +var_pop("value2") + .over(Window.partitionBy("partitionId") +.orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +stddev_pop("value2") + .over(Window.partitionBy("partitionId") +.orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))), + + // As stddev_pop(expr) = sqrt(var_pop(expr)) + // the "stddev_pop" column can be calculated from the "var_pop" column. + // + // As corr(expr1, expr2) = covar_pop(expr1, expr2) / (stddev_pop(expr1) * stddev_pop(expr2)) + // the "corr" column can be calculated from the "covar_pop" and the two "stddev_pop" columns. + Seq( +Row("a", -1.0, -25.0, 25.0, 5.0, 25.0, 5.0), +Row("b", -1.0, -25.0, 25.0, 5.0, 25.0, 5.0), +Row("c", null, 0.0, 0.0, 0.0, 0.0, 0.0), +Row("d", null, 0.0, 0.0, 0.0, 0.0, 0.0), +Row("e", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0), +Row("f", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0), +Row("g", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0), +Row("h", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0), +Row("i", Double.NaN, 0.0, 0.0, 0.0, 0.0, 0.0))) + } + + test("covar_samp, var_samp (variance), stddev_samp (stddev) functions in specific window") { +val df = Seq( + ("a", "p1", 10.0, 20.0), + ("b", "p1", 20.0, 10.0), + ("c", "p2", 20.0, 20.0), + ("d", "p2", 20.0, 20.0), + ("e", "p3", 0.0, 0.0), + ("f", "p3", 6.0, 12.0), + ("g", "p3", 6.0, 12.0), + ("h", "p3", 8.0, 16.0), + ("i", "p4", 5.0, 5.0)).toDF("key", "partitionId", "value1", "value2") +checkAnswer( + df.select( +$"key", +covar_samp("value1", "value2").over(Window.partitionBy("partitionId") + .orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +var_samp("value1").over(Window.partitionBy("partitionId") + .orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +variance("value1").over(Window.partitionBy("partitionId") + .orderBy("key").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)), +stddev_sam
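The comments embedded in the test above rely on two identities: stddev_pop(x) = sqrt(var_pop(x)) and corr(x, y) = covar_pop(x, y) / (stddev_pop(x) * stddev_pop(y)). A quick self-contained arithmetic check on the p1 partition of the test data (value1 = 10, 20 and value2 = 20, 10), using hand-rolled population statistics rather than Spark's implementations:
```
object PopulationStatsCheck {
  def main(args: Array[String]): Unit = {
    val (x, y) = (Seq(10.0, 20.0), Seq(20.0, 10.0))
    def mean(s: Seq[Double]): Double = s.sum / s.size
    def varPop(s: Seq[Double]): Double = mean(s.map(v => math.pow(v - mean(s), 2)))
    def covarPop(a: Seq[Double], b: Seq[Double]): Double =
      mean(a.zip(b).map { case (u, v) => (u - mean(a)) * (v - mean(b)) })
    val corr = covarPop(x, y) / (math.sqrt(varPop(x)) * math.sqrt(varPop(y)))
    println((corr, covarPop(x, y), varPop(x), math.sqrt(varPop(x))))
    // prints (-1.0, -25.0, 25.0, 5.0), matching Row("a", -1.0, -25.0, 25.0, 5.0, 25.0, 5.0)
  }
}
```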
[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20045#discussion_r158304105 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala --- @@ -32,6 +32,217 @@ import org.apache.spark.unsafe.types.CalendarInterval class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { import testImplicits._ + test("Window partitionBy cardinality, no order by") { +val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 4)).toDF("key", "value") + +checkAnswer( + df.select( +sum("value").over(), +sum("value").over(Window.partitionBy("key")), +sum("value").over(Window.partitionBy("key", "value")), +sum("value").over(Window.partitionBy("value", "key"))), + Row(11, 3, 1, 1) :: Row(11, 3, 2, 2) :: Row(11, 8, 8, 8) :: Row(11, 8, 8, 8) :: Nil) + } + + test("Null value in partition key") { +val df = Seq(("a", 1), ("a", 2), (null, 4), (null, 8)).toDF("key", "value") + +checkAnswer( + df.select( +'value, +sum("value").over(Window.partitionBy("key"))), + Row(1, 3) :: Row(2, 3) :: Row(4, 12) :: Row(8, 12) :: Nil) + } + + test("Same partitionBy multiple times") { +val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 8)).toDF("key", "value") + +checkAnswer( + df.select( +sum("value").over(Window.partitionBy("key", "key"))), + Row(3) :: Row(3) :: Row(12) :: Row(12) :: Nil) + } + + test("Multiple orderBy clauses") { +val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 4)).toDF("k1", "k2", "v") + +checkAnswer( + df.select( +'v, +lead("v", 1).over(Window.orderBy("k1", "k2")), +lead("v", 1).over(Window.orderBy("k2", "k1"))), + Row(1, 2, 4) :: Row(4, 3, 2) :: Row(2, 4, 3) :: Row(3, null, null) :: Nil) + } + + test("Multiple orderBy clauses with desc") { +val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 4)).toDF("k1", "k2", "v") + +checkAnswer( + df.select( +'v, +lead("v", 1).over(Window.orderBy($"k1".desc, $"k2")), +lead("v", 1).over(Window.orderBy($"k1", $"k2".desc)), +lead("v", 1).over(Window.orderBy($"k1".desc, $"k2"))), --- End diff -- It should be desc, desc, thanks for catching it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20045#discussion_r158304013 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala --- @@ -246,13 +246,23 @@ class ExpressionParserSuite extends PlanTest { // Basic window testing. assertEqual("foo(*) over w1", UnresolvedWindowExpression(func, WindowSpecReference("w1"))) assertEqual("foo(*) over ()", windowed()) +assertEqual("foo(*) over (partition by a)", windowed(Seq('a))) assertEqual("foo(*) over (partition by a, b)", windowed(Seq('a, 'b))) +assertEqual("foo(*) over (distribute by a)", windowed(Seq('a))) assertEqual("foo(*) over (distribute by a, b)", windowed(Seq('a, 'b))) +assertEqual("foo(*) over (cluster by a)", windowed(Seq('a))) assertEqual("foo(*) over (cluster by a, b)", windowed(Seq('a, 'b))) -assertEqual("foo(*) over (order by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc ))) -assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc ))) +assertEqual("foo(*) over (order by a)", windowed(Seq.empty, Seq('a.asc))) +assertEqual("foo(*) over (order by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc))) +assertEqual("foo(*) over (sort by a)", windowed(Seq.empty, Seq('a.asc))) +assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc))) assertEqual("foo(*) over (partition by a, b order by c)", windowed(Seq('a, 'b), Seq('c.asc))) assertEqual("foo(*) over (distribute by a, b sort by c)", windowed(Seq('a, 'b), Seq('c.asc))) --- End diff -- I will fix it, thx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/20045#discussion_r158303978 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala --- @@ -246,13 +246,23 @@ class ExpressionParserSuite extends PlanTest { // Basic window testing. assertEqual("foo(*) over w1", UnresolvedWindowExpression(func, WindowSpecReference("w1"))) assertEqual("foo(*) over ()", windowed()) +assertEqual("foo(*) over (partition by a)", windowed(Seq('a))) assertEqual("foo(*) over (partition by a, b)", windowed(Seq('a, 'b))) +assertEqual("foo(*) over (distribute by a)", windowed(Seq('a))) assertEqual("foo(*) over (distribute by a, b)", windowed(Seq('a, 'b))) +assertEqual("foo(*) over (cluster by a)", windowed(Seq('a))) assertEqual("foo(*) over (cluster by a, b)", windowed(Seq('a, 'b))) -assertEqual("foo(*) over (order by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc ))) -assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc ))) +assertEqual("foo(*) over (order by a)", windowed(Seq.empty, Seq('a.asc))) +assertEqual("foo(*) over (order by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc))) +assertEqual("foo(*) over (sort by a)", windowed(Seq.empty, Seq('a.asc))) +assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, Seq('a.desc, 'b.asc))) assertEqual("foo(*) over (partition by a, b order by c)", windowed(Seq('a, 'b), Seq('c.asc))) --- End diff -- I will fix it, thx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...
GitHub user smurakozi opened a pull request: https://github.com/apache/spark/pull/20045 [Spark-22360][SQL] Add unit tests for Window Specifications

## What changes were proposed in this pull request?

Improve the test coverage of window specifications. New tests cover basic cases more systematically in DataFrameWindowFunctionsSuite:
- different partition clauses (none, one, multiple)
- different order clauses (none, one, multiple, asc/desc, nulls first/last)
- defaults if clauses are missing

New tests were added to cover some more complex cases where partitionBy or orderBy uses expressions.

ExpressionParserSuite.'window function expressions' was also extended to check parsing of some additional window expressions.

## How was this patch tested?

Only new tests were added; existing automated tests were executed.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smurakozi/spark SPARK-22360

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20045.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20045

commit 313aafbebc134ec02110a31fb99bb72c30d61639
Author: Sandor Murakozi
Date: 2017-12-21T11:23:12Z

    New test cases for window specification

commit 6c48095358aa77d3dd12c7dbba97ecc31c22604e
Author: Sandor Murakozi
Date: 2017-12-21T13:05:02Z

    test for filtering based on window function

commit e941aca540c68e74c80d279150c8d8731f2ed332
Author: Sandor Murakozi
Date: 2017-12-21T13:47:11Z

    More test expressions in ExpressionParserSuite.window function expressions

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
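As an illustration of the "asc/desc, nulls first/last" order clauses listed above, a test along these lines would presumably live in DataFrameWindowFunctionsSuite. This is only a sketch, assuming the suite's testImplicits and SQL functions are in scope; the concrete data and assertions in the PR may differ.

```
// Sketch: exercise explicit null ordering in a window's orderBy clause.
val df = Seq(("a", Some(1)), ("a", None), ("b", Some(2)), ("b", None)).toDF("key", "value")

df.select(
  'key, 'value,
  row_number().over(Window.partitionBy("key").orderBy($"value".asc_nulls_first)),
  row_number().over(Window.partitionBy("key").orderBy($"value".desc_nulls_last)))
```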
[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19599#discussion_r156609625 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV } } +/** + * :: DeveloperApi :: + * Specialized version of `Param[String]` for Java. + */ +@DeveloperApi +class StringParam(parent: Params, name: String, doc: String, isValid: String => Boolean) + extends Param[String](parent, name, doc, isValid) { + + private var options: Option[Array[String]] = None + + def this(parent: Params, name: String, doc: String) = +this(parent, name, doc, ParamValidators.alwaysTrue) + + /** construct a StringParam with limited options (case-insensitive) */ + def this(parent: Params, name: String, doc: String, options: Array[String]) = { +this(parent, name, doc + s" Supported options (case-insensitive): ${options.mkString(", ")}.", + s => options.exists(s.equalsIgnoreCase)) +this.options = Some(options) + } + + private[spark] def getOptions: Option[Array[String]] = options + + /** Creates a param pair with given value (for Java). */ + override def w(value: String): ParamPair[String] = super.w(value) + + override def validate(value: String): Unit = { --- End diff -- should be private[param] --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19599#discussion_r156609525

--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV
 }
 }
+/**
+ * :: DeveloperApi ::
+ * Specialized version of `Param[String]` for Java.
+ */
+@DeveloperApi
+class StringParam(parent: Params, name: String, doc: String, isValid: String => Boolean)
+  extends Param[String](parent, name, doc, isValid) {
+
+  private var options: Option[Array[String]] = None
+
+  def this(parent: Params, name: String, doc: String) =
+    this(parent, name, doc, ParamValidators.alwaysTrue)
+
+  /** construct a StringParam with limited options (case-insensitive) */
+  def this(parent: Params, name: String, doc: String, options: Array[String]) = {
+    this(parent, name, doc + s" Supported options (case-insensitive): ${options.mkString(", ")}.",
--- End diff --

I missed one additional line when highlighting, sorry for that :) I meant to test the case-insensitive validation in the next line. The doc could be tested too; I think that's more of a nice-to-have than a must.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
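A sketch of the kind of check meant in the comment above, assuming the StringParam constructor with an options array proposed in this PR; the `owner` value below is a hypothetical Params instance, not a name from the patch.

```
// Sketch: validation should accept any casing of a supported option and reject other values.
val solverParam = new StringParam(owner, "solver", "The solver algorithm for optimization.",
  Array("auto", "normal", "l-bfgs"))

assert(solverParam.isValid("AUTO"))     // case-insensitive match against "auto"
assert(solverParam.isValid("Normal"))
assert(!solverParam.isValid("unknown"))
```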
[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19599#discussion_r156605360

--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV
 }
 }
+/**
+ * :: DeveloperApi ::
+ * Specialized version of `Param[String]` for Java.
+ */
+@DeveloperApi
+class StringParam(parent: Params, name: String, doc: String, isValid: String => Boolean)
+  extends Param[String](parent, name, doc, isValid) {
+
+  private var options: Option[Array[String]] = None
--- End diff --

What about this?

```
class StringParam(parent: Params, name: String, doc: String, isValid: String => Boolean,
    options: Option[Array[String]] = None)
  extends Param[String](parent, name, doc, isValid) {
  ...
  def this(parent: Params, name: String, doc: String, options: Array[String]) = {
    this(parent, name, doc + s" Supported options (case-insensitive): ${options.mkString(", ")}.",
      s => options.exists(s.equalsIgnoreCase), Some(options))
  }
```

This solves the options-as-val problem, but highlights another one: why do we need the possibility to give an explicit isValid? Why not always expect options only?

I agree with @attilapiros that these params are enum-like. If so, the only reasonable validation is to check whether one of the acceptable values is given (ignoring case). I don't remember ever seeing a custom validator doing anything else. Removing these custom validators would decrease complexity and code duplication.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
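To make the "options only" idea in the comment above concrete, a simplified, enum-like variant could look roughly like the following. This is an illustration of the suggestion with a hypothetical class name, not code from the PR.

```
// Sketch: an enum-like string param that is always validated against a fixed,
// case-insensitive set of options, with no caller-supplied isValid function at all.
class EnumLikeStringParam(parent: Params, name: String, doc: String, val options: Array[String])
  extends Param[String](
    parent,
    name,
    doc + s" Supported options (case-insensitive): ${options.mkString(", ")}.",
    (value: String) => options.exists(value.equalsIgnoreCase))
```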
[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19599#discussion_r156339663 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV } } +/** + * :: DeveloperApi :: + * Specialized version of `Param[String]` for Java. + */ +@DeveloperApi +class StringParam(parent: Params, name: String, doc: String, isValid: String => Boolean) + extends Param[String](parent, name, doc, isValid) { + + private var options: Option[Array[String]] = None + + def this(parent: Params, name: String, doc: String) = +this(parent, name, doc, ParamValidators.alwaysTrue) + + /** construct a StringParam with limited options (case-insensitive) */ + def this(parent: Params, name: String, doc: String, options: Array[String]) = { +this(parent, name, doc + s" Supported options (case-insensitive): ${options.mkString(", ")}.", --- End diff -- Is this tested somewhere? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19599#discussion_r155782239 --- Diff: mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala --- @@ -139,6 +139,17 @@ class ParamsSuite extends SparkFunSuite { } } +{ // StringArrayParam --- End diff -- It should be StringParam --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19599#discussion_r156086625 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV } } +/** + * :: DeveloperApi :: + * Specialized version of `Param[String]` for Java. + */ +@DeveloperApi +class StringParam(parent: Params, name: String, doc: String, isValid: String => Boolean) + extends Param[String](parent, name, doc, isValid) { + + private var options: Option[Array[String]] = None --- End diff -- It should rather be a val. That way you would not need def getOptions() --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19599#discussion_r156339373

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -224,8 +222,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
 elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth)
 instr.logNumFeatures(numFeatures)
-if (($(solver) == Auto &&
- numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == Normal) {
+if (($(solver).equalsIgnoreCase(Auto) && numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES)
--- End diff --

I think using equalsIgnoreCase all over the place is problematic: you need to check, and potentially modify, code in many places. Wherever $() is used on a StringParam you may need this change. It's pretty easy to miss some, and that would lead to subtle bugs.

A safer approach would be to work with lowercase param values everywhere. You could convert to lowercase in the constructor and require that the possible param values are given as lowercase. This way all comparisons, pattern matches, etc. would work just fine.

The downside of this approach is that everywhere you show these values to users you would present the lowercase value. It might cause issues, e.g. if you log it and some external log parser expects the original (non-lowercase) version. In a very broad sense it would break compatibility, but I guess that would be acceptable.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
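A small sketch of the "normalize to lowercase once" alternative described above, written as a hypothetical setter on an estimator that owns a `solver` string param. This is a fragment only; the surrounding class and param definition are assumed.

```
import java.util.Locale

// The value is lowercased once on the way in, so later code can simply compare
// $(solver) == "auto" instead of calling equalsIgnoreCase at every site.
def setSolver(value: String): this.type = set(solver, value.toLowerCase(Locale.ROOT))
```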
[GitHub] spark issue #19920: [SPARK-21672][CORE] Remove SHS-specific application / at...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/19920 Thanks for your help @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155852268

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala ---
@@ -88,4 +90,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
 private def makePageLink(showIncomplete: Boolean): String = {
 UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete)
 }
+
+ private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
--- End diff --

I've checked the status and the history packages and found no suitable object. As it is only one extra copy of a tiny method, and it's in test code, I don't think it would be worth the extra complexity of introducing a new object, so I think it's best to leave it as it is. If we use it more frequently we can extract it into a util object.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155725705 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -24,27 +24,29 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus -class ApplicationInfo private[spark]( -val id: String, -val name: String, -val coresGranted: Option[Int], -val maxCores: Option[Int], -val coresPerExecutor: Option[Int], -val memoryPerExecutorMB: Option[Int], -val attempts: Seq[ApplicationAttemptInfo]) +case class ApplicationInfo private[spark]( +id: String, +name: String, +coresGranted: Option[Int], +maxCores: Option[Int], +coresPerExecutor: Option[Int], +memoryPerExecutorMB: Option[Int], +attempts: Seq[ApplicationAttemptInfo]) { + --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155718836

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala ---
@@ -88,4 +90,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
 private def makePageLink(showIncomplete: Boolean): String = {
 UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete)
 }
+
+ private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
--- End diff --

My first commit had this method in ApplicationInfo, called `completed`.

@vanzin: `+case class ApplicationInfo private[spark]( ... +def completed: Boolean = {` Is this used anywhere?

I answered that we have it in HistoryPage (and forgot to mention the usage in the test).

@vanzin: Can we move this logic there instead? That avoids adding a new visible property in a public object.

I moved it, noting the code duplication in a comment on the PR. The question is which is better: a bit of duplication (used 3 times and defined 2 times, once in application code and once in test code) or extending the public API? My gut feeling is that the small duplication is preferable in this case, but I'm a newbie in Spark development, so I'm easy to convince if the preference here is different :)

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
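For reference, the tiny helper being weighed in the comment above is the one visible in the diffs elsewhere in this thread, roughly:

```
// Duplicated in HistoryPage (and in the history server test code) instead of being
// exposed as a public member on ApplicationInfo.
private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
  appInfo.attempts.nonEmpty && appInfo.attempts.head.completed
}
```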
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155657473 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -17,16 +17,15 @@ package org.apache.spark.status -import java.io.File -import java.util.{Arrays, List => JList} +import java.util.{List => JList} import scala.collection.JavaConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.scheduler.SparkListener import org.apache.spark.status.api.v1 import org.apache.spark.ui.scope._ -import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.{Distribution} --- End diff -- restored original --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155657274 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala --- @@ -30,7 +31,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val requestedIncomplete = Option(UIUtils.stripXSS(request.getParameter("showIncomplete"))).getOrElse("false").toBoolean -val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete) +val allAppsSize = parent.getApplicationList(). --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155657306 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala --- @@ -88,4 +90,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") private def makePageLink(showIncomplete: Boolean): String = { UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete) } + + private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = { +appInfo.attempts.nonEmpty && appInfo.attempts.head.completed + --- End diff -- nuked --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19906: [SPARK-22516][SQL] Bump up Univocity version to 2.5.9
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/19906 Thanks for your help and reviews @HyukjinKwon, @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155632345 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -24,27 +24,32 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus -class ApplicationInfo private[spark]( -val id: String, -val name: String, -val coresGranted: Option[Int], -val maxCores: Option[Int], -val coresPerExecutor: Option[Int], -val memoryPerExecutorMB: Option[Int], -val attempts: Seq[ApplicationAttemptInfo]) +case class ApplicationInfo private[spark]( +id: String, +name: String, +coresGranted: Option[Int], +maxCores: Option[Int], +coresPerExecutor: Option[Int], +memoryPerExecutorMB: Option[Int], +attempts: Seq[ApplicationAttemptInfo]) { + +def completed: Boolean = { --- End diff -- Moved, also had to add it to `HistoryServerSuite`. A bit of code duplication, but indeed better than increasing API surface area. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155631788

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.status.api.v1
+import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, ApplicationInfo}
--- End diff --

Fixed; in addition, I removed some unused imports.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155631687 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -29,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.status.api.v1 +import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, RuntimeInfo} --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155631377 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala --- @@ -188,11 +188,11 @@ class HistoryServer( } def getApplicationInfoList: Iterator[ApplicationInfo] = { - getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) +getApplicationList() } def getApplicationInfo(appId: String): Option[ApplicationInfo] = { - provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo) + provider.getApplicationInfo(appId) --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155611738

--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -24,27 +24,32 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.spark.JobExecutionStatus
-class ApplicationInfo private[spark](
-val id: String,
-val name: String,
-val coresGranted: Option[Int],
-val maxCores: Option[Int],
-val coresPerExecutor: Option[Int],
-val memoryPerExecutorMB: Option[Int],
-val attempts: Seq[ApplicationAttemptInfo])
+case class ApplicationInfo private[spark](
--- End diff --

We check equality of ApplicationInfos during testing. I think converting them to case classes introduces less complexity than adding a proper equals method (ditto for ApplicationAttemptInfo below). In addition, we copy ApplicationInfos in FsHistoryProvider:819, which is also simpler with case classes.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
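As a quick illustration of the two points above (structural equality in tests, copying in FsHistoryProvider); `expected` and `actual` here are hypothetical ApplicationInfo values, not names from the patch.

```
// Both come for free once ApplicationInfo (and ApplicationAttemptInfo) are case classes.
assert(actual == expected)                    // structural equality, no hand-written equals needed
val renamed = actual.copy(name = "new-name")  // copy with selected fields changed
```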
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155610889 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala --- @@ -24,27 +24,32 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus -class ApplicationInfo private[spark]( -val id: String, -val name: String, -val coresGranted: Option[Int], -val maxCores: Option[Int], -val coresPerExecutor: Option[Int], -val memoryPerExecutorMB: Option[Int], -val attempts: Seq[ApplicationAttemptInfo]) +case class ApplicationInfo private[spark]( +id: String, +name: String, +coresGranted: Option[Int], +maxCores: Option[Int], +coresPerExecutor: Option[Int], +memoryPerExecutorMB: Option[Int], +attempts: Seq[ApplicationAttemptInfo]) { + +def completed: Boolean = { --- End diff -- org.apache.spark.deploy.history.HistoryPage:33 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
Github user smurakozi commented on a diff in the pull request: https://github.com/apache/spark/pull/19920#discussion_r155609528

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -29,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, RuntimeInfo}
--- End diff --

There is no ambiguity between the ApplicationAttemptInfo and ApplicationAttempt classes in different versions anymore, so I thought the v1. prefixes could be removed. I can undo that if they are still useful.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19920: [SPARK-21672][CORE] Remove SHS-specific application / at...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/19920 PR description was cleaned, thanks for catching it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...
GitHub user smurakozi opened a pull request: https://github.com/apache/spark/pull/19920 [SPARK-21672][CORE] Remove SHS-specific application / attempt data structures

## What changes were proposed in this pull request?

In general, the SHS pages now use the public API types to represent applications. Some internal code paths still used their own view of what applications and attempts look like (`ApplicationHistoryInfo` and `ApplicationAttemptInfo`), declared in ApplicationHistoryProvider.scala.

This pull request removes these classes and updates the rest of the code to use `status.api.v1.ApplicationInfo` and `status.api.v1.ApplicationAttemptInfo` instead.

Furthermore, `status.api.v1.ApplicationInfo` and `status.api.v1.ApplicationAttemptInfo` were changed to case classes to:
- facilitate copying instances
- simplify equality checking in test code
- get a nicer toString()

To simplify the code a bit, `v1.` prefixes were also removed from occurrences of v1.ApplicationInfo and v1.ApplicationAttemptInfo, as there is no longer any ambiguity between classes in history and status.api.v1.

## How was this patch tested?

By running existing automated tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smurakozi/spark SPARK-21672

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19920.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19920

commit a457acf3faeba94bb1473d2496b8a7126e4f8b1d
Author: Sandor Murakozi
Date: 2017-12-06T16:34:28Z

    [SPARK-21672][CORE] Remove SHS-specific application / attempt data structures

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19906: [SPARK-22516][SQL] Bump up Univocity version to 2...
GitHub user smurakozi opened a pull request: https://github.com/apache/spark/pull/19906 [SPARK-22516][SQL] Bump up Univocity version to 2.5.9

## What changes were proposed in this pull request?

There was a bug in Univocity Parser that caused the issue in SPARK-22516. This was fixed by upgrading the library from version 2.5.4 to 2.5.9:

**Executing**
```
spark.read.option("header","true").option("inferSchema", "true").option("multiLine", "true").option("comment", "g").csv("test_file_without_eof_char.csv").show()
```

**Before**
```
ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
com.univocity.parsers.common.TextParsingException: java.lang.IllegalArgumentException - Unable to skip 1 lines from line 2. End of input reached
...
Internal state when error was thrown: line=3, column=0, record=2, charIndex=31
 at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:339)
 at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:475)
 at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:281)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```

**After**
```
+---+---+
|column1|column2|
+---+---+
|abc|def|
+---+---+
```

## How was this patch tested?

The already existing `CSVSuite.commented lines in CSV data` test was extended to also parse the file in multiline mode. The test input file was modified to also include a comment in the last line.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smurakozi/spark SPARK-22516

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19906.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19906

commit 8bc6a9ce9f6eeb854261d26dabaf04052eb8b5b2
Author: smurakozi
Date: 2017-11-27T08:30:25Z

    [SPARK-22516][SQL] Bump up Univocity version to 2.5.9

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19893: [SPARK-16139][TEST] Add logging functionality for leaked...
Github user smurakozi commented on the issue: https://github.com/apache/spark/pull/19893

Logging the leaked threads in a more grep-friendly format would be nice; that way you could easily create a thread-leak report. It would also be nice to see the leaks on the console.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
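One possible reading of "grep-friendly" in the comment above is a fixed prefix plus one line per leaked thread, for example as sketched below. This is hypothetical, not what the PR emits; `leakedThreads` and `suiteName` are assumed to exist in the test harness, and `logWarning` comes from the usual Logging trait.

```
// Sketch: one fixed-prefix line per leaked thread, so `grep THREAD_LEAK` over the test
// logs yields a ready-made thread-leak report (suite name, thread name, thread state).
leakedThreads.foreach { t =>
  logWarning(s"THREAD_LEAK suite=$suiteName thread=${t.getName} state=${t.getState}")
}
```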