[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...

2018-04-09 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20319
  
@WeichenXu123 thanks for checking it out. I've resolved the conflicts; the build is green.


---




[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...

2018-04-09 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20235#discussion_r180124798
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala 
---
@@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defaul
   }
 
   test("FPGrowth fit and transform with different data types") {
-Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach 
{ dt =>
-  val data = dataset.withColumn("items", 
col("items").cast(ArrayType(dt)))
-  val model = new FPGrowth().setMinSupport(0.5).fit(data)
-  val generatedRules = model.setMinConfidence(0.5).associationRules
-  val expectedRules = spark.createDataFrame(Seq(
-(Array("2"), Array("1"), 1.0),
-(Array("1"), Array("2"), 0.75)
-  )).toDF("antecedent", "consequent", "confidence")
-.withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
-.withColumn("consequent", col("consequent").cast(ArrayType(dt)))
-  assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
-generatedRules.sort("antecedent").rdd.collect()))
-
-  val transformed = model.transform(data)
-  val expectedTransformed = spark.createDataFrame(Seq(
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "3"), Array(2))
-  )).toDF("id", "items", "prediction")
-.withColumn("items", col("items").cast(ArrayType(dt)))
-.withColumn("prediction", col("prediction").cast(ArrayType(dt)))
-  assert(expectedTransformed.collect().toSet.equals(
-transformed.collect().toSet))
+  class DataTypeWithEncoder[A](val a: DataType)
+  (implicit val encoder: Encoder[(Int, 
Array[A], Array[A])])
+
+  Array(
+new DataTypeWithEncoder[Int](IntegerType),
+new DataTypeWithEncoder[String](StringType),
+new DataTypeWithEncoder[Short](ShortType),
+new DataTypeWithEncoder[Long](LongType)
+// , new DataTypeWithEncoder[Byte](ByteType)
+// TODO: using ByteType produces error, as Array[Byte] is handled 
as Binary
+// cannot resolve 'CAST(`items` AS BINARY)' due to data type 
mismatch:
+// cannot cast array to binary;
+  ).foreach { dt => {
+val data = dataset.withColumn("items", 
col("items").cast(ArrayType(dt.a)))
+val model = new FPGrowth().setMinSupport(0.5).fit(data)
+val generatedRules = model.setMinConfidence(0.5).associationRules
+val expectedRules = Seq(
+  (Array("2"), Array("1"), 1.0),
+  (Array("1"), Array("2"), 0.75)
+).toDF("antecedent", "consequent", "confidence")
+  .withColumn("antecedent", 
col("antecedent").cast(ArrayType(dt.a)))
+  .withColumn("consequent", 
col("consequent").cast(ArrayType(dt.a)))
+assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
+  generatedRules.sort("antecedent").rdd.collect()))
+
+val expectedTransformed = Seq(
+  (0, Array("1", "2"), Array.emptyIntArray),
--- End diff --

done
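
For anyone skimming the thread: the pattern in the diff above pairs each `DataType` with the `Encoder` for the tuple shape the streaming test helper consumes. A simplified, self-contained sketch (the real test pulls its encoders from `spark.implicits._`):

```
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.types._

// Each SQL DataType travels together with the Encoder for the corresponding
// (id, items, expected) tuple, so one foreach can cover all element types.
class DataTypeWithEncoder[A](val a: DataType)(
    implicit val encoder: Encoder[(Int, Array[A], Array[A])])

object DataTypeWithEncoderSketch {
  def main(args: Array[String]): Unit = {
    // Spelled out via Encoders.product to keep the sketch standalone.
    implicit val intEnc: Encoder[(Int, Array[Int], Array[Int])] =
      Encoders.product[(Int, Array[Int], Array[Int])]
    implicit val strEnc: Encoder[(Int, Array[String], Array[String])] =
      Encoders.product[(Int, Array[String], Array[String])]

    val typesUnderTest = Seq(
      new DataTypeWithEncoder[Int](IntegerType),
      new DataTypeWithEncoder[String](StringType))

    typesUnderTest.foreach { dt =>
      println(s"${dt.a.simpleString} -> ${dt.encoder.schema.simpleString}")
    }
  }
}
```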


---




[GitHub] spark issue #20235: [Spark-22887][ML][TESTS][WIP] ML test for StructuredStre...

2018-04-09 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20235
  
@WeichenXu123 Executing `test("FPGrowth fit and transform with different data types")` with the Byte data type produces an `org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`items` AS BINARY)' due to data type mismatch: cannot cast array to binary`.
I don't remember all the details, but somewhere (Catalyst?) `Array[Byte]` gets special treatment as Binary. When `testTransformerOnStreamData` tries to collect the data from the DataFrame to test it as a stream, this special casing causes the above-mentioned exception.
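
The special casing is visible directly on the encoder schemas; a minimal standalone sketch (not part of the PR): Catalyst's `ScalaReflection` maps `Array[Byte]` to `binary` rather than `array<tinyint>`, which is what forces the `CAST(... AS BINARY)` above.

```
import org.apache.spark.sql.Encoders

object ByteArrayEncodingSketch {
  def main(args: Array[String]): Unit = {
    // Array[Byte] fields are encoded as binary, not array<tinyint> ...
    println(Encoders.product[(Int, Array[Byte], Array[Byte])].schema.simpleString)
    // struct<_1:int,_2:binary,_3:binary>

    // ... while other numeric element types keep their array shape.
    println(Encoders.product[(Int, Array[Short], Array[Short])].schema.simpleString)
    // struct<_1:int,_2:array<smallint>,_3:array<smallint>>
  }
}
```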



---




[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...

2018-04-09 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20045
  
ping @jiangxb1987


---




[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...

2018-03-14 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20045
  
ping @jiangxb1987


---




[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...

2018-03-05 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20121#discussion_r172122977
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
 ---
@@ -75,11 +71,9 @@ class MultilayerPerceptronClassifierSuite
   .setMaxIter(100)
   .setSolver("l-bfgs")
 val model = trainer.fit(dataset)
-val result = model.transform(dataset)
 MLTestingUtils.checkCopyAndUids(trainer, model)
-val predictionAndLabels = result.select("prediction", 
"label").collect()
-predictionAndLabels.foreach { case Row(p: Double, l: Double) =>
-  assert(p == l)
+testTransformer[(Vector, Double)](dataset.toDF(), model, "prediction", 
"label") {
--- End diff --

` @transient var dataset: Dataset[_] = _`


---




[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...

2018-03-05 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20045
  
I think I've addressed all your points @jiangxb1987. 
Do you think I need any more changes?


---




[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166883609
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
+private static final Logger logger = 
LoggerFactory.getLogger(TextFormatWithTimestamp.class);
+
+/**
+ * Content-type for text version 0.0.4.
+ */
+public static final String CONTENT_TYPE_004 = "text/plain; 
version=0.0.4; charset=utf-8";
+
+private static StringBuilder jsonMessageLogBuilder = new 
StringBuilder();
+
+public static void write004(Writer writer,
--- End diff --

No doc


---




[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166883372
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ * 
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ * 
+ * Example usage:
+ * 
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ * CollectorRegistry registry = new CollectorRegistry();
+ * Gauge duration = Gauge.build()
+ * .name("my_batch_job_duration_seconds")
+ * .help("Duration of my batch job in seconds.")
+ * .register(registry);
+ * Gauge.Timer durationTimer = duration.startTimer();
+ * try {
+ *   // Your code here.
+ *
+ *   // This is only added to the registry after success,
+ *   // so that a previous success in the Pushgateway isn't 
overwritten on failure.
+ *   Gauge lastSuccess = Gauge.build()
+ *   .name("my_batch_job_last_success")
+ *   .help("Last time my batch job succeeded, in unixtime.")
+ *   .register(registry);
+ *   lastSuccess.setToCurrentTime();
+ * } finally {
+ *   durationTimer.setDuration();
+ *   PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
+ *   pg.pushAdd(registry, "my_batch_job");
+ * }
+ *   }
+ * }
+ * 
+ * 
+ * See https://github.com/prometheus/pushgateway";>
+ * https://github.com/prometheus/pushgateway
+ */
+public class PushGatewayWithTimestamp {
+
+private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+private final String address;
+private static final int SECONDS_PER_MILLISECOND = 1000;
+/**
+ * Construct a Pushgateway, with the given address.
+ * 
+ * @param address  host:port or ip:port of the Pushgateway.
+ */
+public PushGatewayWithTimestamp(String address) {
+this.address = address;
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry, String job) throws 
IOException {
+doRequest(registry, job, null, "PUT", null);
+}
+
+/**
+ * Pushes all metrics in a Collector,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This is useful for pushing a single Gauge.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(Collector collector, String job) throws IOException {
+CollectorRegistry registry = new CollectorRegistry();
+collector.register(registry);
+push(registry, job);
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+pub

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166886232
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
+private static final Logger logger = 
LoggerFactory.getLogger(TextFormatWithTimestamp.class);
+
+/**
+ * Content-type for text version 0.0.4.
+ */
+public static final String CONTENT_TYPE_004 = "text/plain; 
version=0.0.4; charset=utf-8";
+
+private static StringBuilder jsonMessageLogBuilder = new 
StringBuilder();
+
+public static void write004(Writer writer,
+Enumeration 
mfs)throws IOException {
+write004(writer, mfs, null);
+}
+
+/**
+ * Write out the text version 0.0.4 of the given MetricFamilySamples.
+ */
+public static void write004(Writer 
writer,Enumeration mfs,
+String timestamp) throws IOException {
+/* See http://prometheus.io/docs/instrumenting/exposition_formats/
+ * for the output format specification. */
+while(mfs.hasMoreElements()) {
+Collector.MetricFamilySamples metricFamilySamples = 
mfs.nextElement();
--- End diff --

I think `for(Collector.MetricFamilySamples s: Collections.list(mfs)) {` would be nicer.



---




[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166889139
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
+private static final Logger logger = 
LoggerFactory.getLogger(TextFormatWithTimestamp.class);
+
+/**
+ * Content-type for text version 0.0.4.
+ */
+public static final String CONTENT_TYPE_004 = "text/plain; 
version=0.0.4; charset=utf-8";
+
+private static StringBuilder jsonMessageLogBuilder = new 
StringBuilder();
--- End diff --

Usage of this variable is questionable for a couple of reasons:
- It just keeps growing and is never cleared or re-initialized. As a consequence, from the second call of `write` onwards it holds invalid content, and it acts as a memory leak.
- Its usage pattern (`writer.write(blah); appendToJsonMessageLogBuilder("blah")`) is pretty verbose and should be factored out.
- It's not thread-safe (and that's not documented).
- I don't think accessing it as a static member everywhere is a good design. It should either
  - be passed around as a method parameter,
  - or be changed to an instance member. The static `write004` could instantiate a new `TextFormatWithTimestamp` and call `write` on that.




---




[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166896270
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ * 
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ * 
+ * Example usage:
+ * 
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ * CollectorRegistry registry = new CollectorRegistry();
+ * Gauge duration = Gauge.build()
+ * .name("my_batch_job_duration_seconds")
+ * .help("Duration of my batch job in seconds.")
+ * .register(registry);
+ * Gauge.Timer durationTimer = duration.startTimer();
+ * try {
+ *   // Your code here.
+ *
+ *   // This is only added to the registry after success,
+ *   // so that a previous success in the Pushgateway isn't 
overwritten on failure.
+ *   Gauge lastSuccess = Gauge.build()
+ *   .name("my_batch_job_last_success")
+ *   .help("Last time my batch job succeeded, in unixtime.")
+ *   .register(registry);
+ *   lastSuccess.setToCurrentTime();
+ * } finally {
+ *   durationTimer.setDuration();
+ *   PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
+ *   pg.pushAdd(registry, "my_batch_job");
+ * }
+ *   }
+ * }
+ * 
+ * 
+ * See https://github.com/prometheus/pushgateway";>
+ * https://github.com/prometheus/pushgateway
+ */
+public class PushGatewayWithTimestamp {
+
+private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+private final String address;
+private static final int SECONDS_PER_MILLISECOND = 1000;
+/**
+ * Construct a Pushgateway, with the given address.
+ * 
+ * @param address  host:port or ip:port of the Pushgateway.
+ */
+public PushGatewayWithTimestamp(String address) {
+this.address = address;
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry, String job) throws 
IOException {
+doRequest(registry, job, null, "PUT", null);
+}
+
+/**
+ * Pushes all metrics in a Collector,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This is useful for pushing a single Gauge.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(Collector collector, String job) throws IOException {
+CollectorRegistry registry = new CollectorRegistry();
+collector.register(registry);
+push(registry, job);
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+pub

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166892802
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.URI
+import java.util
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.codahale.metrics._
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.dropwizard.DropwizardExports
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, 
SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.METRICS_NAMESPACE
+import org.apache.spark.metrics.MetricsSystem
+import 
org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
+
+
+private[spark] class PrometheusSink(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager)
+  extends Sink with Logging {
+
+  protected class Reporter(registry: MetricRegistry)
+extends ScheduledReporter(
+  registry,
+  "prometheus-reporter",
+  MetricFilter.ALL,
+  TimeUnit.SECONDS,
+  TimeUnit.MILLISECONDS) {
+
+val defaultSparkConf: SparkConf = new SparkConf(true)
+
+override def report(
+ gauges: util.SortedMap[String, Gauge[_]],
+ counters: util.SortedMap[String, Counter],
+ histograms: util.SortedMap[String, Histogram],
+ meters: util.SortedMap[String, Meter],
+ timers: util.SortedMap[String, Timer]): Unit = {
+
+  // SparkEnv may become available only after metrics sink creation 
thus retrieving
+  // SparkConf from spark env here and not during the 
creation/initialisation of PrometheusSink.
+  val sparkConf: SparkConf = 
Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
+
+  val metricsNamespace: Option[String] = 
sparkConf.get(METRICS_NAMESPACE)
+  val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
+  val executorId: Option[String] = 
sparkConf.getOption("spark.executor.id")
+
+  logInfo(s"metricsNamespace=$metricsNamespace, 
sparkAppId=$sparkAppId, " +
+s"executorId=$executorId")
+
+  val role: String = (sparkAppId, executorId) match {
+case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
+case (Some(_), Some(_)) => "executor"
+case _ => "shuffle"
+  }
+
+  val job: String = role match {
+case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
+case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
+case _ => metricsNamespace.getOrElse("shuffle")
+  }
+  logInfo(s"role=$role, job=$job")
+
+  val groupingKey: Map[String, String] = (role, executorId) match {
+case ("driver", _) => Map("role" -> role)
+case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
+case _ => Map("role" -> role)
+  }
+
+
--- End diff --

Nit: extra line


---




[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166881873
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ * 
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ * 
+ * Example usage:
+ * 
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ * CollectorRegistry registry = new CollectorRegistry();
+ * Gauge duration = Gauge.build()
+ * .name("my_batch_job_duration_seconds")
+ * .help("Duration of my batch job in seconds.")
+ * .register(registry);
+ * Gauge.Timer durationTimer = duration.startTimer();
+ * try {
+ *   // Your code here.
+ *
+ *   // This is only added to the registry after success,
+ *   // so that a previous success in the Pushgateway isn't 
overwritten on failure.
+ *   Gauge lastSuccess = Gauge.build()
+ *   .name("my_batch_job_last_success")
+ *   .help("Last time my batch job succeeded, in unixtime.")
+ *   .register(registry);
+ *   lastSuccess.setToCurrentTime();
+ * } finally {
+ *   durationTimer.setDuration();
+ *   PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
+ *   pg.pushAdd(registry, "my_batch_job");
+ * }
+ *   }
+ * }
+ * 
+ * 
+ * See https://github.com/prometheus/pushgateway";>
+ * https://github.com/prometheus/pushgateway
+ */
+public class PushGatewayWithTimestamp {
+
+private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+private final String address;
+private static final int SECONDS_PER_MILLISECOND = 1000;
+/**
+ * Construct a Pushgateway, with the given address.
+ * 
+ * @param address  host:port or ip:port of the Pushgateway.
+ */
+public PushGatewayWithTimestamp(String address) {
+this.address = address;
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry, String job) throws 
IOException {
+doRequest(registry, job, null, "PUT", null);
+}
+
+/**
+ * Pushes all metrics in a Collector,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This is useful for pushing a single Gauge.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(Collector collector, String job) throws IOException {
+CollectorRegistry registry = new CollectorRegistry();
+collector.register(registry);
+push(registry, job);
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+pub

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166883368
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
--- End diff --

No doc.


---




[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166886527
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
+private static final Logger logger = 
LoggerFactory.getLogger(TextFormatWithTimestamp.class);
+
+/**
+ * Content-type for text version 0.0.4.
+ */
+public static final String CONTENT_TYPE_004 = "text/plain; 
version=0.0.4; charset=utf-8";
+
+private static StringBuilder jsonMessageLogBuilder = new 
StringBuilder();
+
+public static void write004(Writer writer,
+Enumeration 
mfs)throws IOException {
+write004(writer, mfs, null);
+}
+
+/**
+ * Write out the text version 0.0.4 of the given MetricFamilySamples.
+ */
+public static void write004(Writer 
writer,Enumeration mfs,
+String timestamp) throws IOException {
+/* See http://prometheus.io/docs/instrumenting/exposition_formats/
+ * for the output format specification. */
+while(mfs.hasMoreElements()) {
+Collector.MetricFamilySamples metricFamilySamples = 
mfs.nextElement();
--- End diff --

Also, the method body is not indented well.


---




[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166896081
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ * 
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ * 
+ * Example usage:
+ * 
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ * CollectorRegistry registry = new CollectorRegistry();
+ * Gauge duration = Gauge.build()
+ * .name("my_batch_job_duration_seconds")
+ * .help("Duration of my batch job in seconds.")
+ * .register(registry);
+ * Gauge.Timer durationTimer = duration.startTimer();
+ * try {
+ *   // Your code here.
+ *
+ *   // This is only added to the registry after success,
+ *   // so that a previous success in the Pushgateway isn't 
overwritten on failure.
+ *   Gauge lastSuccess = Gauge.build()
+ *   .name("my_batch_job_last_success")
+ *   .help("Last time my batch job succeeded, in unixtime.")
+ *   .register(registry);
+ *   lastSuccess.setToCurrentTime();
+ * } finally {
+ *   durationTimer.setDuration();
+ *   PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
+ *   pg.pushAdd(registry, "my_batch_job");
+ * }
+ *   }
+ * }
+ * 
+ * 
+ * See https://github.com/prometheus/pushgateway";>
+ * https://github.com/prometheus/pushgateway
+ */
+public class PushGatewayWithTimestamp {
+
+private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+private final String address;
+private static final int SECONDS_PER_MILLISECOND = 1000;
+/**
+ * Construct a Pushgateway, with the given address.
+ * 
+ * @param address  host:port or ip:port of the Pushgateway.
+ */
+public PushGatewayWithTimestamp(String address) {
+this.address = address;
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry, String job) throws 
IOException {
+doRequest(registry, job, null, "PUT", null);
+}
+
+/**
+ * Pushes all metrics in a Collector,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This is useful for pushing a single Gauge.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(Collector collector, String job) throws IOException {
+CollectorRegistry registry = new CollectorRegistry();
+collector.register(registry);
+push(registry, job);
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+pub

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166886839
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
+private static final Logger logger = 
LoggerFactory.getLogger(TextFormatWithTimestamp.class);
+
+/**
+ * Content-type for text version 0.0.4.
+ */
+public static final String CONTENT_TYPE_004 = "text/plain; 
version=0.0.4; charset=utf-8";
+
+private static StringBuilder jsonMessageLogBuilder = new 
StringBuilder();
+
+public static void write004(Writer writer,
+Enumeration 
mfs)throws IOException {
+write004(writer, mfs, null);
+}
+
+/**
+ * Write out the text version 0.0.4 of the given MetricFamilySamples.
+ */
+public static void write004(Writer 
writer,Enumeration mfs,
+String timestamp) throws IOException {
+/* See http://prometheus.io/docs/instrumenting/exposition_formats/
+ * for the output format specification. */
+while(mfs.hasMoreElements()) {
+Collector.MetricFamilySamples metricFamilySamples = 
mfs.nextElement();
+
+logger.debug("Metrics data");
+logger.debug(metricFamilySamples.toString());
+logger.debug("Logging metrics as a json format:");
+
+
+writer.write("# HELP ");
+appendToJsonMessageLogBuilder("# HELP ");
+writer.write(metricFamilySamples.name);
+appendToJsonMessageLogBuilder(metricFamilySamples.name);
+writer.write(' ');
+appendToJsonMessageLogBuilder(' ');
+writeEscapedHelp(writer, metricFamilySamples.help);
+writer.write('\n');
+appendToJsonMessageLogBuilder('\n');
+
+writer.write("# TYPE ");
+appendToJsonMessageLogBuilder("# TYPE ");
+writer.write(metricFamilySamples.name);
+appendToJsonMessageLogBuilder(metricFamilySamples.name);
+writer.write(' ');
+appendToJsonMessageLogBuilder(' ');
+writer.write(typeString(metricFamilySamples.type));
+
appendToJsonMessageLogBuilder(typeString(metricFamilySamples.type));
+writer.write('\n');
+appendToJsonMessageLogBuilder('\n');
+
+for (Collector.MetricFamilySamples.Sample sample: 
metricFamilySamples.samples) {
+writer.write(sample.name);
+appendToJsonMessageLogBuilder(sample.name);
+if (sample.labelNames.size() > 0) {
+writer.write('{');
+appendToJsonMessageLogBuilder('{');
+for (int i = 0; i < sample.labelNames.size(); ++i) {
+writer.write(sample.labelNames.get(i));
+
appendToJsonMessageLogBuilder(sample.labelNames.get(i));
+writer.write("=\"");
+appendToJsonMessageLogBuilder("=\"");
+writeEscapedLabelValue(writer, 
sample.labelValues.get(i));
+writer.write("\",");
+appendToJsonMessageLogBuilder("\",");
+}
+writer.write('}');
+appendToJsonMessageLogBuilder('}');
+}
+writer.write(' ');
+appendToJsonMessageLogBuilder(' ');
+writer.write(Collector.doubleToGoString(sample.value));
+
appendToJsonMes

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166894472
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.URI
+import java.util
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.codahale.metrics._
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.dropwizard.DropwizardExports
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, 
SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.METRICS_NAMESPACE
+import org.apache.spark.metrics.MetricsSystem
+import 
org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
+
+
+private[spark] class PrometheusSink(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager)
+  extends Sink with Logging {
+
+  protected class Reporter(registry: MetricRegistry)
+extends ScheduledReporter(
+  registry,
+  "prometheus-reporter",
+  MetricFilter.ALL,
+  TimeUnit.SECONDS,
+  TimeUnit.MILLISECONDS) {
+
+val defaultSparkConf: SparkConf = new SparkConf(true)
+
+override def report(
+ gauges: util.SortedMap[String, Gauge[_]],
+ counters: util.SortedMap[String, Counter],
+ histograms: util.SortedMap[String, Histogram],
+ meters: util.SortedMap[String, Meter],
+ timers: util.SortedMap[String, Timer]): Unit = {
+
+  // SparkEnv may become available only after metrics sink creation 
thus retrieving
+  // SparkConf from spark env here and not during the 
creation/initialisation of PrometheusSink.
+  val sparkConf: SparkConf = 
Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
+
+  val metricsNamespace: Option[String] = 
sparkConf.get(METRICS_NAMESPACE)
+  val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
+  val executorId: Option[String] = 
sparkConf.getOption("spark.executor.id")
+
+  logInfo(s"metricsNamespace=$metricsNamespace, 
sparkAppId=$sparkAppId, " +
+s"executorId=$executorId")
+
+  val role: String = (sparkAppId, executorId) match {
+case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
+case (Some(_), Some(_)) => "executor"
+case _ => "shuffle"
+  }
+
+  val job: String = role match {
+case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
+case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
+case _ => metricsNamespace.getOrElse("shuffle")
+  }
+  logInfo(s"role=$role, job=$job")
+
+  val groupingKey: Map[String, String] = (role, executorId) match {
+case ("driver", _) => Map("role" -> role)
+case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
+case _ => Map("role" -> role)
+  }
+
+
+  pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava,
+s"${System.currentTimeMillis}")
+
+}
+
+  }
+
+  val DEFAULT_PUSH_PERIOD: Int = 10
+  val DEFAULT_PUSH_PERIOD_UNIT: TimeUnit = TimeUnit.SECONDS
+  val DEFAULT_PUSHGATEWAY_ADDRE

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r166894977
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.URI
+import java.util
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.codahale.metrics._
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.dropwizard.DropwizardExports
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, 
SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.METRICS_NAMESPACE
+import org.apache.spark.metrics.MetricsSystem
+import 
org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
+
+
+private[spark] class PrometheusSink(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager)
--- End diff --

`securityMgr` is never used


---




[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...

2018-02-02 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20362#discussion_r165745445
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---
@@ -599,8 +598,11 @@ class ALSSuite
   (ex, act) =>
 ex.userFactors.first().getSeq[Float](1) === 
act.userFactors.first.getSeq[Float](1)
 } { (ex, act, _) =>
-  ex.transform(_: 
DataFrame).select("prediction").first.getDouble(0) ~==
-act.transform(_: 
DataFrame).select("prediction").first.getDouble(0) absTol 1e-6
+  testTransformerByGlobalCheckFunc[Float](_: DataFrame, act, 
"prediction") {
+case actRows: Seq[Row] =>
+  ex.transform(_: 
DataFrame).select("prediction").first.getDouble(0) ~==
+actRows(0).getDouble(0) absTol 1e-6
+  }
--- End diff --

I think this code does not check anything.
`testTransformerByGlobalCheckFunc[Float](_: DataFrame, act, "prediction")` is just a partial application of `testTransformerByGlobalCheckFunc`. However, `checkNumericTypesALS` expects `check2: (ALSModel, ALSModel, DataFrame) => Unit`. It is happy to call the provided function, discard the partially applied function, and use `()` instead, so it will typecheck.
As a consequence, the function doing the assert is never called, so the `~==` assertion never happens. You can verify this by, say, asking for the 100th column of the first row: it will not produce an error.

This problem is not a result of your change; the original code had the same issue.

It could probably be simplified a bit, but I think the original intent was to do a check like this:

```
{ (ex, act, df) =>
  ex.transform(df).selectExpr("cast(prediction as double)").first.getDouble(0) ~==
    act.transform(df).selectExpr("cast(prediction as double)").first.getDouble(0) absTol 1e-6
}
```
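
To make the mechanism concrete, here is a tiny self-contained sketch (hypothetical names, nothing from the Spark code base) of how a partially applied function gets silently discarded when the expected result type is `Unit`:

```
object ValueDiscardingSketch {
  // Stand-in (hypothetical) for check2: (ALSModel, ALSModel, DataFrame) => Unit.
  def runCheck(check: Int => Unit): Unit = check(42)

  // Stand-in (hypothetical) for testTransformerByGlobalCheckFunc: data first, check body second.
  def testOnValue(x: Int)(body: Int => Unit): Unit = body(x)

  def main(args: Array[String]): Unit = {
    runCheck { data =>
      // Only a partial application: this builds an Int => Unit closure without
      // ever invoking testOnValue. Because runCheck expects Unit, the compiler
      // silently discards the closure, so the assert below never runs.
      testOnValue(_: Int) { v => assert(v < 0, s"$v is not negative") }
    }
    println("runCheck finished without evaluating the assertion")
  }
}
```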


---




[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...

2018-02-01 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20362#discussion_r165304423
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---
@@ -566,6 +565,7 @@ class ALSSuite
   test("read/write") {
 val spark = this.spark
 import spark.implicits._
+
--- End diff --

nit: new line is not needed


---




[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...

2018-02-01 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20362#discussion_r165308129
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---
@@ -628,18 +635,24 @@ class ALSSuite
 }
 withClue("transform should fail when ids exceed integer range. ") {
   val model = als.fit(df)
-  assert(intercept[SparkException] {
-model.transform(df.select(df("user_big").as("user"), 
df("item"))).first
-  }.getMessage.contains(msg))
-  assert(intercept[SparkException] {
-model.transform(df.select(df("user_small").as("user"), 
df("item"))).first
-  }.getMessage.contains(msg))
-  assert(intercept[SparkException] {
-model.transform(df.select(df("item_big").as("item"), 
df("user"))).first
-  }.getMessage.contains(msg))
-  assert(intercept[SparkException] {
-model.transform(df.select(df("item_small").as("item"), 
df("user"))).first
-  }.getMessage.contains(msg))
+  def testTransformIdExceedsIntRange[A : Encoder](dataFrame: 
DataFrame): Unit = {
+assert(intercept[SparkException] {
+  model.transform(dataFrame).first
+}.getMessage.contains(msg))
+assert(intercept[StreamingQueryException] {
+  testTransformer[A](dataFrame, model, "prediction") {
+case _ =>
--- End diff --

No need for a partial function here; you can simplify it to `{ _ => }`.
I would also add a small comment to make it explicit that we intentionally do not check anything.
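
For illustration (hypothetical names, not the suite's code), both forms satisfy a plain function parameter; the second simply drops the needless pattern match:

```
object NoOpCheckSketch {
  // Stand-in for a callback parameter such as testTransformer's check body.
  def runChecks(check: String => Unit): Unit = check("row")

  def main(args: Array[String]): Unit = {
    runChecks { case _ => } // pattern-matching anonymous function: works, but needless
    runChecks { _ => }      // plain function literal; intentionally checks nothing
  }
}
```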


---




[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...

2018-02-01 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20362#discussion_r165305989
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---
@@ -599,8 +599,15 @@ class ALSSuite
   (ex, act) =>
 ex.userFactors.first().getSeq[Float](1) === 
act.userFactors.first.getSeq[Float](1)
 } { (ex, act, _) =>
-  ex.transform(_: 
DataFrame).select("prediction").first.getDouble(0) ~==
-act.transform(_: 
DataFrame).select("prediction").first.getDouble(0) absTol 1e-6
+  testTransformerByGlobalCheckFunc[Float](_: DataFrame, ex, 
"prediction") {
+case exRows: Seq[Row] =>
--- End diff --

I think it's OK to keep `ex.transform` here; this way the code will be a bit simpler.


---




[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...

2018-02-01 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20362#discussion_r165312057
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---
@@ -662,28 +676,32 @@ class ALSSuite
 val knownItem = data.select(max("item")).as[Int].first()
 val unknownItem = knownItem + 20
 val test = Seq(
-  (unknownUser, unknownItem),
-  (knownUser, unknownItem),
-  (unknownUser, knownItem),
-  (knownUser, knownItem)
-).toDF("user", "item")
+  (unknownUser, unknownItem, true),
+  (knownUser, unknownItem, true),
+  (unknownUser, knownItem, true),
+  (knownUser, knownItem, false)
+).toDF("user", "item", "expectedIsNaN")
 
 val als = new ALS().setMaxIter(1).setRank(1)
 // default is 'nan'
 val defaultModel = als.fit(data)
-val defaultPredictions = 
defaultModel.transform(test).select("prediction").as[Float].collect()
-assert(defaultPredictions.length == 4)
-assert(defaultPredictions.slice(0, 3).forall(_.isNaN))
-assert(!defaultPredictions.last.isNaN)
+var defaultPredictionNotNaN = Float.NaN
--- End diff --

I would get rid of this variable.
In `testTransformer` it just adds overhead, and `assert(!defaultPredictionNotNaN.isNaN)` asserts something that was already checked in `testTransformer`, so its only use is in `testTransformerByGlobalCheckFunc`.
Producing it is also a bit convoluted; it's not easy to understand why it's needed.
I would make it clearer by doing a plain old transform using the `test` DF (or a smaller one containing only the knownUser, knownItem pair) and selecting the value; see the sketch below.
An alternative solution could be to use real expected values in the `test` DF instead of "isNaN" flags.
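
Something along these lines (only a sketch; it assumes the test's `defaultModel`, `knownUser`, `knownItem` and `spark.implicits._` are in scope):

```
// Compute the reference prediction once with a plain transform on the known
// (user, item) pair instead of threading it through a mutable var.
val knownPair = Seq((knownUser, knownItem)).toDF("user", "item")
val defaultPrediction =
  defaultModel.transform(knownPair).select("prediction").as[Float].first()
assert(!defaultPrediction.isNaN)
```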


---




[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...

2018-02-01 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20362#discussion_r165312157
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---
@@ -693,7 +711,9 @@ class ALSSuite
 val data = ratings.toDF
 val model = new ALS().fit(data)
 Seq("nan", "NaN", "Nan", "drop", "DROP", "Drop").foreach { s =>
-  model.setColdStartStrategy(s).transform(data)
+  testTransformer[Rating[Int]](data, model.setColdStartStrategy(s), 
"prediction") {
+case _ =>
--- End diff --

Just like above, no need for a partial function.


---




[GitHub] spark pull request #20362: [Spark-22886][ML][TESTS] ML test for structured s...

2018-02-01 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20362#discussion_r165308205
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala ---
@@ -653,6 +666,7 @@ class ALSSuite
   test("ALS cold start user/item prediction strategy") {
 val spark = this.spark
 import spark.implicits._
+
--- End diff --

nit: no need for empty line here


---




[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...

2018-01-29 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20045
  
Do you think I need to cover any other cases, @jiangxb1987?


---




[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...

2018-01-29 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20235#discussion_r164427189
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala 
---
@@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defaul
   }
 
   test("FPGrowth fit and transform with different data types") {
-Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach 
{ dt =>
-  val data = dataset.withColumn("items", 
col("items").cast(ArrayType(dt)))
-  val model = new FPGrowth().setMinSupport(0.5).fit(data)
-  val generatedRules = model.setMinConfidence(0.5).associationRules
-  val expectedRules = spark.createDataFrame(Seq(
-(Array("2"), Array("1"), 1.0),
-(Array("1"), Array("2"), 0.75)
-  )).toDF("antecedent", "consequent", "confidence")
-.withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
-.withColumn("consequent", col("consequent").cast(ArrayType(dt)))
-  assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
-generatedRules.sort("antecedent").rdd.collect()))
-
-  val transformed = model.transform(data)
-  val expectedTransformed = spark.createDataFrame(Seq(
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "3"), Array(2))
-  )).toDF("id", "items", "prediction")
-.withColumn("items", col("items").cast(ArrayType(dt)))
-.withColumn("prediction", col("prediction").cast(ArrayType(dt)))
-  assert(expectedTransformed.collect().toSet.equals(
-transformed.collect().toSet))
+  class DataTypeWithEncoder[A](val a: DataType)
+  (implicit val encoder: Encoder[(Int, 
Array[A], Array[A])])
+
+  Array(
+new DataTypeWithEncoder[Int](IntegerType),
+new DataTypeWithEncoder[String](StringType),
+new DataTypeWithEncoder[Short](ShortType),
+new DataTypeWithEncoder[Long](LongType)
+// , new DataTypeWithEncoder[Byte](ByteType)
+// TODO: using ByteType produces error, as Array[Byte] is handled 
as Binary
+// cannot resolve 'CAST(`items` AS BINARY)' due to data type 
mismatch:
+// cannot cast array to binary;
+  ).foreach { dt => {
+val data = dataset.withColumn("items", 
col("items").cast(ArrayType(dt.a)))
+val model = new FPGrowth().setMinSupport(0.5).fit(data)
+val generatedRules = model.setMinConfidence(0.5).associationRules
+val expectedRules = Seq(
+  (Array("2"), Array("1"), 1.0),
+  (Array("1"), Array("2"), 0.75)
+).toDF("antecedent", "consequent", "confidence")
+  .withColumn("antecedent", 
col("antecedent").cast(ArrayType(dt.a)))
+  .withColumn("consequent", 
col("consequent").cast(ArrayType(dt.a)))
+assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
+  generatedRules.sort("antecedent").rdd.collect()))
+
+val expectedTransformed = Seq(
+  (0, Array("1", "2"), Array.emptyIntArray),
+  (0, Array("1", "2"), Array.emptyIntArray),
+  (0, Array("1", "2"), Array.emptyIntArray),
+  (0, Array("1", "3"), Array(2))
+).toDF("id", "items", "expected")
+  .withColumn("items", col("items").cast(ArrayType(dt.a)))
+  .withColumn("expected", col("expected").cast(ArrayType(dt.a)))
+
+testTransformer(expectedTransformed, model,
+  "expected", "prediction") {
+  case Row(expected, prediction) => assert(expected === prediction,
+s"Expected $expected but found $prediction for data type $dt")
+}(dt.encoder)
+  }
 }
   }
 
   test("FPGrowth getFreqItems") {
 val model = new FPGrowth().setMinSupport(0.7).fit(dataset)
-val expectedFreq = spark.createDataFrame(Seq(
+val expectedFreq = Seq(
   (Ar

[GitHub] spark issue #20330: [SPARK-23121][core] Fix for ui becoming unaccessible for...

2018-01-22 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20330
  
Thanks for your help @vanzin, @gengliangwang, @jiangxb1987 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20046: [SPARK-22362][SQL] Add unit test for Window Aggregate Fu...

2018-01-22 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20046
  
@jiangxb1987 how does your request to cover the SQL interface relate to 
SPARK-23160? 
I assume it is to be covered in that issue, is that correct?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...

2018-01-20 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20330#discussion_r162792383
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -1002,4 +1000,12 @@ private object ApiHelper {
 }
   }
 
+  def lastStageNameAndDescription(store: AppStatusStore, job: JobData): 
(String, String) = {
+store.asOption(store.lastStageAttempt(job.stageIds.max)) match {
+  case Some(lastStageAttempt) =>
+(lastStageAttempt.name, 
lastStageAttempt.description.getOrElse(job.name))
+  case None => ("", "")
--- End diff --

Fixed, thanks for catching.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...

2018-01-19 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20330#discussion_r162759866
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -427,23 +435,21 @@ private[ui] class JobDataSource(
 val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
 val submissionTime = jobData.submissionTime
 val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max)
-val lastStageDescription = lastStageAttempt.description.getOrElse("")
+val (lastStageName, lastStageDescription) = 
lastStageNameAndDescription(store, jobData)
 
-val formattedJobDescription =
-  UIUtils.makeDescription(lastStageDescription, basePath, plainText = 
false)
+val jobDescription = UIUtils.makeDescription(lastStageDescription, 
basePath, plainText = false)
--- End diff --

I've moved this logic to `lastStageNameAndDescription`, so it's uniform.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20319: [SPARK-22884][ML][TESTS] ML test for StructuredStreaming...

2018-01-19 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20319
  
@jkbradley could you check out this change, please?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20235: [Spark-22887][ML][TESTS][WIP] ML test for StructuredStre...

2018-01-19 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20235
  
@jkbradley could you check out this change, please?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...

2018-01-19 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20330#discussion_r162712767
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -427,23 +435,21 @@ private[ui] class JobDataSource(
 val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
 val submissionTime = jobData.submissionTime
 val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max)
-val lastStageDescription = lastStageAttempt.description.getOrElse("")
+val (lastStageName, lastStageDescription) = 
lastStageNameAndDescription(store, jobData)
 
-val formattedJobDescription =
-  UIUtils.makeDescription(lastStageDescription, basePath, plainText = 
false)
+val jobDescription = UIUtils.makeDescription(lastStageDescription, 
basePath, plainText = false)
--- End diff --

`lastStageDescription` may be empty, but that will not cause problems: 
`makeDescription` handles it properly, just like in the version before 
`lastStageAttempt` was used:
```
  val jobDescription = UIUtils.makeDescription(jobData.description.getOrElse(""),
    basePath, plainText = false)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...

2018-01-19 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20330#discussion_r162622469
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -65,10 +68,13 @@ private[ui] class AllJobsPage(parent: JobsTab, store: 
AppStatusStore) extends We
 }.map { job =>
   val jobId = job.jobId
   val status = job.status
-  val jobDescription = 
store.lastStageAttempt(job.stageIds.max).description
-  val displayJobDescription = jobDescription
-.map(UIUtils.makeDescription(_, "", plainText = true).text)
-.getOrElse("")
+  val (_, lastStageDescription) = lastStageNameAndDescription(store, 
job)
+  val displayJobDescription =
+if (lastStageDescription.isEmpty) {
+  job.name
--- End diff --

Using job.name instead of "" to behave more like the pre-2.3 version:

https://github.com/smurakozi/spark/blob/772e4648d95bda3353723337723543c741ea8476/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala#L70


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20330: [SPARK-23121][core] Fix for ui becoming unaccessible for...

2018-01-19 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20330
  
cc @jiangxb1987, @srowen, @vanzin  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20330: [SPARK-23121][core] Fix for ui becoming unaccessible for...

2018-01-19 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20330
  
@guoxiaolongzte could you please check if this change fixes the issue you 
have observed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20330: [SPARK-23121][core] Fix for ui becoming unaccessi...

2018-01-19 Thread smurakozi
GitHub user smurakozi opened a pull request:

https://github.com/apache/spark/pull/20330

[SPARK-23121][core] Fix for ui becoming unaccessible for long running 
streaming apps 

## What changes were proposed in this pull request?

The allJobs and the job pages attempt to use the last stage attempt and the DAG 
visualization from the store, but for long-running jobs these are not guaranteed 
to be retained, leading to exceptions when the pages are rendered.

To fix it `store.lastStageAttempt(stageId)` and 
`store.operationGraphForJob(jobId)` are wrapped in `store.asOption` and default 
values are used if the info is missing.
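
A minimal sketch of that pattern for the last stage attempt, mirroring the 
`lastStageNameAndDescription` helper quoted elsewhere in this archive (the 
empty-string fallback is the one used there):

```
val (lastStageName, lastStageDescription) =
  store.asOption(store.lastStageAttempt(job.stageIds.max)) match {
    case Some(lastStageAttempt) =>
      (lastStageAttempt.name, lastStageAttempt.description.getOrElse(job.name))
    case None => ("", "")
  }
```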

## How was this patch tested?

Manual testing of the UI, also using the test command reported in 
SPARK-23121:

./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount ./examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurakozi/spark SPARK-23121

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20330


commit 94d50b42d6bf233afd398049c95386920c21c252
Author: Sandor Murakozi 
Date:   2018-01-19T10:59:36Z

Fixed issue caused by the store cleaning up old stages

commit d60ae4f39337b91118324064c6a3dc58a3fc2832
Author: Sandor Murakozi 
Date:   2018-01-19T11:33:27Z

JobPage doesn't break if operationGraphForJob is not in the store for a 
jobid

commit 832378d25245126c285e794fadcaea019b70a78a
Author: Sandor Murakozi 
Date:   2018-01-19T11:34:59Z

lastStageNameAndDescription uses store.lastStageAttempt

commit 6525ef4eda0bf65bbbcb842495341afc8c5971ad
Author: Sandor Murakozi 
Date:   2018-01-19T12:15:33Z

Changed message in case of missing DAG visualization info




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20287: [SPARK-23121][WEB-UI] When the Spark Streaming ap...

2018-01-19 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20287#discussion_r162566289
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -427,17 +430,24 @@ private[ui] class JobDataSource(
 val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
 val submissionTime = jobData.submissionTime
 val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-val lastStageAttempt = store.lastStageAttempt(jobData.stageIds.max)
-val lastStageDescription = lastStageAttempt.description.getOrElse("")
-
+var lastStageDescription = ""
--- End diff --

Instead of catching the exception, the logic should be modified to be 
prepared for a missing `stageAttempt`. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20287: [SPARK-23121][WEB-UI] When the Spark Streaming ap...

2018-01-19 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20287#discussion_r162566562
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -335,9 +335,12 @@ private[ui] class JobPage(parent: JobsTab, store: 
AppStatusStore) extends WebUIP
 
 content ++= makeTimeline(activeStages ++ completedStages ++ 
failedStages,
   store.executorList(false), appStartTime)
-
-content ++= UIUtils.showDagVizForJob(
-  jobId, store.operationGraphForJob(jobId))
+try {
+  content ++= UIUtils.showDagVizForJob(
+jobId, store.operationGraphForJob(jobId))
+} catch {
+  case e => None
+}
--- End diff --

Same here. We should avoid the situation where the exception is thrown in the 
first place; catching it and doing nothing just hides the problem.
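
A sketch of the approach used in PR #20330, with `store.asOption` so the 
missing-graph case never throws (that PR also renders a short message when the 
DAG visualization info is missing):

```
store.asOption(store.operationGraphForJob(jobId)).foreach { operationGraph =>
  content ++= UIUtils.showDagVizForJob(jobId, operationGraph)
}
```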


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20319: [SPARK-22884][ML][TESTS] ML test for StructuredSt...

2018-01-18 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20319#discussion_r162460705
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/Encoders.scala ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+
+private[clustering] object Encoders {
+  implicit val vectorEncoder = ExpressionEncoder[Vector]()
--- End diff --

Is there a better solution to provide an implicit Vector encoder for 
testTransformer?
Is it ok here, or is there a better place for it?
e.g. `org.apache.spark.mllib.util.MLlibTestSparkContext.testImplicits`



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20319: [SPARK-22884][ML][TESTS] ML test for StructuredSt...

2018-01-18 Thread smurakozi
GitHub user smurakozi opened a pull request:

https://github.com/apache/spark/pull/20319

[SPARK-22884][ML][TESTS] ML test for StructuredStreaming: 
spark.ml.clustering

## What changes were proposed in this pull request?

Converting clustering tests to also check code with structured streaming, 
using the ML testing infrastructure implemented in SPARK-22882.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurakozi/spark SPARK-22884

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20319.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20319


commit 97c96b6ac7f65762cf9f125965e8d2a3cba72f60
Author: Sandor Murakozi 
Date:   2018-01-18T19:03:33Z

Converted all clustering tests to check streaming

commit b6e06e8e280f97560a342e287072f0b49e85bb79
Author: Sandor Murakozi 
Date:   2018-01-18T20:12:46Z

formatting, nits




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...

2018-01-17 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20045
  
@gatorsmile @hvanhovell @jiangxb1987, could you have a look, please?




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...

2018-01-17 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20121#discussion_r162052224
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
 ---
@@ -29,15 +29,14 @@ import org.apache.spark.ml.feature.{Instance, 
LabeledPoint}
 import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Matrix, 
SparseMatrix, Vector, Vectors}
 import org.apache.spark.ml.optim.aggregator.LogisticAggregator
 import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, 
MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
--- End diff --

nit: unused import, could be removed, just like SparkFunSuite a couple 
lines above



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...

2018-01-17 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20121#discussion_r162053757
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
 ---
@@ -75,11 +71,9 @@ class MultilayerPerceptronClassifierSuite
   .setMaxIter(100)
   .setSolver("l-bfgs")
 val model = trainer.fit(dataset)
-val result = model.transform(dataset)
 MLTestingUtils.checkCopyAndUids(trainer, model)
-val predictionAndLabels = result.select("prediction", 
"label").collect()
-predictionAndLabels.foreach { case Row(p: Double, l: Double) =>
-  assert(p == l)
+testTransformer[(Vector, Double)](dataset.toDF(), model, "prediction", 
"label") {
--- End diff --

`dataset` is always a `DataFrame` in this suite. If it were declared as such, 
there would be no need to call `toDF()` every time, e.g.:
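
A sketch only (assumes the suite keeps the field the diff refers to):

```
@transient var dataset: DataFrame = _
// ...call sites can then pass `dataset` directly instead of `dataset.toDF()`
```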


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...

2018-01-17 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20121#discussion_r162037613
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala
 ---
@@ -169,59 +171,28 @@ class GBTClassifierSuite extends SparkFunSuite with 
MLlibTestSparkContext
 val blas = BLAS.getInstance()
 
 val validationDataset = validationData.toDF(labelCol, featuresCol)
-val results = gbtModel.transform(validationDataset)
-// check that raw prediction is tree predictions dot tree weights
-results.select(rawPredictionCol, featuresCol).collect().foreach {
-  case Row(raw: Vector, features: Vector) =>
+testTransformer[(Double, Vector)](validationDataset, gbtModel,
+  "rawPrediction", "features", "probability", "prediction") {
+  case Row(raw: Vector, features: Vector, prob: Vector, pred: Double) 
=>
 assert(raw.size === 2)
+// check that raw prediction is tree predictions dot tree weights
 val treePredictions = 
gbtModel.trees.map(_.rootNode.predictImpl(features).prediction)
 val prediction = blas.ddot(gbtModel.numTrees, treePredictions, 1, 
gbtModel.treeWeights, 1)
 assert(raw ~== Vectors.dense(-prediction, prediction) relTol eps)
-}
 
-// Compare rawPrediction with probability
-results.select(rawPredictionCol, probabilityCol).collect().foreach {
-  case Row(raw: Vector, prob: Vector) =>
-assert(raw.size === 2)
+// Compare rawPrediction with probability
 assert(prob.size === 2)
 // Note: we should check other loss types for classification if 
they are added
 val predFromRaw = raw.toDense.values.map(value => 
LogLoss.computeProbability(value))
 assert(prob(0) ~== predFromRaw(0) relTol eps)
 assert(prob(1) ~== predFromRaw(1) relTol eps)
 assert(prob(0) + prob(1) ~== 1.0 absTol absEps)
-}
 
-// Compare prediction with probability
-results.select(predictionCol, probabilityCol).collect().foreach {
-  case Row(pred: Double, prob: Vector) =>
+// Compare prediction with probability
 val predFromProb = prob.toArray.zipWithIndex.maxBy(_._1)._2
 assert(pred == predFromProb)
 }
 
-// force it to use raw2prediction
-gbtModel.setRawPredictionCol(rawPredictionCol).setProbabilityCol("")
-val resultsUsingRaw2Predict =
-  
gbtModel.transform(validationDataset).select(predictionCol).as[Double].collect()
-
resultsUsingRaw2Predict.zip(results.select(predictionCol).as[Double].collect()).foreach
 {
-  case (pred1, pred2) => assert(pred1 === pred2)
-}
-
-// force it to use probability2prediction
-gbtModel.setRawPredictionCol("").setProbabilityCol(probabilityCol)
-val resultsUsingProb2Predict =
-  
gbtModel.transform(validationDataset).select(predictionCol).as[Double].collect()
-
resultsUsingProb2Predict.zip(results.select(predictionCol).as[Double].collect()).foreach
 {
-  case (pred1, pred2) => assert(pred1 === pred2)
-}
-
-// force it to use predict
--- End diff --

Why were these transformations and checks removed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...

2018-01-17 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20121#discussion_r162058792
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala ---
@@ -17,14 +17,13 @@
 
 package org.apache.spark.ml.classification
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.attribute.NominalAttribute
 import org.apache.spark.ml.classification.LogisticRegressionSuite._
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.feature.StringIndexer
-import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, 
MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, 
MLTest, MLTestingUtils}
--- End diff --

nit:  import org.apache.spark.mllib.util.MLlibTestSparkContext
is unused at line 32


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...

2018-01-17 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20121#discussion_r162051926
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
 ---
@@ -17,22 +17,18 @@
 
 package org.apache.spark.ml.classification
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.classification.LogisticRegressionSuite._
 import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.ml.util.MLTestingUtils
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, 
MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
 import org.apache.spark.mllib.evaluation.MulticlassMetrics
 import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.functions._
--- End diff --

nit: unused import, could be removed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20286: [SPARK-23119][SS] Minor fixes to V2 streaming APIs

2018-01-17 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20286
  
I think this change is OK, except for the nits @zsxwing already noted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...

2018-01-16 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20243#discussion_r161763793
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, 
DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+class ConsoleWriter(batchId: Long, schema: StructType, options: 
Map[String, String])
+extends DataSourceV2Writer with Logging {
+  // Number of rows to display, by default 20 rows
+  private val numRowsToShow = 
options.get("numRows").map(_.toInt).getOrElse(20)
+
+  // Truncate the displayed data if it is too long, by default it is true
+  private val isTruncated = 
options.get("truncate").map(_.toBoolean).getOrElse(true)
--- End diff --

Same simplification possibility here, if `DataSourceV2Options` is used. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...

2018-01-16 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20243#discussion_r161760765
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
--- End diff --

I think this one can be deleted too: 

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala#L453


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...

2018-01-16 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20243#discussion_r161763865
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class ConsoleWriterSuite extends StreamTest {
+  import testImplicits._
+
+  test("console") {
+val input = MemoryStream[Int]
+
+val captured = new ByteArrayOutputStream()
+Console.withOut(captured) {
+  val query = input.toDF().writeStream.format("console").start()
+  try {
+input.addData(1, 2, 3)
+query.processAllAvailable()
+input.addData(4, 5, 6)
+query.processAllAvailable()
+input.addData()
+query.processAllAvailable()
+  } finally {
+query.stop()
+  }
+}
+
+assert(captured.toString() ==
+  """---
+|Batch: 0
+|---
+|+-+
+||value|
+|+-+
+||1|
+||2|
+||3|
+|+-+
+|
+|---
+|Batch: 1
+|---
+|+-+
+||value|
+|+-+
+||4|
+||5|
+||6|
+|+-+
+|
+|---
+|Batch: 2
+|---
+|+-+
+||value|
+|+-+
+|+-+
+|
+|""".stripMargin)
+  }
+}
--- End diff --

We could have a test to check `numRows`, something like this:
```
  test("console with numRows") {
val input = MemoryStream[Int]

val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
  val query = 
input.toDF().writeStream.format("console").option("NUMROWS", 2).start()
  try {
input.addData(1, 2, 3)
query.processAllAvailable()
  } finally {
query.stop()
  }
}

assert(captured.toString() ==
  """---
|Batch: 0
|---
|+-+
||value|
|+-+
||1|
||2|
|+-+
|only showing top 2 rows
|
|""".stripMargin)
  }

  test("console with truncation") {
val input = MemoryStream[String]

val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
  val query = 
input.toDF().writeStream.format("console").option("TRUNCATE", true).start()
  try {
input.addData("123456789012345678901234567890")
query.processAllAvailable()
  } finally {
query.stop()
  }
}

assert(captured.toString() ==
  """---
|Batch: 0
|---
|++
||   value|
|++
||12345678901234567...|
|++
|
|""".stripMargin)
  }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20243: [SPARK-23052][SS] Migrate ConsoleSink to data sou...

2018-01-16 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20243#discussion_r161749201
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, 
DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+class ConsoleWriter(batchId: Long, schema: StructType, options: 
Map[String, String])
+extends DataSourceV2Writer with Logging {
+  // Number of rows to display, by default 20 rows
+  private val numRowsToShow = 
options.get("numRows").map(_.toInt).getOrElse(20)
--- End diff --

ConsoleRelation creates this map from a `DataSourceV2Options`, so it contains 
lowercased keys. 
Using `DataSourceV2Options` directly or asking for "numrows" would both work, but 
with `DataSourceV2Options` 
`options.get("numRows").map(_.toInt).getOrElse(20)` 
could also be simplified to 
`options.getInt("numRows", 20)`
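
A sketch of the two member definitions if the writer's constructor took the 
`DataSourceV2Options` directly (the constructor change itself is an assumption):

```
// assuming: class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
private val numRowsToShow = options.getInt("numRows", 20)
private val isTruncated = options.getBoolean("truncate", true)
```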


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...

2018-01-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20251#discussion_r161293689
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -429,20 +429,40 @@ private[ui] class JobDataSource(
 val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
 val submissionTime = jobData.submissionTime
 val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-val jobDescription = 
UIUtils.makeDescription(jobData.description.getOrElse(""),
-  basePath, plainText = false)
+
+val lastStageAttempt = {
+  val stageAttempts = jobData.stageIds.flatMap(store.stageData(_))
--- End diff --

Do we want the data of the last stage or of the last stageAttempt? 
In the second case we should indeed load only the last stage.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...

2018-01-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20251#discussion_r161276636
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -429,20 +429,40 @@ private[ui] class JobDataSource(
 val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
 val submissionTime = jobData.submissionTime
 val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-val jobDescription = 
UIUtils.makeDescription(jobData.description.getOrElse(""),
-  basePath, plainText = false)
+
+val lastStage = {
+  val stageAttempts = jobData.stageIds.flatMap(store.stageData(_))
+  if (!stageAttempts.isEmpty) {
+val lastId = stageAttempts.map(_.stageId).max
+stageAttempts.find(_.stageId == lastId)
+  } else {
+None
+  }}
+
+val jobDescription = jobData.description
+  .getOrElse(lastStage.flatMap(_.description).
+  getOrElse(jobData.name))
+
+val lastStageName = lastStage.map(_.name).getOrElse(jobData.name)
+
+val lastStageDescription = lastStage.flatMap(_.description)
+  .getOrElse(jobData.description
+  .getOrElse(jobData.name))
--- End diff --

No need to break, but for me it shows the structure and logic of the code 
better this way. 
I've formatted the calculation of the two descriptions symmetrically, and I'm 
also happy to remove the unnecessary line breaks if you recommend it.

Thanks for the review and your comments @srowen 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...

2018-01-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20251#discussion_r161273733
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -429,20 +429,40 @@ private[ui] class JobDataSource(
 val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
 val submissionTime = jobData.submissionTime
 val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-val jobDescription = 
UIUtils.makeDescription(jobData.description.getOrElse(""),
-  basePath, plainText = false)
+
+val lastStage = {
+  val stageAttempts = jobData.stageIds.flatMap(store.stageData(_))
--- End diff --

`stageData` has this signature:
`def stageData(stageId: Int, details: Boolean = false)`
Without the `(_)`, scalac doesn't apply the default value of `details`; it 
complains that a `(Int, Boolean) => ...` function can't be used on a `Seq[Int]`, 
as illustrated below.
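
A standalone illustration of the same compiler behaviour (hypothetical `f`, not 
the Spark API):

```
def f(id: Int, details: Boolean = false): Option[String] =
  if (details) Some(s"$id (detailed)") else Some(id.toString)

val ids = Seq(1, 2, 3)

ids.flatMap(f(_))  // compiles: expands to `x => f(x)`, which uses the default for `details`
// ids.flatMap(f)  // does not compile: `f` eta-expands to (Int, Boolean) => Option[String]
```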


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...

2018-01-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20251#discussion_r161273851
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---
@@ -429,20 +429,40 @@ private[ui] class JobDataSource(
 val formattedDuration = duration.map(d => 
UIUtils.formatDuration(d)).getOrElse("Unknown")
 val submissionTime = jobData.submissionTime
 val formattedSubmissionTime = 
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
-val jobDescription = 
UIUtils.makeDescription(jobData.description.getOrElse(""),
-  basePath, plainText = false)
+
+val lastStage = {
+  val stageAttempts = jobData.stageIds.flatMap(store.stageData(_))
+  if (!stageAttempts.isEmpty) {
+val lastId = stageAttempts.map(_.stageId).max
+stageAttempts.find(_.stageId == lastId)
--- End diff --

Indeed, nice catch, thx


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20251: [Spark-23051][core] Fix for broken job description in Sp...

2018-01-12 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20251
  
Screenshots:
[In 
2.2](https://issues.apache.org/jira/secure/attachment/12905889/in-2.2.png)
[In 2.3 before 
fix](https://issues.apache.org/jira/secure/attachment/12905887/Spark-23051-before.png)
[In 2.3 after the 
fix](https://issues.apache.org/jira/secure/attachment/12905888/Spark-23051-after.png)



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20251: [Spark-23051][core] Fix for broken job descriptio...

2018-01-12 Thread smurakozi
GitHub user smurakozi opened a pull request:

https://github.com/apache/spark/pull/20251

[Spark-23051][core] Fix for broken job description in Spark UI

## What changes were proposed in this pull request?

In 2.2, the Spark UI displayed the stage description if the job description was 
not set. This functionality was broken: the UI showed no description in 
this case. In addition, the code used jobName and 
jobDescription instead of stageName and stageDescription when 
JobTableRowData was created.

In this PR the logic producing the values for the job rows was modified to find 
the latest stage attempt for the job and use it as a fallback if the job 
description was missing. 
stageName and stageDescription are also set using values from the stage, and 
jobName/jobDescription are used only as a fallback.

## How was this patch tested?
Manual testing of the UI, using the code in the bug report.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurakozi/spark SPARK-23051

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20251.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20251


commit 0412c4b959b42cb0b41c669593e689430da59ec2
Author: Sandor Murakozi 
Date:   2018-01-12T15:10:24Z

Fix for SPARK-23051

commit 937e0fd579a7f133f860eb6f572f50ad7d7fdf89
Author: Sandor Murakozi 
Date:   2018-01-12T15:25:44Z

fixed logic determining jobDescription

commit df41000d99eef3b609ece4b072c5aee0650c21f1
Author: Sandor Murakozi 
Date:   2018-01-12T15:51:06Z

Formatted code




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161009520
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -110,7 +131,13 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
   .set(config.MAX_TASK_ATTEMPTS_PER_NODE, 3)
   .set(config.MAX_FAILURES_PER_EXEC_STAGE, 2)
   .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
-val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new 
SystemClock())
+val clock = new ManualClock
+
+val attemptId = 0
+val taskSetBlacklist = new TaskSetBlacklist(
+  listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, 
clock = clock)
+
+clock.setTime(0)
--- End diff --

You should set the time to a new value before each call of 
`taskSetBlacklist.updateBlacklistForFailedTask` to see that the events posted on 
the listener bus have the correct time.
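
A sketch of what that could look like (the names and the event's argument order 
follow the diff above; the concrete timestamps are arbitrary):

```
clock.setTime(10)
taskSetBlacklist.updateBlacklistForFailedTask(
  "hostA", exec = "1", index = 0, failureReason = "testing")
clock.setTime(20)
taskSetBlacklist.updateBlacklistForFailedTask(
  "hostA", exec = "1", index = 1, failureReason = "testing")
// the event should carry the time of the failure that triggered the blacklisting
verify(listenerBusMock)
  .post(SparkListenerExecutorBlacklistedForStage(20, "1", 2, 0, attemptId))
```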


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161001141
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -211,6 +211,11 @@ private[spark] class AppStatusListener(
 updateBlackListStatus(event.executorId, true)
   }
 
+  override def onExecutorBlacklistedForStage(
+event: SparkListenerExecutorBlacklistedForStage): Unit = {
+updateBlackListStatusForStage(event.executorId, event.stageId, 
event.stageAttemptId)
+  }
+
--- End diff --

Consider covering this functionality (updating the status) in 
AppStatusListenerSuite. We already have a check for blacklisting an executor; 
we should have the same for a stage.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161009908
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala ---
@@ -157,13 +187,19 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
 // lead to any node blacklisting
 val conf = new SparkConf().setAppName("test").setMaster("local")
   .set(config.BLACKLIST_ENABLED.key, "true")
-val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new 
SystemClock())
+val clock = new ManualClock
+
+val attemptId = 0
+val taskSetBlacklist = new TaskSetBlacklist(
+  listenerBusMock, conf, stageId = 0, stageAttemptId = attemptId, 
clock = clock)
+clock.setTime(0)
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "1", index = 0, failureReason = "testing")
 taskSetBlacklist.updateBlacklistForFailedTask(
   "hostA", exec = "1", index = 1, failureReason = "testing")
 assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
 assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
+
verify(listenerBusMock).post(SparkListenerExecutorBlacklistedForStage(0, "1", 
2, 0, attemptId))
 
 taskSetBlacklist.updateBlacklistForFailedTask(
--- End diff --

Set time to new value before this call.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20203: [SPARK-22577] [core] executor page blacklist stat...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20203#discussion_r161005667
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -223,6 +228,15 @@ private[spark] class AppStatusListener(
 updateNodeBlackList(event.hostId, false)
   }
 
+  def updateBlackListStatusForStage(executorId: String, stageId: Int, 
stageAttemptId: Int): Unit = {
+Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+  val now = System.nanoTime()
+  val esummary = stage.executorSummary(executorId)
+  esummary.isBlacklisted = true
+  maybeUpdate(esummary, now)
+}
+  }
+
--- End diff --

Live entities periodically write an immutable view of themselves to the 
store. LiveExecutor and LiveExecutorStageSummary were modified in this PR to 
maintain the blacklisted status and to write it to the state store.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20235#discussion_r160980529
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala 
---
@@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defaul
   }
 
   test("FPGrowth fit and transform with different data types") {
-Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach 
{ dt =>
-  val data = dataset.withColumn("items", 
col("items").cast(ArrayType(dt)))
-  val model = new FPGrowth().setMinSupport(0.5).fit(data)
-  val generatedRules = model.setMinConfidence(0.5).associationRules
-  val expectedRules = spark.createDataFrame(Seq(
-(Array("2"), Array("1"), 1.0),
-(Array("1"), Array("2"), 0.75)
-  )).toDF("antecedent", "consequent", "confidence")
-.withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
-.withColumn("consequent", col("consequent").cast(ArrayType(dt)))
-  assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
-generatedRules.sort("antecedent").rdd.collect()))
-
-  val transformed = model.transform(data)
-  val expectedTransformed = spark.createDataFrame(Seq(
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "3"), Array(2))
-  )).toDF("id", "items", "prediction")
-.withColumn("items", col("items").cast(ArrayType(dt)))
-.withColumn("prediction", col("prediction").cast(ArrayType(dt)))
-  assert(expectedTransformed.collect().toSet.equals(
-transformed.collect().toSet))
+  class DataTypeWithEncoder[A](val a: DataType)
+  (implicit val encoder: Encoder[(Int, 
Array[A], Array[A])])
--- End diff --

Done, thx.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...

2018-01-11 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20235#discussion_r160969767
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala 
---
@@ -34,86 +35,122 @@ class FPGrowthSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defaul
   }
 
   test("FPGrowth fit and transform with different data types") {
-Array(IntegerType, StringType, ShortType, LongType, ByteType).foreach 
{ dt =>
-  val data = dataset.withColumn("items", 
col("items").cast(ArrayType(dt)))
-  val model = new FPGrowth().setMinSupport(0.5).fit(data)
-  val generatedRules = model.setMinConfidence(0.5).associationRules
-  val expectedRules = spark.createDataFrame(Seq(
-(Array("2"), Array("1"), 1.0),
-(Array("1"), Array("2"), 0.75)
-  )).toDF("antecedent", "consequent", "confidence")
-.withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
-.withColumn("consequent", col("consequent").cast(ArrayType(dt)))
-  assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
-generatedRules.sort("antecedent").rdd.collect()))
-
-  val transformed = model.transform(data)
-  val expectedTransformed = spark.createDataFrame(Seq(
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "2"), Array.emptyIntArray),
-(0, Array("1", "3"), Array(2))
-  )).toDF("id", "items", "prediction")
-.withColumn("items", col("items").cast(ArrayType(dt)))
-.withColumn("prediction", col("prediction").cast(ArrayType(dt)))
-  assert(expectedTransformed.collect().toSet.equals(
-transformed.collect().toSet))
+  class DataTypeWithEncoder[A](val a: DataType)
+  (implicit val encoder: Encoder[(Int, 
Array[A], Array[A])])
--- End diff --

This class is needed for two purposes:
1. to connect the Scala element types with their corresponding DataType. 
Note: this information is already available in AtomicType as InternalType, 
but it's not accessible, and using it from this test doesn't justify making it 
public.
2. to get the proper encoder to the testTransformer method. As the 
data types are put into an array, dt is inferred to be their common parent type, 
and implicit search can only find encoders for concrete types. 
For a similar reason, we need to capture the type of the final encoder. If we 
only had an encoder for A, implicit search would not be able to construct one for 
Array[A]: there are implicit encoders for Array[Int], Array[Short], etc., but not 
for a generic A that merely has an encoder. See the sketch below.
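
A standalone illustration of point 2 (hypothetical class and names, not the suite 
code; assumes a local SparkSession):

```
import org.apache.spark.sql.{Encoder, SparkSession}

// The concrete tuple encoder is captured while the element type A is still known.
class TypeWithEncoder[A](val label: String)
                        (implicit val encoder: Encoder[(Int, Array[A], Array[A])])

object EncoderCaptureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("encoder-capture").getOrCreate()
    import spark.implicits._

    // Once the instances are in an Array, their static type widens to TypeWithEncoder[_],
    // so implicit search could no longer find an encoder for the element type here.
    val entries = Array(
      new TypeWithEncoder[Int]("int"),
      new TypeWithEncoder[String]("string"))

    // Each entry still carries the concrete encoder it captured at construction time.
    entries.foreach(e => println(s"${e.label}: ${e.encoder.schema.simpleString}"))
    spark.stop()
  }
}
```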


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20235: [Spark-22887][ML][TESTS][WIP] ML test for Structu...

2018-01-11 Thread smurakozi
GitHub user smurakozi opened a pull request:

https://github.com/apache/spark/pull/20235

[Spark-22887][ML][TESTS][WIP] ML test for StructuredStreaming: spark.ml.fpm

## What changes were proposed in this pull request?

Converting FPGrowth tests to also check the code with Structured Streaming, 
using the ML testing infrastructure implemented in SPARK-22882. 

Note: this is a WIP; the test with Array[Byte] is not yet working due to some 
datatype issues (Array[Byte] vs. Binary).

## How was this patch tested?
N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurakozi/spark SPARK-22887

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20235.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20235


commit 331129556003bcf6e4bab6559e80e46ac0858706
Author: Sandor Murakozi 
Date:   2018-01-05T12:41:53Z

test 'FPGrowthModel setMinConfidence should affect rules generation and 
transform' is converted to use testTransformer

commit 93aff2c999eee4a88f7f4a3c32d6c7b601a918ac
Author: Sandor Murakozi 
Date:   2018-01-08T13:14:38Z

Test 'FPGrowth fit and transform with different data types' works with 
streaming, except for Byte

commit 8b0b00070a21bd47537a7c3ad580e2af38a481bd
Author: Sandor Murakozi 
Date:   2018-01-11T11:28:46Z

All tests use testTransformer.
Test with Array[Byte] is missing.

commit af61845ab6acfa82c4411bce3ab4a20afebd0aa3
Author: Sandor Murakozi 
Date:   2018-01-11T11:49:27Z

Unintentional changes in 93aff2c999 are reverted




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20045: [Spark-22360][SQL][TEST] Add unit tests for Window Speci...

2018-01-03 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/20045
  
@gatorsmile @hvanhovell @jiangxb1987, could you have a look, please?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20046: Adding unit tests for missing aggregate functions

2017-12-21 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20046#discussion_r158326073
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 ---
@@ -154,6 +154,217 @@ class DataFrameWindowFunctionsSuite extends QueryTest 
with SharedSQLContext {
 Row(2.0d), Row(2.0d)))
   }
 
+  test("corr, covar_pop, stddev_pop functions in specific window") {
+val df = Seq(
+  ("a", "p1", 10.0, 20.0),
+  ("b", "p1", 20.0, 10.0),
+  ("c", "p2", 20.0, 20.0),
+  ("d", "p2", 20.0, 20.0),
+  ("e", "p3", 0.0, 0.0),
+  ("f", "p3", 6.0, 12.0),
+  ("g", "p3", 6.0, 12.0),
+  ("h", "p3", 8.0, 16.0),
+  ("i", "p4", 5.0, 5.0)).toDF("key", "partitionId", "value1", "value2")
+checkAnswer(
+  df.select(
+$"key",
+corr("value1", "value2").over(Window.partitionBy("partitionId")
+  .orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+covar_pop("value1", "value2")
+  .over(Window.partitionBy("partitionId")
+.orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+var_pop("value1")
+  .over(Window.partitionBy("partitionId")
+.orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+stddev_pop("value1")
+  .over(Window.partitionBy("partitionId")
+.orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+var_pop("value2")
+  .over(Window.partitionBy("partitionId")
+.orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+stddev_pop("value2")
+  .over(Window.partitionBy("partitionId")
+.orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing))),
+
+  // As stddev_pop(expr) = sqrt(var_pop(expr))
+  // the "stddev_pop" column can be calculated from the "var_pop" 
column.
+  //
+  // As corr(expr1, expr2) = covar_pop(expr1, expr2) / 
(stddev_pop(expr1) * stddev_pop(expr2))
+  // the "corr" column can be calculated from the "covar_pop" and the 
two "stddev_pop" columns.
+  Seq(
+Row("a", -1.0, -25.0, 25.0, 5.0, 25.0, 5.0),
+Row("b", -1.0, -25.0, 25.0, 5.0, 25.0, 5.0),
+Row("c", null, 0.0, 0.0, 0.0, 0.0, 0.0),
+Row("d", null, 0.0, 0.0, 0.0, 0.0, 0.0),
+Row("e", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0),
+Row("f", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0),
+Row("g", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0),
+Row("h", 1.0, 18.0, 9.0, 3.0, 36.0, 6.0),
+Row("i", Double.NaN, 0.0, 0.0, 0.0, 0.0, 0.0)))
+  }
+
+  test("covar_samp, var_samp (variance), stddev_samp (stddev) functions in 
specific window") {
+val df = Seq(
+  ("a", "p1", 10.0, 20.0),
+  ("b", "p1", 20.0, 10.0),
+  ("c", "p2", 20.0, 20.0),
+  ("d", "p2", 20.0, 20.0),
+  ("e", "p3", 0.0, 0.0),
+  ("f", "p3", 6.0, 12.0),
+  ("g", "p3", 6.0, 12.0),
+  ("h", "p3", 8.0, 16.0),
+  ("i", "p4", 5.0, 5.0)).toDF("key", "partitionId", "value1", "value2")
+checkAnswer(
+  df.select(
+$"key",
+covar_samp("value1", 
"value2").over(Window.partitionBy("partitionId")
+  .orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+var_samp("value1").over(Window.partitionBy("partitionId")
+  .orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+variance("value1").over(Window.partitionBy("partitionId")
+  .orderBy("key").rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)),
+stddev_sam

[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...

2017-12-21 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20045#discussion_r158304105
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 ---
@@ -32,6 +32,217 @@ import org.apache.spark.unsafe.types.CalendarInterval
 class DataFrameWindowFunctionsSuite extends QueryTest with 
SharedSQLContext {
   import testImplicits._
 
+  test("Window partitionBy cardinality, no order by") {
+val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 4)).toDF("key", 
"value")
+
+checkAnswer(
+  df.select(
+sum("value").over(),
+sum("value").over(Window.partitionBy("key")),
+sum("value").over(Window.partitionBy("key", "value")),
+sum("value").over(Window.partitionBy("value", "key"))),
+  Row(11, 3, 1, 1) :: Row(11, 3, 2, 2) :: Row(11, 8, 8, 8) :: Row(11, 
8, 8, 8) :: Nil)
+  }
+
+  test("Null value in partition key") {
+val df = Seq(("a", 1), ("a", 2), (null, 4), (null, 8)).toDF("key", 
"value")
+
+checkAnswer(
+  df.select(
+'value,
+sum("value").over(Window.partitionBy("key"))),
+  Row(1, 3) :: Row(2, 3) :: Row(4, 12) :: Row(8, 12) :: Nil)
+  }
+
+  test("Same partitionBy multiple times") {
+val df = Seq(("a", 1), ("a", 2), ("b", 4), ("b", 8)).toDF("key", 
"value")
+
+checkAnswer(
+  df.select(
+sum("value").over(Window.partitionBy("key", "key"))),
+  Row(3) :: Row(3) :: Row(12) :: Row(12) :: Nil)
+  }
+
+  test("Multiple orderBy clauses") {
+val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 
4)).toDF("k1", "k2", "v")
+
+checkAnswer(
+  df.select(
+'v,
+lead("v", 1).over(Window.orderBy("k1", "k2")),
+lead("v", 1).over(Window.orderBy("k2", "k1"))),
+  Row(1, 2, 4) :: Row(4, 3, 2) :: Row(2, 4, 3) :: Row(3, null, null) 
:: Nil)
+  }
+
+  test("Multiple orderBy clauses with desc") {
+val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "y", 3), ("b", "x", 
4)).toDF("k1", "k2", "v")
+
+checkAnswer(
+  df.select(
+'v,
+lead("v", 1).over(Window.orderBy($"k1".desc, $"k2")),
+lead("v", 1).over(Window.orderBy($"k1", $"k2".desc)),
+lead("v", 1).over(Window.orderBy($"k1".desc, $"k2"))),
--- End diff --

It should be desc, desc, thanks for catching it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...

2017-12-21 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20045#discussion_r158304013
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
 ---
@@ -246,13 +246,23 @@ class ExpressionParserSuite extends PlanTest {
 // Basic window testing.
 assertEqual("foo(*) over w1", UnresolvedWindowExpression(func, 
WindowSpecReference("w1")))
 assertEqual("foo(*) over ()", windowed())
+assertEqual("foo(*) over (partition by a)", windowed(Seq('a)))
 assertEqual("foo(*) over (partition by a, b)", windowed(Seq('a, 'b)))
+assertEqual("foo(*) over (distribute by a)", windowed(Seq('a)))
 assertEqual("foo(*) over (distribute by a, b)", windowed(Seq('a, 'b)))
+assertEqual("foo(*) over (cluster by a)", windowed(Seq('a)))
 assertEqual("foo(*) over (cluster by a, b)", windowed(Seq('a, 'b)))
-assertEqual("foo(*) over (order by a desc, b asc)", 
windowed(Seq.empty, Seq('a.desc, 'b.asc )))
-assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, 
Seq('a.desc, 'b.asc )))
+assertEqual("foo(*) over (order by a)", windowed(Seq.empty, 
Seq('a.asc)))
+assertEqual("foo(*) over (order by a desc, b asc)", 
windowed(Seq.empty, Seq('a.desc, 'b.asc)))
+assertEqual("foo(*) over (sort by a)", windowed(Seq.empty, 
Seq('a.asc)))
+assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, 
Seq('a.desc, 'b.asc)))
 assertEqual("foo(*) over (partition by a, b order by c)", 
windowed(Seq('a, 'b), Seq('c.asc)))
 assertEqual("foo(*) over (distribute by a, b sort by c)", 
windowed(Seq('a, 'b), Seq('c.asc)))
--- End diff --

I will fix it, thx


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...

2017-12-21 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20045#discussion_r158303978
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
 ---
@@ -246,13 +246,23 @@ class ExpressionParserSuite extends PlanTest {
 // Basic window testing.
 assertEqual("foo(*) over w1", UnresolvedWindowExpression(func, 
WindowSpecReference("w1")))
 assertEqual("foo(*) over ()", windowed())
+assertEqual("foo(*) over (partition by a)", windowed(Seq('a)))
 assertEqual("foo(*) over (partition by a, b)", windowed(Seq('a, 'b)))
+assertEqual("foo(*) over (distribute by a)", windowed(Seq('a)))
 assertEqual("foo(*) over (distribute by a, b)", windowed(Seq('a, 'b)))
+assertEqual("foo(*) over (cluster by a)", windowed(Seq('a)))
 assertEqual("foo(*) over (cluster by a, b)", windowed(Seq('a, 'b)))
-assertEqual("foo(*) over (order by a desc, b asc)", 
windowed(Seq.empty, Seq('a.desc, 'b.asc )))
-assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, 
Seq('a.desc, 'b.asc )))
+assertEqual("foo(*) over (order by a)", windowed(Seq.empty, 
Seq('a.asc)))
+assertEqual("foo(*) over (order by a desc, b asc)", 
windowed(Seq.empty, Seq('a.desc, 'b.asc)))
+assertEqual("foo(*) over (sort by a)", windowed(Seq.empty, 
Seq('a.asc)))
+assertEqual("foo(*) over (sort by a desc, b asc)", windowed(Seq.empty, 
Seq('a.desc, 'b.asc)))
 assertEqual("foo(*) over (partition by a, b order by c)", 
windowed(Seq('a, 'b), Seq('c.asc)))
--- End diff --

I will fix it, thx


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20045: [Spark-22360][SQL] Add unit tests for Window Spec...

2017-12-21 Thread smurakozi
GitHub user smurakozi opened a pull request:

https://github.com/apache/spark/pull/20045

[Spark-22360][SQL] Add unit tests for Window Specifications

## What changes were proposed in this pull request?

Improve the test coverage of window specifications.

New tests cover basic cases more systematically in 
DataFrameWindowFunctionsSuite:
- different partition clauses (none, one, multiple)
- different order clauses (none, one, multiple, asc/desc, nulls first/last)
- defaults if clauses are missing

New tests were added to cover some more complex cases when partitionBy or 
orderBy uses expressions.

ExpressionParserSuite.'window function expressions' was also extended to 
check parsing of some additional window expressions.

## How was this patch tested?
Only new tests were added, automated tests were executed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurakozi/spark SPARK-22360

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20045.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20045


commit 313aafbebc134ec02110a31fb99bb72c30d61639
Author: Sandor Murakozi 
Date:   2017-12-21T11:23:12Z

New test cases for window specification

commit 6c48095358aa77d3dd12c7dbba97ecc31c22604e
Author: Sandor Murakozi 
Date:   2017-12-21T13:05:02Z

test for filtering based on window function

commit e941aca540c68e74c80d279150c8d8731f2ed332
Author: Sandor Murakozi 
Date:   2017-12-21T13:47:11Z

More test expressions in ExpressionParserSuite.window function expressions




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...

2017-12-13 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19599#discussion_r156609625
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: 
String) // No need for isV
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Specialized version of `Param[String]` for Java.
+ */
+@DeveloperApi
+class StringParam(parent: Params, name: String, doc: String, isValid: 
String => Boolean)
+  extends Param[String](parent, name, doc, isValid) {
+
+  private var options: Option[Array[String]] = None
+
+  def this(parent: Params, name: String, doc: String) =
+this(parent, name, doc, ParamValidators.alwaysTrue)
+
+  /** construct a StringParam with limited options (case-insensitive) */
+  def this(parent: Params, name: String, doc: String, options: 
Array[String]) = {
+this(parent, name, doc + s" Supported options (case-insensitive): 
${options.mkString(", ")}.",
+  s => options.exists(s.equalsIgnoreCase))
+this.options = Some(options)
+  }
+
+  private[spark] def getOptions: Option[Array[String]] = options
+
+  /** Creates a param pair with given value (for Java). */
+  override def w(value: String): ParamPair[String] = super.w(value)
+
+  override def validate(value: String): Unit = {
--- End diff --

should be private[param] 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...

2017-12-13 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19599#discussion_r156609525
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: 
String) // No need for isV
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Specialized version of `Param[String]` for Java.
+ */
+@DeveloperApi
+class StringParam(parent: Params, name: String, doc: String, isValid: 
String => Boolean)
+  extends Param[String](parent, name, doc, isValid) {
+
+  private var options: Option[Array[String]] = None
+
+  def this(parent: Params, name: String, doc: String) =
+this(parent, name, doc, ParamValidators.alwaysTrue)
+
+  /** construct a StringParam with limited options (case-insensitive) */
+  def this(parent: Params, name: String, doc: String, options: 
Array[String]) = {
+this(parent, name, doc + s" Supported options (case-insensitive): 
${options.mkString(", ")}.",
--- End diff --

I missed one additional line when highlighting, sorry for that :)
I meant to test the case-insensitive validation in the next line.

The doc could be tested too, but I think that's more of a nice-to-have than a must.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...

2017-12-13 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19599#discussion_r156605360
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: 
String) // No need for isV
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Specialized version of `Param[String]` for Java.
+ */
+@DeveloperApi
+class StringParam(parent: Params, name: String, doc: String, isValid: 
String => Boolean)
+  extends Param[String](parent, name, doc, isValid) {
+
+  private var options: Option[Array[String]] = None
--- End diff --

What about this?
```
class StringParam(parent: Params, name: String, doc: String, isValid: String => Boolean,
    options: Option[Array[String]] = None)
  extends Param[String](parent, name, doc, isValid) {
...
  def this(parent: Params, name: String, doc: String, options: Array[String]) = {
    this(parent, name, doc + s" Supported options (case-insensitive): ${options.mkString(", ")}.",
      s => options.exists(s.equalsIgnoreCase), Some(options))
  }
```

This solves the options-as-val problem, but highlights another one: 
why do we need the possibility to give an explicit isValid? Why not always 
expect options only?

I agree with @attilapiros that these params are enum-like. If so, the only 
reasonable validation is to check whether one of the acceptable values is given 
(ignoring case). I don't remember ever seeing a custom validator do anything 
else. Removing these custom validators would decrease complexity and code 
duplication.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...

2017-12-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19599#discussion_r156339663
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: 
String) // No need for isV
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Specialized version of `Param[String]` for Java.
+ */
+@DeveloperApi
+class StringParam(parent: Params, name: String, doc: String, isValid: 
String => Boolean)
+  extends Param[String](parent, name, doc, isValid) {
+
+  private var options: Option[Array[String]] = None
+
+  def this(parent: Params, name: String, doc: String) =
+this(parent, name, doc, ParamValidators.alwaysTrue)
+
+  /** construct a StringParam with limited options (case-insensitive) */
+  def this(parent: Params, name: String, doc: String, options: 
Array[String]) = {
+this(parent, name, doc + s" Supported options (case-insensitive): 
${options.mkString(", ")}.",
--- End diff --

Is this tested somewhere?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...

2017-12-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19599#discussion_r155782239
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala 
---
@@ -139,6 +139,17 @@ class ParamsSuite extends SparkFunSuite {
   }
 }
 
+{ // StringArrayParam
--- End diff --

It should be StringParam


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...

2017-12-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19599#discussion_r156086625
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -435,6 +435,43 @@ class BooleanParam(parent: String, name: String, doc: 
String) // No need for isV
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Specialized version of `Param[String]` for Java.
+ */
+@DeveloperApi
+class StringParam(parent: Params, name: String, doc: String, isValid: 
String => Boolean)
+  extends Param[String](parent, name, doc, isValid) {
+
+  private var options: Option[Array[String]] = None
--- End diff --

It should rather be a val. That way you would not need def getOptions()


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19599: [SPARK-22381] [ML] Add StringParam that supports ...

2017-12-12 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19599#discussion_r156339373
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -224,8 +222,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") 
override val uid: String
   elasticNetParam, fitIntercept, maxIter, regParam, standardization, 
aggregationDepth)
 instr.logNumFeatures(numFeatures)
 
-if (($(solver) == Auto &&
-  numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) 
== Normal) {
+if (($(solver).equalsIgnoreCase(Auto) && numFeatures <= 
WeightedLeastSquares.MAX_NUM_FEATURES)
--- End diff --

I think using equalsIgnoreCase all over the place is problematic: you need 
to check and potentially modify code in many places. Everywhere $() is used on a 
StringParam you may need this change.  
It's pretty easy to miss one, and that would lead to subtle bugs.

A safer approach would be to work with lowercase param values everywhere.
You could convert to lowercase in the constructor and require that the possible 
param values are given as lowercase. This way all comparisons, pattern matches, 
etc. would work just fine. 
The downside of this approach is that wherever you show these values to users 
you would present the lowercase form. That might cause issues, e.g. if you log it 
and some external log parser expects the original (non-lowercase) version. In a 
very broad sense it would break compatibility, but I guess that would be acceptable.
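
A rough sketch of that alternative (hypothetical code, not part of the PR): the 
stored value is normalized to lowercase once, so later comparisons and pattern 
matches can use plain equality.

```scala
import org.apache.spark.ml.param.{Param, ParamPair, Params}

// Hypothetical variant, not the PR's implementation.
class LowerCaseStringParam(parent: Params, name: String, doc: String, options: Array[String])
  extends Param[String](parent, name,
    doc + s" Supported options (case-insensitive): ${options.mkString(", ")}.",
    (value: String) => options.exists(_.equalsIgnoreCase(value))) {

  // Canonical, lowercase form of the accepted values.
  val lowerCaseOptions: Array[String] = options.map(_.toLowerCase)

  // Normalize the value when a param pair is created, so code reading the param
  // can compare with == or pattern match on lowercase literals.
  override def w(value: String): ParamPair[String] = super.w(value.toLowerCase)
}
```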


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19920: [SPARK-21672][CORE] Remove SHS-specific application / at...

2017-12-09 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/19920
  
Thanks for your help @vanzin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155852268
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala ---
@@ -88,4 +90,8 @@ private[history] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("")
   private def makePageLink(showIncomplete: Boolean): String = {
 UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete)
   }
+
+  private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
--- End diff --

I've checked the status and the history packages and found no suitable 
object. 
As it is only one extra copy of a tiny method and it's in test code, I don't 
think it would be worth the extra complexity of introducing a new object, so I 
think it's best to leave it as it is. 
If we use it more frequently we can extract it into a util object.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-08 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155725705
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -24,27 +24,29 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
 
-class ApplicationInfo private[spark](
-val id: String,
-val name: String,
-val coresGranted: Option[Int],
-val maxCores: Option[Int],
-val coresPerExecutor: Option[Int],
-val memoryPerExecutorMB: Option[Int],
-val attempts: Seq[ApplicationAttemptInfo])
+case class ApplicationInfo private[spark](
+id: String,
+name: String,
+coresGranted: Option[Int],
+maxCores: Option[Int],
+coresPerExecutor: Option[Int],
+memoryPerExecutorMB: Option[Int],
+attempts: Seq[ApplicationAttemptInfo]) {
+
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155718836
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala ---
@@ -88,4 +90,8 @@ private[history] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("")
   private def makePageLink(showIncomplete: Boolean): String = {
 UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete)
   }
+
+  private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
--- End diff --

My first commit had this method in ApplicationInfo, called completed

@vanzin:
`+case class ApplicationInfo private[spark](
...
+def completed: Boolean = {`
Is this used anywhere?

I answered we have it in HistoryPage (forgot to mention usage in the test)

@vanzin: Can we move this logic there instead? That avoids adding a new 
visible property in a public object.

I moved it, noting the code duplication in a comment on the PR. 
The question is which is better: a bit of duplication (used 3 times and defined 2 
times, once in application and once in test code) or extending the public API? 
My gut feeling is that the small duplication is preferable in this case, 
but I'm a newbie in Spark development, so I'm easy to convince if your 
preference differs :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155657473
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -17,16 +17,15 @@
 
 package org.apache.spark.status
 
-import java.io.File
-import java.util.{Arrays, List => JList}
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.{Distribution}
--- End diff --

restored original


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155657274
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala ---
@@ -30,7 +31,8 @@ private[history] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("")
 val requestedIncomplete =
   
Option(UIUtils.stripXSS(request.getParameter("showIncomplete"))).getOrElse("false").toBoolean
 
-val allAppsSize = parent.getApplicationList().count(_.completed != 
requestedIncomplete)
+val allAppsSize = parent.getApplicationList().
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155657306
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala ---
@@ -88,4 +90,9 @@ private[history] class HistoryPage(parent: HistoryServer) 
extends WebUIPage("")
   private def makePageLink(showIncomplete: Boolean): String = {
 UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete)
   }
+
+  private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
+appInfo.attempts.nonEmpty && appInfo.attempts.head.completed
+
--- End diff --

nuked


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19906: [SPARK-22516][SQL] Bump up Univocity version to 2.5.9

2017-12-07 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/19906
  
Thanks for your help and reviews @HyukjinKwon, @vanzin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155632345
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -24,27 +24,32 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
 
-class ApplicationInfo private[spark](
-val id: String,
-val name: String,
-val coresGranted: Option[Int],
-val maxCores: Option[Int],
-val coresPerExecutor: Option[Int],
-val memoryPerExecutorMB: Option[Int],
-val attempts: Seq[ApplicationAttemptInfo])
+case class ApplicationInfo private[spark](
+id: String,
+name: String,
+coresGranted: Option[Int],
+maxCores: Option[Int],
+coresPerExecutor: Option[Int],
+memoryPerExecutorMB: Option[Int],
+attempts: Seq[ApplicationAttemptInfo]) {
+
+def completed: Boolean = {
--- End diff --

Moved, also had to add it to `HistoryServerSuite`.
A bit of code duplication, but indeed better than increasing API surface 
area.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155631788
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
---
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.status.api.v1
+import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, 
ApplicationInfo}
--- End diff --

fixed
in addition removed some unused imports


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155631687
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -29,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, 
ApplicationEnvironmentInfo, ApplicationInfo, RuntimeInfo}
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155631377
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -188,11 +188,11 @@ class HistoryServer(
   }
 
   def getApplicationInfoList: Iterator[ApplicationInfo] = {
-
getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+getApplicationList()
   }
 
   def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
-
provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+ provider.getApplicationInfo(appId)
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155611738
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -24,27 +24,32 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
 
-class ApplicationInfo private[spark](
-val id: String,
-val name: String,
-val coresGranted: Option[Int],
-val maxCores: Option[Int],
-val coresPerExecutor: Option[Int],
-val memoryPerExecutorMB: Option[Int],
-val attempts: Seq[ApplicationAttemptInfo])
+case class ApplicationInfo private[spark](
--- End diff --

We check equality of ApplicationInfos during testing. I think converting 
them to case classes introduces less complexity than adding a proper equals 
method (ditto for ApplicationAttemptInfo below). 
In addition, we copy ApplicationInfos in FsHistoryProvider:819; that is also 
simpler with case classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155610889
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -24,27 +24,32 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
 
-class ApplicationInfo private[spark](
-val id: String,
-val name: String,
-val coresGranted: Option[Int],
-val maxCores: Option[Int],
-val coresPerExecutor: Option[Int],
-val memoryPerExecutorMB: Option[Int],
-val attempts: Seq[ApplicationAttemptInfo])
+case class ApplicationInfo private[spark](
+id: String,
+name: String,
+coresGranted: Option[Int],
+maxCores: Option[Int],
+coresPerExecutor: Option[Int],
+memoryPerExecutorMB: Option[Int],
+attempts: Seq[ApplicationAttemptInfo]) {
+
+def completed: Boolean = {
--- End diff --

org.apache.spark.deploy.history.HistoryPage:33


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
Github user smurakozi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19920#discussion_r155609528
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -29,6 +29,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, 
ApplicationEnvironmentInfo, ApplicationInfo, RuntimeInfo}
--- End diff --

There is no ambiguity between the ApplicationAttemptInfo and ApplicationAttempt 
classes in different versions anymore, so I thought the `v1.` prefixes could be 
removed. I can undo that if they are still useful.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19920: [SPARK-21672][CORE] Remove SHS-specific application / at...

2017-12-07 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/19920
  
PR description was cleaned, thanks for catching it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19920: [SPARK-21672][CORE] Remove SHS-specific applicati...

2017-12-07 Thread smurakozi
GitHub user smurakozi opened a pull request:

https://github.com/apache/spark/pull/19920

[SPARK-21672][CORE] Remove SHS-specific application / attempt data …

…structures

## What changes were proposed in this pull request?

In general, the SHS pages now use the public API types to represent 
applications. Some internal code paths still used its own view of what 
applications and attempts look like (`ApplicationHistoryInfo` and 
`ApplicationAttemptInfo`), declared in ApplicationHistoryProvider.scala.

This pull request removes these classes and updates the rest of the code to 
use `status.api.v1.ApplicationInfo` and `status.api.v1.ApplicationAttemptInfo` 
instead.

Furthermore `status.api.v1.ApplicationInfo` and 
`status.api.v1.ApplicationAttemptInfo` were changed to case class to 
- facilitate copying instances
- equality checking in test code
- nicer toString() 

To simplify the code a bit, the `v1.` prefixes were also removed from 
occurrences of `v1.ApplicationInfo` and `v1.ApplicationAttemptInfo`, as there is no 
longer any ambiguity between the classes in `history` and `status.api.v1`.
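
As a quick illustration of what the case-class conversion buys (standalone sketch 
with made-up field names, not Spark code):

```scala
object CaseClassDemo extends App {
  case class AttemptInfo(attemptId: Option[String], completed: Boolean)

  val first = AttemptInfo(Some("1"), completed = false)
  val updated = first.copy(completed = true)                    // facilitates copying instances
  assert(updated == AttemptInfo(Some("1"), completed = true))   // structural equality for tests
  println(updated)                                              // readable toString: AttemptInfo(Some(1),true)
}
```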

## How was this patch tested?

By running existing automated tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurakozi/spark SPARK-21672

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19920.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19920


commit a457acf3faeba94bb1473d2496b8a7126e4f8b1d
Author: Sandor Murakozi 
Date:   2017-12-06T16:34:28Z

[SPARK-21672][CORE] Remove SHS-specific application / attempt data 
structures




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19906: [SPARK-22516][SQL] Bump up Univocity version to 2...

2017-12-06 Thread smurakozi
GitHub user smurakozi opened a pull request:

https://github.com/apache/spark/pull/19906

[SPARK-22516][SQL] Bump up Univocity version to 2.5.9

## What changes were proposed in this pull request?

There was a bug in Univocity Parser that causes the issue in SPARK-22516. 
This was fixed by upgrading from 2.5.4 to 2.5.9 version of the library :

**Executing**
```
spark.read.option("header","true").option("inferSchema", 
"true").option("multiLine", "true").option("comment", 
"g").csv("test_file_without_eof_char.csv").show()
```
**Before**
```
ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 6)
com.univocity.parsers.common.TextParsingException: 
java.lang.IllegalArgumentException - Unable to skip 1 lines from line 2. End of 
input reached
...
Internal state when error was thrown: line=3, column=0, record=2, 
charIndex=31
at 
com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:339)
at 
com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:475)
at 
org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```
**After**
```
+---+---+
|column1|column2|
+---+---+
|abc|def|
+---+---+
```

## How was this patch tested?
The already existing `CSVSuite.commented lines in CSV data` test was 
extended to parse the file also in multiline mode. The test input file was 
modified to also include a comment in the last line.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smurakozi/spark SPARK-22516

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19906


commit 8bc6a9ce9f6eeb854261d26dabaf04052eb8b5b2
Author: smurakozi 
Date:   2017-11-27T08:30:25Z

[SPARK-22516][SQL] Bump up Univocity version to 2.5.9




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19893: [SPARK-16139][TEST] Add logging functionality for leaked...

2017-12-05 Thread smurakozi
Github user smurakozi commented on the issue:

https://github.com/apache/spark/pull/19893
  
Logging the leaked threads in a more grep-friendly format would be nice; you 
could then easily create a thread-leak report.
It would also be nice to see the leaks on the console. 
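
For example (hypothetical snippet just to illustrate the format; the suite name and 
the set of leaked threads are placeholders, not names from this PR):

```scala
object ThreadLeakReport {
  // One grep-friendly line per leaked thread; a report is then just:
  //   grep 'THREAD_LEAK' unit-tests.log | sort | uniq -c
  def format(suiteName: String, leaked: Set[Thread]): Seq[String] =
    leaked.toSeq.map(t => s"THREAD_LEAK suite=$suiteName name=${t.getName} state=${t.getState}")
}
```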


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


