[jira] [Created] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-15 Thread Michel Davit (JIRA)
Michel Davit created SPARK-23986:


 Summary: CompileException when using too many avg aggregation 
after joining
 Key: SPARK-23986
 URL: https://issues.apache.org/jira/browse/SPARK-23986
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Michel Davit


Considering the following code:
{code:java}
val df1: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
  .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")

val df2: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, "val1", "val2")))
  .toDF("key", "dummy1", "dummy2")

val agg = df1
  .join(df2, df1("key") === df2("key"), "leftouter")
  .groupBy(df1("key"))
  .agg(
avg("col2").as("avg2"),
avg("col3").as("avg3"),
avg("col4").as("avg4"),
avg("col1").as("avg1"),
avg("col5").as("avg5"),
avg("col6").as("avg6")
  )

val head = agg.take(1)
{code}
This logs the following exception:
{code:java}
ERROR CodeGenerator: failed to compile: 
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
467, Column 28: Redefinition of parameter "agg_expr_11"
{code}
I am not a Spark expert, but after investigation I realized that the generated 
{{doConsume}} method is responsible for the exception.

Indeed, {{avg}} causes 
{{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}} to 
be called several times: the first time with the 'avg' Expr and a second time 
for the base aggregation Exprs (count and sum).

The problem comes from the generation of parameters in CodeGenerator:
{code:java}
  /**
   * Returns a term name that is unique within this instance of a 
`CodegenContext`.
   */
  def freshName(name: String): String = synchronized {
val fullName = if (freshNamePrefix == "") {
  name
} else {
  s"${freshNamePrefix}_$name"
}
if (freshNameIds.contains(fullName)) {
  val id = freshNameIds(fullName)
  freshNameIds(fullName) = id + 1
  s"$fullName$id"
} else {
  freshNameIds += fullName -> 1
  fullName
}
  }
{code}
The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
 The second call is made with {{agg_expr_[1..12]}} and generates the following 
names:
 {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have 2 parameter name conflicts 
in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.

Appending the 'id' in {{s"$fullName$id"}} to generate a unique term name is the 
source of the conflict. Maybe simply separating the name and the id with an 
underscore can solve this issue: {{s"${fullName}_$id"}}
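
For illustration, here is a minimal, standalone sketch (not the actual Spark class, 
and without the {{freshNamePrefix}} handling) that mimics the quoted {{freshName}} 
logic and shows how appending the id without a separator can collide with a name 
that already ends in a digit:
{code:scala}
// Standalone sketch mimicking the quoted freshName logic (simplified).
import scala.collection.mutable

val freshNameIds = mutable.HashMap.empty[String, Int]

def freshName(name: String): String = {
  if (freshNameIds.contains(name)) {
    val id = freshNameIds(name)
    freshNameIds(name) = id + 1
    s"$name$id"          // "agg_expr_1" + 1 => "agg_expr_11"
  } else {
    freshNameIds += name -> 1
    name
  }
}

// 1st call: registers agg_expr_1 .. agg_expr_6 as-is.
(1 to 6).foreach(i => freshName(s"agg_expr_$i"))

// 2nd call: agg_expr_1 is renamed to agg_expr_11, which collides with the
// brand-new agg_expr_11 handed out later in the same pass.
val renamed = freshName("agg_expr_1")   // "agg_expr_11"
val fresh11 = freshName("agg_expr_11")  // also "agg_expr_11" (seen for the first time)
assert(renamed == fresh11)              // duplicate parameter name, as in the report
{code}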



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23985) predicate push down doesn't work with simple compound partition spec

2018-04-15 Thread Ohad Raviv (JIRA)
Ohad Raviv created SPARK-23985:
--

 Summary: predicate push down doesn't work with simple compound 
partition spec
 Key: SPARK-23985
 URL: https://issues.apache.org/jira/browse/SPARK-23985
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Ohad Raviv


while predicate push down works with this query: 
{code:sql}
select *, row_number() over (partition by a order by b) from t1 where a>1
{code}
it doesn't work with:
{code:sql}
select *, row_number() over (partition by concat(a,'lit') order by b) from t1 
where a>1
{code}
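
For reference, a rough DataFrame-API sketch of the same two queries (assuming an 
existing DataFrame {{t1}} with columns {{a}} and {{b}}); the second one partitions 
the window by an expression rather than a bare column, which is the reported case 
where the filter is not pushed down:
{code:scala}
// Sketch only: assumes `t1` is an existing DataFrame with columns a and b.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, concat, lit, row_number}

// Partitioned by a plain column: the reported working case.
val pushed = t1
  .withColumn("rn", row_number().over(Window.partitionBy(col("a")).orderBy(col("b"))))
  .where(col("a") > 1)

// Partitioned by concat(a, 'lit'): the reported case where pushdown does not happen.
val notPushed = t1
  .withColumn("rn", row_number().over(
    Window.partitionBy(concat(col("a"), lit("lit"))).orderBy(col("b"))))
  .where(col("a") > 1)
{code}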
 
I added a test to FilterPushdownSuite which I think recreates the problem:
{code:scala}
  test("Window: predicate push down -- ohad") {
val winExpr = windowExpr(count('b),
  windowSpec(Concat('a :: Nil) :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

val originalQuery = testRelation.select('a, 'b, 'c, 
winExpr.as('window)).where('a > 1)
val correctAnswer = testRelation
  .where('a > 1).select('a, 'b, 'c)
  .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
  .select('a, 'b, 'c, 'window).analyze

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
  }
{code}

I will try to create a PR with a correction.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-15 Thread Michel Davit (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michel Davit updated SPARK-23986:
-
Priority: Major  (was: Minor)

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} causes 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}} 
> to be called several times: the first time with the 'avg' Expr and a second 
> time for the base aggregation Exprs (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
>  {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have 2 parameter name 
> conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.
> Appending the 'id' in {{s"$fullName$id"}} to generate a unique term name is 
> the source of the conflict. Maybe simply separating the name and the id with 
> an underscore can solve this issue: {{s"${fullName}_$id"}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23959) UnresolvedException with DataSet created from Seq.empty since Spark 2.3.0

2018-04-15 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23959.
--
Resolution: Cannot Reproduce

I can't reproduce this in the master either. I am resolving this. It'd be nicer 
if the JIRA that fixed this were identified and backported if applicable.

> UnresolvedException with DataSet created from Seq.empty since Spark 2.3.0
> -
>
> Key: SPARK-23959
> URL: https://issues.apache.org/jira/browse/SPARK-23959
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Sam De Backer
>Priority: Major
>
> The following snippet works fine in Spark 2.2.1 but gives a rather cryptic 
> runtime exception in Spark 2.3.0:
> {code:java}
> import sparkSession.implicits._
> import org.apache.spark.sql.functions._
> case class X(xid: Long, yid: Int)
> case class Y(yid: Int, zid: Long)
> case class Z(zid: Long, b: Boolean)
> val xs = Seq(X(1L, 10)).toDS()
> val ys = Seq(Y(10, 100L)).toDS()
> val zs = Seq.empty[Z].toDS()
> val j = xs
>   .join(ys, "yid")
>   .join(zs, Seq("zid"), "left")
>   .withColumn("BAM", when('b, "B").otherwise("NB"))
> j.show(){code}
> In Spark 2.2.1 it prints to the console
> {noformat}
> +---+---+---+----+---+
> |zid|yid|xid|   b|BAM|
> +---+---+---+----+---+
> |100| 10|  1|null| NB|
> +---+---+---+----+---+{noformat}
> In Spark 2.3.0 it results in:
> {noformat}
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> dataType on unresolved object, tree: 'BAM
> at 
> org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
> at 
> org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
> at 
> org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
> at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
> ...{noformat}
> The culprit really seems to be the Dataset being created from an empty 
> Seq[Z]. When you change that to something that also results in an empty 
> Dataset[Z], it works as in Spark 2.2.1, e.g.
> {code:java}
> val zs = Seq(Z(10L, true)).toDS().filter('zid < Long.MinValue){code}
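
Another way to build an empty {{Dataset[Z]}} without starting from {{Seq.empty}} is 
{{SparkSession.emptyDataset}}; whether it avoids the reported exception has not been 
verified here, so this is only a sketch of an alternative construction:
{code:scala}
// Sketch of an alternative empty-Dataset construction (untested against this issue).
import sparkSession.implicits._

val zs = sparkSession.emptyDataset[Z]
{code}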



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21811) Inconsistency when finding the widest common type of a combination of DateType, StringType, and NumericType

2018-04-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438737#comment-16438737
 ] 

Apache Spark commented on SPARK-21811:
--

User 'jiangxb1987' has created a pull request for this issue:
https://github.com/apache/spark/pull/21074

> Inconsistency when finding the widest common type of a combination of 
> DateType, StringType, and NumericType
> ---
>
> Key: SPARK-21811
> URL: https://issues.apache.org/jira/browse/SPARK-21811
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ryan Bald
>Priority: Minor
>
> Finding the widest common type for the arguments of a variadic function (such 
> as IN or COALESCE) when the types of the arguments are a combination of 
> DateType/TimestampType, StringType, and NumericType fails with an 
> AnalysisException for some orders of the arguments and succeeds with a common 
> type of StringType for other orders of the arguments.
> The below examples used to reproduce the error assume a schema of:
> {{[c1: date, c2: string, c3: int]}}
> The following succeeds:
> {{SELECT coalesce(c1, c2, c3) FROM table}}
> While the following produces an exception:
> {{SELECT coalesce(c1, c3, c2) FROM table}}
> The order of arguments affects the behavior because the widest common type 
> appears to be found by repeatedly looking at two arguments at a time: the 
> widest common type found thus far and the next argument. On initial thought 
> of a fix, I think the way the widest common type is found would have to be 
> changed to look at all arguments first before deciding what the widest 
> common type should be.
> As my boss is out of the office for the rest of the day, I will give a pull 
> request a shot, but as I am not super familiar with Scala or Spark's coding 
> style guidelines, a pull request is not promised. Going forward with my 
> attempted pull request, I will assume that having DateType/TimestampType, 
> StringType, and NumericType arguments in an IN expression or COALESCE 
> function (and any other function/expression where this combination of 
> argument types can occur) is valid. I also find it quite reasonable for this 
> combination of argument types to be invalid, so if that's what is decided, 
> then oh well.
> If I were a betting man, I'd say the fix would be made in the following file: 
> [TypeCoercion.scala|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala]
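
A minimal Scala sketch of the reproduction described above, assuming a SparkSession 
named {{spark}} (a view name {{t}} is used here instead of {{table}} purely for the 
sketch):
{code:scala}
// Repro sketch for the reported coercion asymmetry on [c1: date, c2: string, c3: int].
import java.sql.Date
import spark.implicits._

val df = Seq((Date.valueOf("2017-08-15"), "2", 3)).toDF("c1", "c2", "c3")
df.createOrReplaceTempView("t")

spark.sql("SELECT coalesce(c1, c2, c3) FROM t").show()  // reported to succeed (StringType)
spark.sql("SELECT coalesce(c1, c3, c2) FROM t").show()  // reported to fail with AnalysisException
{code}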



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23983) Disable X-Frame-Options from Spark UI response headers if explicitly configured

2018-04-15 Thread Taylor Cressy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438783#comment-16438783
 ] 

Taylor Cressy commented on SPARK-23983:
---

[~jpallas] That is unfortunate. Do we have a support matrix for the Spark UI 
(i.e. which versions of which browsers we want to support)? 

 

Is the option to turn off X-Frame-Options a potential approach instead?
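
For discussion, a minimal sketch of what such an opt-out could look like around the 
JettyUtils snippet quoted in the issue description below; the config name 
{{spark.ui.xFrameOptionsEnabled}} is hypothetical, not an existing Spark setting:
{code:scala}
// Hypothetical sketch only: `spark.ui.xFrameOptionsEnabled` is not a real Spark config.
val xFrameOptionsEnabled = conf.getBoolean("spark.ui.xFrameOptionsEnabled", true)
val allowFramingFrom = conf.getOption("spark.ui.allowFramingFrom")
val xFrameOptionsValue =
  allowFramingFrom.map(uri => s"ALLOW-FROM $uri").getOrElse("SAMEORIGIN")

// In doGet: only emit the header when framing protection has not been explicitly disabled.
if (xFrameOptionsEnabled) {
  response.setHeader("X-Frame-Options", xFrameOptionsValue)
}
{code}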

> Disable X-Frame-Options from Spark UI response headers if explicitly 
> configured
> ---
>
> Key: SPARK-23983
> URL: https://issues.apache.org/jira/browse/SPARK-23983
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Taylor Cressy
>Priority: Minor
>  Labels: UI
>
> We should introduce a configuration for the spark UI to omit X-Frame-Options 
> from the response headers if explicitly set.
> The X-Frame-Options header was introduced in *org.apache.spark.ui.JettyUtils* 
> to prevent frame-related click-jacking vulnerabilities. This was addressed 
> in: SPARK-10589
>  
> {code:java}
> val allowFramingFrom = conf.getOption("spark.ui.allowFramingFrom")
> val xFrameOptionsValue =
>allowFramingFrom.map(uri => s"ALLOW-FROM $uri").getOrElse("SAMEORIGIN")
> ...
> // In doGet
> response.setHeader("X-Frame-Options", xFrameOptionsValue)
> {code}
>  
> The problem with this is that we only allow the same origin or a single 
> host to present the UI in iframes. I propose we add a configuration that 
> turns this off.
>  
> Use Case: We are currently building a "portal UI" for all things related to a 
> cluster. Embedding the Spark UI in the portal is necessary because the 
> cluster is in the cloud and can only be accessed via an SSH tunnel - as 
> intended. (The reverse proxy configuration {{spark.ui.reverseProxy}} could 
> be used to simplify connecting to all the workers, but this doesn't solve 
> handling multiple, unrelated UIs through a single tunnel.)
>  
> Moreover, the host that our "portal UI" would reside on is not assigned a 
> hostname and has an ephemeral IP address, so the *ALLOW-FROM* directive isn't 
> useful in this case.
>  
> Lastly, the current design does not allow for different hosts to be 
> configured, i.e. *_spark.ui.allowFramingFrom_* _*hostname1,hostname2*_ is not 
> a valid config.
>  
> An alternative option would be to explore Content-Security-Policy: 
> [https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy#frame-ancestors]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22014) Sample windows in Spark SQL

2018-04-15 Thread Simon Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Simon Schiff resolved SPARK-22014.
--
Resolution: Auto Closed

> Sample windows in Spark SQL
> ---
>
> Key: SPARK-22014
> URL: https://issues.apache.org/jira/browse/SPARK-22014
> Project: Spark
>  Issue Type: Wish
>  Components: DStreams, SQL
>Affects Versions: 2.2.0
>Reporter: Simon Schiff
>Priority: Minor
>
> Hello,
> I am using Spark to process measurement data. It is possible to create sample 
> windows in Spark Streaming, where the duration of the window is smaller than 
> the slide. But when I try to do the same with Spark SQL (the measurement data 
> has a timestamp column), I get an analysis exception:
> {code}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 6000, 18000, 0)' due to data type 
> mismatch: The slide duration (18000) must be less than or equal to the 
> windowDuration (6000)
> {code}
> Here is an example:
> {code:java}
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> public class App {
>   public static Timestamp createTimestamp(String in) throws Exception {
>     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
>     Date parsedDate = dateFormat.parse(in);
>     return new Timestamp(parsedDate.getTime());
>   }
>
>   public static void main(String[] args) {
>     SparkSession spark = SparkSession.builder().appName("Window Sampling Example").getOrCreate();
>
>     List<String> sensorData = new ArrayList<>();
>     sensorData.add("2017-08-04 00:00:00, 22.75");
>     sensorData.add("2017-08-04 00:01:00, 23.82");
>     sensorData.add("2017-08-04 00:02:00, 24.15");
>     sensorData.add("2017-08-04 00:03:00, 23.16");
>     sensorData.add("2017-08-04 00:04:00, 22.62");
>     sensorData.add("2017-08-04 00:05:00, 22.89");
>     sensorData.add("2017-08-04 00:06:00, 23.21");
>     sensorData.add("2017-08-04 00:07:00, 24.59");
>     sensorData.add("2017-08-04 00:08:00, 24.44");
>
>     Dataset<String> in = spark.createDataset(sensorData, Encoders.STRING());
>
>     StructType sensorSchema = DataTypes.createStructType(new StructField[] {
>       DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
>       DataTypes.createStructField("value", DataTypes.DoubleType, false),
>     });
>
>     Dataset<Row> data = spark.createDataFrame(in.toJavaRDD().map(new Function<String, Row>() {
>       public Row call(String line) throws Exception {
>         return RowFactory.create(createTimestamp(line.split(",")[0]),
>             Double.parseDouble(line.split(",")[1]));
>       }
>     }), sensorSchema);
>
>     data.groupBy(functions.window(data.col("timestamp"), "1 minutes", "3 minutes"))
>         .avg("value").orderBy("window").show(false);
>   }
> }
> {code}
> I think there should be no difference (in duration and slide) between a "Spark 
> Streaming window" and a "Spark SQL window" function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-22014) Sample windows in Spark SQL

2018-04-15 Thread Simon Schiff (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Simon Schiff closed SPARK-22014.


> Sample windows in Spark SQL
> ---
>
> Key: SPARK-22014
> URL: https://issues.apache.org/jira/browse/SPARK-22014
> Project: Spark
>  Issue Type: Wish
>  Components: DStreams, SQL
>Affects Versions: 2.2.0
>Reporter: Simon Schiff
>Priority: Minor
>
> Hello,
> I am using Spark to process measurement data. It is possible to create sample 
> windows in Spark Streaming, where the duration of the window is smaller than 
> the slide. But when I try to do the same with Spark SQL (the measurement data 
> has a timestamp column), I get an analysis exception:
> {code}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 6000, 18000, 0)' due to data type 
> mismatch: The slide duration (18000) must be less than or equal to the 
> windowDuration (6000)
> {code}
> Here is an example:
> {code:java}
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> import java.util.ArrayList;
> import java.util.Date;
> import java.util.List;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> public class App {
>   public static Timestamp createTimestamp(String in) throws Exception {
>     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
>     Date parsedDate = dateFormat.parse(in);
>     return new Timestamp(parsedDate.getTime());
>   }
>
>   public static void main(String[] args) {
>     SparkSession spark = SparkSession.builder().appName("Window Sampling Example").getOrCreate();
>
>     List<String> sensorData = new ArrayList<>();
>     sensorData.add("2017-08-04 00:00:00, 22.75");
>     sensorData.add("2017-08-04 00:01:00, 23.82");
>     sensorData.add("2017-08-04 00:02:00, 24.15");
>     sensorData.add("2017-08-04 00:03:00, 23.16");
>     sensorData.add("2017-08-04 00:04:00, 22.62");
>     sensorData.add("2017-08-04 00:05:00, 22.89");
>     sensorData.add("2017-08-04 00:06:00, 23.21");
>     sensorData.add("2017-08-04 00:07:00, 24.59");
>     sensorData.add("2017-08-04 00:08:00, 24.44");
>
>     Dataset<String> in = spark.createDataset(sensorData, Encoders.STRING());
>
>     StructType sensorSchema = DataTypes.createStructType(new StructField[] {
>       DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
>       DataTypes.createStructField("value", DataTypes.DoubleType, false),
>     });
>
>     Dataset<Row> data = spark.createDataFrame(in.toJavaRDD().map(new Function<String, Row>() {
>       public Row call(String line) throws Exception {
>         return RowFactory.create(createTimestamp(line.split(",")[0]),
>             Double.parseDouble(line.split(",")[1]));
>       }
>     }), sensorSchema);
>
>     data.groupBy(functions.window(data.col("timestamp"), "1 minutes", "3 minutes"))
>         .avg("value").orderBy("window").show(false);
>   }
> }
> {code}
> I think there should be no difference (in duration and slide) between a "Spark 
> Streaming window" and a "Spark SQL window" function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-15 Thread Kazuaki Ishizaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438818#comment-16438818
 ] 

Kazuaki Ishizaki commented on SPARK-23986:
--

Thanks for reporting this issue with a deep dive.

When I run this repro with the latest master, it works well without an 
exception. When I checked the generated code, I could not find the variables 
{{agg_expr_[21|31|41|51|61]}}.
Would it be possible to attach the log file of the generated code?
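
For reference, one way to dump the generated code for the reported query (so it can 
be captured and attached here) is Spark's debug package:
{code:scala}
// Prints the whole-stage-codegen output for the plan of `agg` to stdout.
import org.apache.spark.sql.execution.debug._

agg.debugCodegen()
{code}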

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} causes 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}} 
> to be called several times: the first time with the 'avg' Expr and a second 
> time for the base aggregation Exprs (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
>  {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have 2 parameter name 
> conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.
> Appending the 'id' in {{s"$fullName$id"}} to generate a unique term name is 
> the source of the conflict. Maybe simply separating the name and the id with 
> an underscore can solve this issue: {{s"${fullName}_$id"}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-15 Thread Kazuaki Ishizaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438818#comment-16438818
 ] 

Kazuaki Ishizaki edited comment on SPARK-23986 at 4/15/18 7:36 PM:
---

Thanks for reporting this issue with a deep dive.

When I run this repro with the latest master, it works well without an 
exception. When I checked the generated code, I could not find the variables 
{{agg_expr_[21|31|41|51|61]}}. I will check it with branch-2.3 tomorrow.
Would it be possible to attach the log file of the generated code?


was (Author: kiszk):
Thanks for reporting this issue with a deep dive.

When I run this repro with the latest master, it works well without an 
exception. When I checked the generated code, I could not find the variables 
{{agg_expr_[21|31|41|51|61]}}.
Would it be possible to attach the log file of the generated code?

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the 
> generated {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} causes 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}} 
> to be called several times: the first time with the 'avg' Expr and a second 
> time for the base aggregation Exprs (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} already contains {{agg_expr_[1..6]}} from the 1st call.
>  The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names:
>  {{agg_expr_[11|21|31|41|51|61|11|12]}}. We then have 2 parameter name 
> conflicts in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.
> Appending the 'id' in {{s"$fullName$id"}} to generate a unique term name is 
> the source of the conflict. Maybe simply separating the name and the id with 
> an underscore can solve this issue: {{s"${fullName}_$id"}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23987) Unused mailing lists

2018-04-15 Thread Sebb (JIRA)
Sebb created SPARK-23987:


 Summary: Unused mailing lists
 Key: SPARK-23987
 URL: https://issues.apache.org/jira/browse/SPARK-23987
 Project: Spark
  Issue Type: Task
  Components: Documentation
Affects Versions: 2.3.0
Reporter: Sebb


The following mailing lists were set up in Jan 2015 but have not been used, and 
don't appear to be mentioned on the website:

ml@
sql@
streaming@

If they are not needed, please file a JIRA request with INFRA to close them down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19618) Inconsistency wrt max. buckets allowed from Dataframe API vs SQL

2018-04-15 Thread Fernando Pereira (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438822#comment-16438822
 ] 

Fernando Pereira commented on SPARK-19618:
--

Is there any technological problem in using more than 100k buckets? Otherwise 
what about making it configurable?

We have an 80TB workload, and to keep partitions "manageable" we do need to use 
a large number of buckets. While it might seem like a lot today, workloads are 
expected to continue to increase in size...
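
For discussion, a sketch of what a configurable limit could look like in place of 
the hard-coded bound; the config key and the {{numBuckets}} variable below are 
illustrative assumptions, not existing code:
{code:scala}
// Hypothetical sketch: `spark.sql.sources.bucketing.maxBuckets` is used as an
// illustrative config key, not an existing setting in this Spark version.
val maxBuckets =
  sparkSession.conf.get("spark.sql.sources.bucketing.maxBuckets", "100000").toInt

require(numBuckets > 0 && numBuckets <= maxBuckets,
  s"Bucket number must be greater than 0 and at most $maxBuckets.")
{code}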

> Inconsistency wrt max. buckets allowed from Dataframe API vs SQL
> 
>
> Key: SPARK-19618
> URL: https://issues.apache.org/jira/browse/SPARK-19618
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Tejas Patil
>Assignee: Tejas Patil
>Priority: Major
> Fix For: 2.2.0
>
>
> High number of buckets is allowed while creating a table via SQL query:
> {code}
> sparkSession.sql("""
> CREATE TABLE bucketed_table(col1 INT) USING parquet 
> CLUSTERED BY (col1) SORTED BY (col1) INTO 147483647 BUCKETS
> """)
> sparkSession.sql("DESC FORMATTED bucketed_table").collect.foreach(println)
> 
> [Num Buckets:,147483647,]
> [Bucket Columns:,[col1],]
> [Sort Columns:,[col1],]
> 
> {code}
> Trying the same via dataframe API does not work:
> {code}
> > df.write.format("orc").bucketBy(147483647, 
> > "j","k").sortBy("j","k").saveAsTable("bucketed_table")
> java.lang.IllegalArgumentException: requirement failed: Bucket number must be 
> greater than 0 and less than 100000.
>   at scala.Predef$.require(Predef.scala:224)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:293)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:291)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.spark.sql.DataFrameWriter.getBucketSpec(DataFrameWriter.scala:291)
>   at 
> org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:429)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:410)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:365)
>   ... 50 elided
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23988) [Mesos] Improve handling of appResource in mesos dispatcher when using Docker

2018-04-15 Thread paul mackles (JIRA)
paul mackles created SPARK-23988:


 Summary: [Mesos] Improve handling of appResource in mesos 
dispatcher when using Docker
 Key: SPARK-23988
 URL: https://issues.apache.org/jira/browse/SPARK-23988
 Project: Spark
  Issue Type: Improvement
  Components: Mesos
Affects Versions: 2.3.0, 2.2.1
Reporter: paul mackles


Our organization makes heavy use of Docker containers when running Spark on 
Mesos. The images we use for our containers include Spark along with all of the 
application dependencies. We find this to be a great way to manage our 
artifacts.

When specifying the primary application jar (i.e. appResource), the mesos 
dispatcher insists on adding it to the list of URIs for Mesos to fetch as part 
of launching the driver's container. This leads to confusing behavior where 
paths such as:
 * file:///application.jar
 * local:/application.jar
 * /application.jar

wind up being fetched from the host where the driver is running. Obviously, 
this doesn't work since all of the above examples are referencing the path of 
the jar on the container image itself.

Here is an example that I used for testing:
{code:java}
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://spark-dispatcher \
  --deploy-mode cluster \
  --conf spark.cores.max=4 \
  --conf spark.mesos.executor.docker.image=spark:2.2.1 \
  local:/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar 10{code}
The "spark:2.2.1" image contains an installation of spark under 
"/usr/local/spark". Notice how we reference the appResource using the "local:/" 
scheme.

If you try the above with the current version of the mesos dispatcher, it will 
try to fetch the path 
"/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar" from the host 
filesystem where the driver's container is running. On our systems, this fails 
since we don't have spark installed on the hosts. 

For the PR, all I did was modify the mesos dispatcher to not add the 
appResource to the list of URIs for Mesos to fetch if it uses the "local:/" 
scheme.

For now, I didn't change the behavior of absolute paths or the "file:/" scheme 
because I wanted to leave some form of the old behavior in place for backwards 
compatibility. Does anyone have any opinions on whether these schemes should 
change as well?

The PR also includes support for using "spark-internal" with Mesos in cluster 
mode which is something we need for another use-case. I can separate them if 
that makes more sense.
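
A simplified sketch of the dispatcher-side behavior described above (not the actual 
patch): skip fetching the primary jar when it is addressed inside the container 
image via the "local:/" scheme, and keep the old behavior otherwise.
{code:scala}
// Simplified sketch only; not the actual dispatcher code from the PR.
def fetchUrisFor(appResource: String): Seq[String] =
  if (appResource.startsWith("local:")) {
    Seq.empty        // jar already lives inside the Docker image; nothing for Mesos to fetch
  } else {
    Seq(appResource) // preserve the old behavior for file:/, http(s):/, plain paths, etc.
  }

fetchUrisFor("local:/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar")  // Seq()
fetchUrisFor("http://repo.example.com/application.jar")                             // Seq(...)
{code}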

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23988) [Mesos] Improve handling of appResource in mesos dispatcher when using Docker

2018-04-15 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438875#comment-16438875
 ] 

Apache Spark commented on SPARK-23988:
--

User 'pmackles' has created a pull request for this issue:
https://github.com/apache/spark/pull/21075

> [Mesos] Improve handling of appResource in mesos dispatcher when using Docker
> -
>
> Key: SPARK-23988
> URL: https://issues.apache.org/jira/browse/SPARK-23988
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.3.0
>Reporter: paul mackles
>Priority: Minor
>
> Our organization makes heavy use of Docker containers when running Spark on 
> Mesos. The images we use for our containers include Spark along with all of 
> the application dependencies. We find this to be a great way to manage our 
> artifacts.
> When specifying the primary application jar (i.e. appResource), the mesos 
> dispatcher insists on adding it to the list of URIs for Mesos to fetch as 
> part of launching the driver's container. This leads to confusing behavior 
> where paths such as:
>  * file:///application.jar
>  * local:/application.jar
>  * /application.jar
> wind up being fetched from the host where the driver is running. Obviously, 
> this doesn't work since all of the above examples are referencing the path of 
> the jar on the container image itself.
> Here is an example that I used for testing:
> {code:java}
> spark-submit \
>   --class org.apache.spark.examples.SparkPi \
>   --master mesos://spark-dispatcher \
>   --deploy-mode cluster \
>   --conf spark.cores.max=4 \
>   --conf spark.mesos.executor.docker.image=spark:2.2.1 \
>   local:/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar 10{code}
> The "spark:2.2.1" image contains an installation of spark under 
> "/usr/local/spark". Notice how we reference the appResource using the 
> "local:/" scheme.
> If you try the above with the current version of the mesos dispatcher, it 
> will try to fetch the path 
> "/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar" from the host 
> filesystem where the driver's container is running. On our systems, this 
> fails since we don't have spark installed on the hosts. 
> For the PR, all I did was modify the mesos dispatcher to not add the 
> appResource to the list of URIs for Mesos to fetch if it uses the "local:/" 
> scheme.
> For now, I didn't change the behavior of absolute paths or the "file:/" 
> scheme because I wanted to leave some form of the old behavior in place for 
> backwards compatibility. Does anyone have any opinions on whether these 
> schemes should change as well?
> The PR also includes support for using "spark-internal" with Mesos in cluster 
> mode which is something we need for another use-case. I can separate them if 
> that makes more sense.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23988) [Mesos] Improve handling of appResource in mesos dispatcher when using Docker

2018-04-15 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23988:


Assignee: Apache Spark

> [Mesos] Improve handling of appResource in mesos dispatcher when using Docker
> -
>
> Key: SPARK-23988
> URL: https://issues.apache.org/jira/browse/SPARK-23988
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.3.0
>Reporter: paul mackles
>Assignee: Apache Spark
>Priority: Minor
>
> Our organization makes heavy use of Docker containers when running Spark on 
> Mesos. The images we use for our containers include Spark along with all of 
> the application dependencies. We find this to be a great way to manage our 
> artifacts.
> When specifying the primary application jar (i.e. appResource), the mesos 
> dispatcher insists on adding it to the list of URIs for Mesos to fetch as 
> part of launching the driver's container. This leads to confusing behavior 
> where paths such as:
>  * file:///application.jar
>  * local:/application.jar
>  * /application.jar
> wind up being fetched from the host where the driver is running. Obviously, 
> this doesn't work since all of the above examples are referencing the path of 
> the jar on the container image itself.
> Here is an example that I used for testing:
> {code:java}
> spark-submit \
>   --class org.apache.spark.examples.SparkPi \
>   --master mesos://spark-dispatcher \
>   --deploy-mode cluster \
>   --conf spark.cores.max=4 \
>   --conf spark.mesos.executor.docker.image=spark:2.2.1 \
>   local:/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar 10{code}
> The "spark:2.2.1" image contains an installation of spark under 
> "/usr/local/spark". Notice how we reference the appResource using the 
> "local:/" scheme.
> If you try the above with the current version of the mesos dispatcher, it 
> will try to fetch the path 
> "/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar" from the host 
> filesystem where the driver's container is running. On our systems, this 
> fails since we don't have spark installed on the hosts. 
> For the PR, all I did was modify the mesos dispatcher to not add the 
> appResource to the list of URIs for Mesos to fetch if it uses the "local:/" 
> scheme.
> For now, I didn't change the behavior of absolute paths or the "file:/" 
> scheme because I wanted to leave some form of the old behavior in place for 
> backwards compatibility. Does anyone have any opinions on whether these 
> schemes should change as well?
> The PR also includes support for using "spark-internal" with Mesos in cluster 
> mode which is something we need for another use-case. I can separate them if 
> that makes more sense.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23988) [Mesos] Improve handling of appResource in mesos dispatcher when using Docker

2018-04-15 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23988:


Assignee: (was: Apache Spark)

> [Mesos] Improve handling of appResource in mesos dispatcher when using Docker
> -
>
> Key: SPARK-23988
> URL: https://issues.apache.org/jira/browse/SPARK-23988
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.3.0
>Reporter: paul mackles
>Priority: Minor
>
> Our organization makes heavy use of Docker containers when running Spark on 
> Mesos. The images we use for our containers include Spark along with all of 
> the application dependencies. We find this to be a great way to manage our 
> artifacts.
> When specifying the primary application jar (i.e. appResource), the mesos 
> dispatcher insists on adding it to the list of URIs for Mesos to fetch as 
> part of launching the driver's container. This leads to confusing behavior 
> where paths such as:
>  * file:///application.jar
>  * local:/application.jar
>  * /application.jar
> wind up being fetched from the host where the driver is running. Obviously, 
> this doesn't work since all of the above examples are referencing the path of 
> the jar on the container image itself.
> Here is an example that I used for testing:
> {code:java}
> spark-submit \
>   --class org.apache.spark.examples.SparkPi \
>   --master mesos://spark-dispatcher \
>   --deploy-mode cluster \
>   --conf spark.cores.max=4 \
>   --conf spark.mesos.executor.docker.image=spark:2.2.1 \
>   local:/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar 10{code}
> The "spark:2.2.1" image contains an installation of spark under 
> "/usr/local/spark". Notice how we reference the appResource using the 
> "local:/" scheme.
> If you try the above with the current version of the mesos dispatcher, it 
> will try to fetch the path 
> "/usr/local/spark/examples/jars/spark-examples_2.11-2.2.1.jar" from the host 
> filesystem where the driver's container is running. On our systems, this 
> fails since we don't have spark installed on the hosts. 
> For the PR, all I did was modify the mesos dispatcher to not add the 
> appResource to the list of URIs for Mesos to fetch if it uses the "local:/" 
> scheme.
> For now, I didn't change the behavior of absolute paths or the "file:/" 
> scheme because I wanted to leave some form of the old behavior in place for 
> backwards compatibility. Does anyone have any opinions on whether these 
> schemes should change as well?
> The PR also includes support for using "spark-internal" with Mesos in cluster 
> mode which is something we need for another use-case. I can separate them if 
> that makes more sense.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-15 Thread Edwina Lu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438879#comment-16438879
 ] 

Edwina Lu commented on SPARK-23206:
---

After discussion with [~irashid] on the PR, we've decided to move 
ExecutorMetricsUpdate logging to stage end, to minimize the amount of extra 
logging. The updated design doc: 
[https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing]

[^SPARK-23206 Design Doc.pdf]

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data 
> across the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.
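
As a point of reference for the "JVM used memory" metric above, a minimal sketch of 
the kind of reading a heartbeat could sample (standard JMX API, not the SPARK-23206 
implementation itself):
{code:scala}
// Minimal sketch: sample current JVM heap usage via the standard MemoryMXBean.
import java.lang.management.ManagementFactory

val heapUsed = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed
println(s"current JVM heap used: $heapUsed bytes")
{code}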



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-15 Thread Edwina Lu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edwina Lu updated SPARK-23206:
--
Attachment: SPARK-23206 Design Doc.pdf

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data 
> across the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23983) Disable X-Frame-Options from Spark UI response headers if explicitly configured

2018-04-15 Thread Joe Pallas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438880#comment-16438880
 ] 

Joe Pallas commented on SPARK-23983:


I'm just a bystander.  I found a couple of bug reports that mention the big 
four (Firefox, IE, Chrome, Safari), but nothing that seems definitive.

I can certainly imagine that a reserved, otherwise illegal value for the 
{{allowFramingFrom}} option (such as {{any}} or {{*}}) might reasonably be 
defined to mean "don't send the {{X-Frame-Options}} header".  That would avoid 
adding any new configuration options, at the cost of, well, being a bit of a 
hack. But the option is already kind of questionable.  I don't know how the 
browsers that don't support {{ALLOW-FROM}} react when they see it.

> Disable X-Frame-Options from Spark UI response headers if explicitly 
> configured
> ---
>
> Key: SPARK-23983
> URL: https://issues.apache.org/jira/browse/SPARK-23983
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.3.0
>Reporter: Taylor Cressy
>Priority: Minor
>  Labels: UI
>
> We should introduce a configuration for the spark UI to omit X-Frame-Options 
> from the response headers if explicitly set.
> The X-Frame-Options header was introduced in *org.apache.spark.ui.JettyUtils* 
> to prevent frame-related click-jacking vulnerabilities. This was addressed 
> in: SPARK-10589
>  
> {code:java}
> val allowFramingFrom = conf.getOption("spark.ui.allowFramingFrom")
> val xFrameOptionsValue =
>allowFramingFrom.map(uri => s"ALLOW-FROM $uri").getOrElse("SAMEORIGIN")
> ...
> // In doGet
> response.setHeader("X-Frame-Options", xFrameOptionsValue)
> {code}
>  
> The problem with this is that we only allow the same origin or a single 
> host to present the UI in iframes. I propose we add a configuration that 
> turns this off.
>  
> Use Case: We are currently building a "portal UI" for all things related to a 
> cluster. Embedding the Spark UI in the portal is necessary because the 
> cluster is in the cloud and can only be accessed via an SSH tunnel - as 
> intended. (The reverse proxy configuration {{spark.ui.reverseProxy}} could 
> be used to simplify connecting to all the workers, but this doesn't solve 
> handling multiple, unrelated UIs through a single tunnel.)
>  
> Moreover, the host that our "portal UI" would reside on is not assigned a 
> hostname and has an ephemeral IP address, so the *ALLOW-FROM* directive isn't 
> useful in this case.
>  
> Lastly, the current design does not allow for different hosts to be 
> configured, i.e. *_spark.ui.allowFramingFrom_* _*hostname1,hostname2*_ is not 
> a valid config.
>  
> An alternative option would be to explore Content-Security-Policy: 
> [https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy#frame-ancestors]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23986) CompileException when using too many avg aggregation after joining

2018-04-15 Thread Kazuaki Ishizaki (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438885#comment-16438885
 ] 

Kazuaki Ishizaki commented on SPARK-23986:
--

I also checked it with branch-2.3, and it works without any exception.

> CompileException when using too many avg aggregation after joining
> --
>
> Key: SPARK-23986
> URL: https://issues.apache.org/jira/browse/SPARK-23986
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michel Davit
>Priority: Major
>
> Considering the following code:
> {code:java}
> val df1: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
>   .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")
> val df2: DataFrame = sparkSession.sparkContext
>   .makeRDD(Seq((0, "val1", "val2")))
>   .toDF("key", "dummy1", "dummy2")
> val agg = df1
>   .join(df2, df1("key") === df2("key"), "leftouter")
>   .groupBy(df1("key"))
>   .agg(
> avg("col2").as("avg2"),
> avg("col3").as("avg3"),
> avg("col4").as("avg4"),
> avg("col1").as("avg1"),
> avg("col5").as("avg5"),
> avg("col6").as("avg6")
>   )
> val head = agg.take(1)
> {code}
> This logs the following exception:
> {code:java}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 467, Column 28: Redefinition of parameter "agg_expr_11"
> {code}
> I am not a Spark expert, but after investigation I realized that the generated 
> {{doConsume}} method is responsible for the exception.
> Indeed, {{avg}} calls 
> {{org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction}} 
> several times: the first time with the 'avg' Expr and a second time for the 
> base aggregation Expr (count and sum).
> The problem comes from the generation of parameters in CodeGenerator:
> {code:java}
>   /**
>* Returns a term name that is unique within this instance of a 
> `CodegenContext`.
>*/
>   def freshName(name: String): String = synchronized {
> val fullName = if (freshNamePrefix == "") {
>   name
> } else {
>   s"${freshNamePrefix}_$name"
> }
> if (freshNameIds.contains(fullName)) {
>   val id = freshNameIds(fullName)
>   freshNameIds(fullName) = id + 1
>   s"$fullName$id"
> } else {
>   freshNameIds += fullName -> 1
>   fullName
> }
>   }
> {code}
> The {{freshNameIds}} map already contains {{agg_expr_[1..6]}} from the first 
> call. The second call is made with {{agg_expr_[1..12]}} and generates the 
> following names: {{agg_expr_[11|21|31|41|51|61]}} for the already-seen names, 
> plus the fresh {{agg_expr_[7..12]}}. We then have two parameter name conflicts 
> in the generated code: {{agg_expr_11}} and {{agg_expr_12}}.
> Appending the id in {{s"$fullName$id"}} to generate a unique term name is the 
> source of the conflict. Maybe simply using an underscore can solve this issue: 
> {{s"${fullName}_$id"}}.
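
A minimal standalone sketch of the underscore idea above (an illustration of the proposal, not the actual Spark patch, with the {{freshNamePrefix}} handling left out for brevity):

{code:scala}
import scala.collection.mutable

// Simplified fresh-name generator that separates the base name from the
// numeric suffix with an underscore, so the renamed "agg_expr_1" becomes
// "agg_expr_1_1" and can no longer collide with a fresh "agg_expr_11".
class FreshNames {
  private val freshNameIds = mutable.HashMap.empty[String, Int]

  def freshName(name: String): String = synchronized {
    if (freshNameIds.contains(name)) {
      val id = freshNameIds(name)
      freshNameIds(name) = id + 1
      s"${name}_$id"
    } else {
      freshNameIds += name -> 1
      name
    }
  }
}
{code}

With this scheme the second round of calls would yield {{agg_expr_1_1}}, {{agg_expr_2_1}}, ..., which no longer clash with the fresh {{agg_expr_11}} and {{agg_expr_12}}.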



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23982) NoSuchMethodException: There is no startCredentialUpdater method in the object YarnSparkHadoopUtil

2018-04-15 Thread John (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438895#comment-16438895
 ] 

John commented on SPARK-23982:
--

see spark-core_2.11-2.3.0 and spark-yarn_2.11-2.3.0

org.apache.spark.executor.CoarseGrainedExecutorBackend:

if (driverConf.contains("spark.yarn.credentials.file")) {
  logInfo("Will periodically update credentials from: " +
    driverConf.get("spark.yarn.credentials.file"))
  Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
    .getMethod("startCredentialUpdater", classOf[SparkConf])
    .invoke(null, driverConf)
}

 

> NoSuchMethodException: There is no startCredentialUpdater method in the 
> object YarnSparkHadoopUtil
> --
>
> Key: SPARK-23982
> URL: https://issues.apache.org/jira/browse/SPARK-23982
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: John
>Priority: Major
>
>  At line 219 of the CoarseGrainedExecutorBackend class:
> Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").getMethod("startCredentialUpdater",
>  classOf[SparkConf]).invoke(null, driverConf)
> But there is no startCredentialUpdater method in the object 
> YarnSparkHadoopUtil.
>  
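
For readers unfamiliar with the failure mode, here is a self-contained illustration (deliberately unrelated to the real Spark/YARN classes) of how {{Class.getMethod}} raises {{NoSuchMethodException}} when the named method is absent:

{code:scala}
// Toy target: the Scala object has an `existing` method but nothing named
// `startCredentialUpdater`, so the second reflective lookup throws.
object Target { def existing(): Unit = () }

object ReflectionDemo {
  def main(args: Array[String]): Unit = {
    val clazz = Target.getClass
    clazz.getMethod("existing")                  // succeeds
    try {
      clazz.getMethod("startCredentialUpdater")  // not defined on Target
    } catch {
      case e: NoSuchMethodException => println(s"lookup failed as expected: $e")
    }
  }
}
{code}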



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19618) Inconsistency wrt max. buckets allowed from Dataframe API vs SQL

2018-04-15 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438912#comment-16438912
 ] 

Wenchen Fan commented on SPARK-19618:
-

Making it configurable sounds like a good idea. Can you open a JIRA for it? 
Thanks!
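
As a rough sketch of what "configurable" could mean here (the config key and helper below are hypothetical, not existing Spark APIs), both the SQL path and the DataFrameWriter path would validate the bucket count against the same configurable limit:

{code:scala}
// Hypothetical, self-contained stand-in for a shared bucket-count check.
// In Spark itself the limit would presumably come from a SQLConf entry
// (e.g. a made-up "spark.sql.sources.bucketing.maxBuckets") instead of a val.
object BucketLimit {
  val maxBuckets: Int = 100000

  def validate(numBuckets: Int): Unit =
    require(numBuckets > 0 && numBuckets <= maxBuckets,
      s"Bucket number must be greater than 0 and at most $maxBuckets")
}

object BucketLimitDemo {
  def main(args: Array[String]): Unit = {
    BucketLimit.validate(8)          // fine
    BucketLimit.validate(147483647)  // throws IllegalArgumentException
  }
}
{code}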

> Inconsistency wrt max. buckets allowed from Dataframe API vs SQL
> 
>
> Key: SPARK-19618
> URL: https://issues.apache.org/jira/browse/SPARK-19618
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Tejas Patil
>Assignee: Tejas Patil
>Priority: Major
> Fix For: 2.2.0
>
>
> High number of buckets is allowed while creating a table via SQL query:
> {code}
> sparkSession.sql("""
> CREATE TABLE bucketed_table(col1 INT) USING parquet 
> CLUSTERED BY (col1) SORTED BY (col1) INTO 147483647 BUCKETS
> """)
> sparkSession.sql("DESC FORMATTED bucketed_table").collect.foreach(println)
> 
> [Num Buckets:,147483647,]
> [Bucket Columns:,[col1],]
> [Sort Columns:,[col1],]
> 
> {code}
> Trying the same via dataframe API does not work:
> {code}
> > df.write.format("orc").bucketBy(147483647, 
> > "j","k").sortBy("j","k").saveAsTable("bucketed_table")
> java.lang.IllegalArgumentException: requirement failed: Bucket number must be 
> greater than 0 and less than 10.
>   at scala.Predef$.require(Predef.scala:224)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:293)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$getBucketSpec$2.apply(DataFrameWriter.scala:291)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.spark.sql.DataFrameWriter.getBucketSpec(DataFrameWriter.scala:291)
>   at 
> org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:429)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:410)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:365)
>   ... 50 elided
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-15 Thread Edwina Lu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edwina Lu updated SPARK-23206:
--
Attachment: SPARK-23206 Design Doc.pdf

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data across 
> the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.
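
As a back-of-the-envelope check of the figures quoted in the description (the concrete numbers below are invented for illustration, not the LinkedIn measurements):

{code:scala}
object MemoryUsageEstimate {
  def main(args: Array[String]): Unit = {
    val executorMemoryGb = 16.0   // spark.executor.memory
    val maxJvmUsedGb     = 5.6    // peak JVM used memory across executors
    val numExecutors     = 57

    // Used fraction = max JVM used memory / spark.executor.memory
    val usedFraction = maxJvmUsedGb / executorMemoryGb
    // Unused memory per application = executors * (executor memory - max used)
    val unusedGb = numExecutors * (executorMemoryGb - maxJvmUsedGb)

    println(f"used executor memory: ${usedFraction * 100}%.0f%%")  // ~35%
    println(f"unused memory per application: $unusedGb%.0f GB")    // ~593 GB
  }
}
{code}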



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23929) pandas_udf schema mapped by position and not by name

2018-04-15 Thread Li Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438935#comment-16438935
 ] 

Li Jin commented on SPARK-23929:


I agree with [~hyukjin.kwon]. Seems like there is not a strong enough reason to 
change the behavior.

> pandas_udf schema mapped by position and not by name
> 
>
> Key: SPARK-23929
> URL: https://issues.apache.org/jira/browse/SPARK-23929
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: PySpark
> Spark 2.3.0
>  
>Reporter: Omri
>Priority: Major
>
> The return struct of a pandas_udf should be mapped to the provided schema by 
> name. Currently, this is not the case.
> Consider these two examples, where the only change is the order of the fields 
> in the provided schema struct:
> {code:java}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))  
> @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP)  
> def normalize(pdf):
>     v = pdf.v
>     return pdf.assign(v=(v - v.mean()) / v.std())
> df.groupby("id").apply(normalize).show() 
> {code}
> and this one:
> {code:java}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))  
> @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP)  
> def normalize(pdf):
>     v = pdf.v
>     return pdf.assign(v=(v - v.mean()) / v.std())
> df.groupby("id").apply(normalize).show()
> {code}
> The results should be the same but they are different:
> For the first code:
> {code:java}
> +---+---+
> |  v| id|
> +---+---+
> |1.0|  0|
> |1.0|  0|
> |2.0|  0|
> |2.0|  0|
> |2.0|  1|
> +---+---+
> {code}
> For the second code:
> {code:java}
> +---+---+
> | id|  v|
> +---+---+
> |  1|-0.7071067811865475|
> |  1| 0.7071067811865475|
> |  2|-0.8320502943378437|
> |  2|-0.2773500981126146|
> |  2| 1.1094003924504583|
> +---+---+
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-15 Thread Edwina Lu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edwina Lu updated SPARK-23206:
--
Attachment: (was: SPARK-23206 Design Doc.pdf)

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data across 
> the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23929) pandas_udf schema mapped by position and not by name

2018-04-15 Thread Li Jin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438935#comment-16438935
 ] 

Li Jin edited comment on SPARK-23929 at 4/16/18 2:40 AM:
-

I agree with [~hyukjin.kwon]. Seems like there is not a strong enough reason to 
change the behavior...


was (Author: icexelloss):
I agree with [~hyukjin.kwon]. Seems like there is not a strong enough reason to 
change the behavior.

> pandas_udf schema mapped by position and not by name
> 
>
> Key: SPARK-23929
> URL: https://issues.apache.org/jira/browse/SPARK-23929
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: PySpark
> Spark 2.3.0
>  
>Reporter: Omri
>Priority: Major
>
> The return struct of a pandas_udf should be mapped to the provided schema by 
> name. Currently, this is not the case.
> Consider these two examples, where the only change is the order of the fields 
> in the provided schema struct:
> {code:java}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))  
> @pandas_udf("v double,id long", PandasUDFType.GROUPED_MAP)  
> def normalize(pdf):
>     v = pdf.v
>     return pdf.assign(v=(v - v.mean()) / v.std())
> df.groupby("id").apply(normalize).show() 
> {code}
> and this one:
> {code:java}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))  
> @pandas_udf("id long,v double", PandasUDFType.GROUPED_MAP)  
> def normalize(pdf):
>     v = pdf.v
>     return pdf.assign(v=(v - v.mean()) / v.std())
> df.groupby("id").apply(normalize).show()
> {code}
> The results should be the same but they are different:
> For the first code:
> {code:java}
> +---+---+
> |  v| id|
> +---+---+
> |1.0|  0|
> |1.0|  0|
> |2.0|  0|
> |2.0|  0|
> |2.0|  1|
> +---+---+
> {code}
> For the second code:
> {code:java}
> +---+---+
> | id|  v|
> +---+---+
> |  1|-0.7071067811865475|
> |  1| 0.7071067811865475|
> |  2|-0.8320502943378437|
> |  2|-0.2773500981126146|
> |  2| 1.1094003924504583|
> +---+---+
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23956) Use effective RPC port in AM registration

2018-04-15 Thread Saisai Shao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saisai Shao reassigned SPARK-23956:
---

Assignee: Gera Shegalov

> Use effective RPC port in AM registration 
> --
>
> Key: SPARK-23956
> URL: https://issues.apache.org/jira/browse/SPARK-23956
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.3.0
>Reporter: Gera Shegalov
>Assignee: Gera Shegalov
>Priority: Minor
> Fix For: 2.4.0
>
>
> AMs should use their real RPC port in the AM registration for better 
> diagnostics in the Application Report.
> {code}
> 18/04/10 14:56:21 INFO Client:
> client token: N/A
> diagnostics: N/A
> ApplicationMaster host: localhost
> ApplicationMaster RPC port: 58338
> queue: default
> start time: 1523397373659
> final status: UNDEFINED
> tracking URL: http://localhost:8088/proxy/application_1523370127531_0016/
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23956) Use effective RPC port in AM registration

2018-04-15 Thread Saisai Shao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saisai Shao resolved SPARK-23956.
-
   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21047
[https://github.com/apache/spark/pull/21047]

> Use effective RPC port in AM registration 
> --
>
> Key: SPARK-23956
> URL: https://issues.apache.org/jira/browse/SPARK-23956
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.3.0
>Reporter: Gera Shegalov
>Assignee: Gera Shegalov
>Priority: Minor
> Fix For: 2.4.0
>
>
> AMs should use their real RPC port in the AM registration for better 
> diagnostics in the Application Report.
> {code}
> 18/04/10 14:56:21 INFO Client:
> client token: N/A
> diagnostics: N/A
> ApplicationMaster host: localhost
> ApplicationMaster RPC port: 58338
> queue: default
> start time: 1523397373659
> final status: UNDEFINED
> tracking URL: http://localhost:8088/proxy/application_1523370127531_0016/
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-23917) High-order function: array_max(x) → x

2018-04-15 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-23917:
---

Assignee: Marco Gaido

> High-order function: array_max(x) → x
> -
>
> Key: SPARK-23917
> URL: https://issues.apache.org/jira/browse/SPARK-23917
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Marco Gaido
>Priority: Major
> Fix For: 2.4.0
>
>
> Returns the maximum value of the input array.
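
A usage sketch, assuming the new {{array_max}} function in {{org.apache.spark.sql.functions}} (Spark 2.4) and a {{SparkSession}} already in scope as {{spark}}:

{code:scala}
import org.apache.spark.sql.functions.{array, array_max, col, lit}

// Build a one-row DataFrame with an array column and take its maximum.
val df = spark.range(1).select(array(lit(1), lit(20), lit(3)).as("xs"))
df.select(array_max(col("xs"))).show()  // the single row shows 20
{code}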



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23917) High-order function: array_max(x) → x

2018-04-15 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-23917.
-
   Resolution: Fixed
Fix Version/s: 2.4.0

> High-order function: array_max(x) → x
> -
>
> Key: SPARK-23917
> URL: https://issues.apache.org/jira/browse/SPARK-23917
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Marco Gaido
>Priority: Major
> Fix For: 2.4.0
>
>
> Returns the maximum value of the input array.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23715) from_utc_timestamp returns incorrect results for some UTC date/time values

2018-04-15 Thread Wenchen Fan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438982#comment-16438982
 ] 

Wenchen Fan commented on SPARK-23715:
-

Is there a standard definition of from_utc_timestamp? How should we treat the 
input timestamp: as a UTC timestamp or as a local-timezone timestamp?
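
To make the two readings concrete, here is a small illustration using plain {{java.time}} (an illustration of the question, not Spark code):

{code:scala}
import java.time.{LocalDateTime, ZoneId, ZoneOffset}

object FromUtcTimestampSemantics {
  def main(args: Array[String]): Unit = {
    val input = LocalDateTime.parse("2018-03-13T06:18:23")
    val target = ZoneId.of("GMT+01:00")

    // Reading 1: the input is a UTC wall-clock time, rendered in GMT+1.
    val asUtc = input.atOffset(ZoneOffset.UTC).atZoneSameInstant(target)
    println(asUtc.toLocalDateTime)    // 2018-03-13T07:18:23 (the expected answer)

    // Reading 2: the input is a local wall-clock time (here America/Los_Angeles),
    // and that instant is then rendered in GMT+1.
    val asLocal = input.atZone(ZoneId.of("America/Los_Angeles"))
      .withZoneSameInstant(target)
    println(asLocal.toLocalDateTime)  // 2018-03-13T14:18:23
  }
}
{code}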

> from_utc_timestamp returns incorrect results for some UTC date/time values
> --
>
> Key: SPARK-23715
> URL: https://issues.apache.org/jira/browse/SPARK-23715
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Bruce Robbins
>Priority: Major
>
> This produces the expected answer:
> {noformat}
> df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" 
> ).as("dt")).show
> +---+
> | dt|
> +---+
> |2018-03-13 07:18:23|
> +---+
> {noformat}
> However, the equivalent UTC input (but with an explicit timezone) produces a 
> wrong answer:
> {noformat}
> df.select(from_utc_timestamp(lit("2018-03-13T06:18:23+00:00"), "GMT+1" 
> ).as("dt")).show
> +---+
> | dt|
> +---+
> |2018-03-13 00:18:23|
> +---+
> {noformat}
> Additionally, the equivalent Unix time (1520921903, which is also 
> "2018-03-13T06:18:23" in the UTC time zone) produces the same wrong answer:
> {noformat}
> df.select(from_utc_timestamp(to_timestamp(lit(1520921903)), "GMT+1" 
> ).as("dt")).show
> +---+
> | dt|
> +---+
> |2018-03-13 00:18:23|
> +---+
> {noformat}
> These issues stem from the fact that the FromUTCTimestamp expression, despite 
> its name, expects the input to be in the user's local timezone. There is some 
> magic under the covers to make things work (mostly) as the user expects.
> As an example, let's say a user in Los Angeles issues the following:
> {noformat}
> df.select(from_utc_timestamp(lit("2018-03-13T06:18:23"), "GMT+1" 
> ).as("dt")).show
> {noformat}
> FromUTCTimestamp gets as input a Timestamp (long) value representing
> {noformat}
> 2018-03-13T06:18:23-07:00 (long value 1520947103000000)
> {noformat}
> What FromUTCTimestamp needs instead is
> {noformat}
> 2018-03-13T06:18:23+00:00 (long value 1520921903000000)
> {noformat}
> So, it applies the local timezone's offset to the input timestamp to get the 
> correct value (1520947103000000 minus 7 hours is 1520921903000000). Then it can 
> process the value and produce the expected output.
> When the user explicitly specifies a time zone, FromUTCTimestamp's 
> assumptions break down. The input is no longer in the local time zone. 
> Because of the way input data is implicitly casted, FromUTCTimestamp never 
> knows whether the input data had an explicit timezone.
> Here are some gory details:
> There is sometimes a mismatch in expectations between the (string => 
> timestamp) cast and FromUTCTimestamp. Also, since the FromUTCTimestamp 
> expression never sees the actual input string (the cast "intercepts" the 
> input and converts it to a long timestamp before FromUTCTimestamp uses the 
> value), FromUTCTimestamp cannot reject any input value that would exercise 
> this mismatch in expectations.
> There is a similar mismatch in expectations in the (integer => timestamp) 
> cast and FromUTCTimestamp. As a result, Unix time input almost always 
> produces incorrect output.
> h3. When things work as expected for String input:
> When from_utc_timestamp is passed a string time value with no time zone, 
> DateTimeUtils.stringToTimestamp (called from a Cast expression) treats the 
> datetime string as though it's in the user's local time zone. Because 
> DateTimeUtils.stringToTimestamp is a general function, this is reasonable.
> As a result, FromUTCTimestamp's input is a timestamp shifted by the local 
> time zone's offset. FromUTCTimestamp assumes this (or more accurately, a 
> utility function called by FromUTCTimestamp assumes this), so the first thing 
> it does is reverse-shift to get it back the correct value. Now that the long 
> value has been shifted back to the correct timestamp value, it can now 
> process it (by shifting it again based on the specified time zone).
> h3. When things go wrong with String input:
> When from_utc_timestamp is passed a string datetime value with an explicit 
> time zone, stringToTimestamp honors that timezone and ignores the local time 
> zone. stringToTimestamp does not shift the timestamp by the local timezone's 
> offset, but by the timezone specified on the datetime string.
> Unfortunately, FromUTCTimestamp, which has no insight into the actual input 
> or the conversion, still assumes the timestamp is shifted by the local time 
> zone. So it reverse-shifts the long value by the local time zone's offset, 
> producing an incorrect result.

[jira] [Commented] (SPARK-23206) Additional Memory Tuning Metrics

2018-04-15 Thread Xiao Li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438983#comment-16438983
 ] 

Xiao Li commented on SPARK-23206:
-

No permission to access the doc  
[https://docs.google.com/document/d/1vLojop9I4WkpUdbrSnoHzJ6jkCMnH2Ot5JTSk7YEX5s/edit?usp=sharing]

Could you double check it? Thanks!

> Additional Memory Tuning Metrics
> 
>
> Key: SPARK-23206
> URL: https://issues.apache.org/jira/browse/SPARK-23206
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Edwina Lu
>Priority: Major
> Attachments: ExecutorsTab.png, ExecutorsTab2.png, 
> MemoryTuningMetricsDesignDoc.pdf, SPARK-23206 Design Doc.pdf, StageTab.png
>
>
> At LinkedIn, we have multiple clusters, running thousands of Spark 
> applications, and these numbers are growing rapidly. We need to ensure that 
> these Spark applications are well tuned – cluster resources, including 
> memory, should be used efficiently so that the cluster can support running 
> more applications concurrently, and applications should run quickly and 
> reliably.
> Currently there is limited visibility into how much memory executors are 
> using, and users are guessing numbers for executor and driver memory sizing. 
> These estimates are often much larger than needed, leading to memory wastage. 
> Examining the metrics for one cluster for a month, the average percentage of 
> used executor memory (max JVM used memory across executors /  
> spark.executor.memory) is 35%, leading to an average of 591GB unused memory 
> per application (number of executors * (spark.executor.memory - max JVM used 
> memory)). Spark has multiple memory regions (user memory, execution memory, 
> storage memory, and overhead memory), and to understand how memory is being 
> used and fine-tune allocation between regions, it would be useful to have 
> information about how much memory is being used for the different regions.
> To improve visibility into memory usage for the driver and executors and 
> different memory regions, the following additional memory metrics can be 
> tracked for each executor and driver:
>  * JVM used memory: the JVM heap size for the executor/driver.
>  * Execution memory: memory used for computation in shuffles, joins, sorts 
> and aggregations.
>  * Storage memory: memory used for caching and propagating internal data across 
> the cluster.
>  * Unified memory: sum of execution and storage memory.
> The peak values for each memory metric can be tracked for each executor, and 
> also per stage. This information can be shown in the Spark UI and the REST 
> APIs. Information for peak JVM used memory can help with determining 
> appropriate values for spark.executor.memory and spark.driver.memory, and 
> information about the unified memory region can help with determining 
> appropriate values for spark.memory.fraction and 
> spark.memory.storageFraction. Stage memory information can help identify 
> which stages are most memory intensive, and users can look into the relevant 
> code to determine if it can be optimized.
> The memory metrics can be gathered by adding the current JVM used memory, 
> execution memory and storage memory to the heartbeat. SparkListeners are 
> modified to collect the new metrics for the executors, stages and Spark 
> history log. Only interesting values (peak values per stage per executor) are 
> recorded in the Spark history log, to minimize the amount of additional 
> logging.
> We have attached our design documentation with this ticket and would like to 
> receive feedback from the community for this proposal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org