[jira] [Commented] (SPARK-13209) transitive closure on a dataframe

2017-01-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814239#comment-15814239
 ] 

Hyukjin Kwon commented on SPARK-13209:
--

It seems (at least at the current master) the query plans grow too large. If you 
use the checkpoint API, it does not appear to slow down.
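A minimal sketch of that suggestion (assuming Spark 2.1+, a {{SparkSession}} named 
{{spark}}, and the {{edges}}/{{filtered}} DataFrames from the report below); 
checkpointing inside the loop truncates the accumulated lineage so the plan stops 
growing:

{code}
// Illustrative only: the checkpoint directory path is an assumption.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

var i = 0
while (i < 300) {
  i += 1
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end").as("fEnd"))
    .distinct()
    .checkpoint() // materializes the result and drops the accumulated plan
  filtered.show()
}
{code}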

> transitive closure on a dataframe
> -
>
> Key: SPARK-13209
> URL: https://issues.apache.org/jira/browse/SPARK-13209
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Placek
>
> When I run the following loop, the join gets slower and slower regardless of 
> caching. If I change the data frame to an RDD and then back again (uncomment the 
> last commented line), there seems to be no slowdown, but I get an error after 
> around 30 iterations. The code assumes that Edge is a simple case class with 
> start and end.
> {code}
> val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 
> 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 
> 17), (17, 18), (18, 19), (19, 20), (20, 21), (21, 22), (22, 23), (23, 24), 
> (24, 25), (25, 26), (26, 27), (27, 28), (23, 4), (4, 5), (5, 6), (6, 7), (7, 
> 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13))
> //val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7))
> var edges = sc
>   .parallelize(e, N)
>   .map(p => Edge(p._1, p._2))
>   .toDF()
>   .cache()
> //var edges = e.map(p => Edge(p._1, p._2)).toDF().cache()
> var filtered = edges
>   .filter("start = 1")
>   .distinct()
>   .withColumnRenamed("start", "fStart")
>   .withColumnRenamed("end", "fEnd")
>   .cache()
> var i = 0
> while (i < 300) {
>   i = i + 1
>   println("\n i = " + i)
>   filtered = filtered
> .join(edges, filtered("fEnd") === edges("start"))
> .select(filtered("fStart"), edges("end"))
> .withColumnRenamed("start", "fStart")
> .withColumnRenamed("end", "fEnd")
> .distinct
> .cache()
>   //filtered.explain()
>   //filtered.explain(true)
> 
>   //filtered = sqlContext.createDataFrame(filtered.rdd, filtered.schema)
>   filtered.show
> }
> {code}
> The error I get is shown below, and it causes (TaskSchedulerImpl: Lost executor 
> driver on localhost: Executor heartbeat timed out after 121001 ms):
> {code}
> 16/02/07 01:55:59 ERROR Utils: Uncaught exception in thread driver-heartbeater
> java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> scala.collection.immutable.HashMap$SerializationProxy to field 
> org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type 
> scala.collection.immutable.Map in instance of 
> org.apache.spark.executor.TaskMetrics
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
>   at 
> org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
>   at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
>   at 
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
>   at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
>   at 
> 

[jira] [Resolved] (SPARK-12940) Partition field in Spark SQL WHERE clause causing Exception

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-12940.
--
Resolution: Cannot Reproduce

As it can’t be reproduced against master as reported, I am resolving this as 
{{Cannot Reproduce}}. If anyone can find the PR that fixed it, I think we should 
backport it.

> Partition field in Spark SQL WHERE clause causing Exception
> ---
>
> Key: SPARK-12940
> URL: https://issues.apache.org/jira/browse/SPARK-12940
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2, 1.6.0
> Environment: AWS EMR 4.2, OSX
>Reporter: Brian Wheeler
> Attachments: spark-12940.txt
>
>
> I have partitioned Parquet that I am trying to query with Spark SQL. When I 
> involve a partition column in the {{WHERE}} clause together with {{OR}}, I get an 
> exception.
> I have had this issue when using spark-submit on a cluster, where the Parquet 
> was created externally and registered externally with a JDBC-backed Hive 
> metastore. I can also reproduce this behavior with a simplified example in 
> the spark-shell, which I include below. Note that I am using my hive-site.xml 
> when I launch the spark-shell, so the metastore is set up the same way.
> I also tried this locally on a Mac laptop with 1.6.0, with the same results.
> Create some partitioned parquet:
> {code}
> case class Hit(meta_ts_unix_ms: Long, username: String, srclatitude: Double, 
> srclongitude: Double, srccity: String, srcregion: String, srccountrycode: 
> String, metaclass: String)
> val rdd = sc.parallelize(Array(Hit(34L, "user1", 45.2, 23.2, "city1", 
> "state1", "US", "blah, other"), Hit(35L, "user1", 53.2, 11.2, "city2", 
> "state2", "US", "blah")))
> sqlContext.createDataFrame(rdd).registerTempTable("test_table")
> sqlContext.sql("select * from test_table where meta_ts_unix_ms = 
> 35").write.parquet("file:///tmp/year=2015/month=12/day=4/hour=1/")
> sqlContext.sql("select * from test_table where meta_ts_unix_ms = 
> 34").write.parquet("file:///tmp/year=2015/month=12/day=3/hour=23/")
> {code}
> Create an external table from the parquet:
> {code}
> sqlContext.createExternalTable("test_table2", "file:///tmp/year=2015/", 
> "parquet")
> {code}
> If I understand correctly, the partitions were discovered automatically, 
> because they show up in the describe command even though they were not part 
> of the schema generated from the case classes:
> {code}
> +---+-+---+
> |   col_name|data_type|comment|
> +---+-+---+
> |meta_ts_unix_ms|   bigint|   |
> |   username|   string|   |
> |srclatitude|   double|   |
> |   srclongitude|   double|   |
> |srccity|   string|   |
> |  srcregion|   string|   |
> | srccountrycode|   string|   |
> |  metaclass|   string|   |
> |   year|  int|   |
> |  month|  int|   |
> |day|  int|   |
> |   hour|  int|   |
> +---+-+---+
> {code}
> This query:
> {code}
> sqlContext.sql("SELECT 
> meta_ts_unix_ms,username,srclatitude,srclongitude,srccity,srcregion,srccountrycode
>  FROM test_table2 WHERE meta_ts_unix_ms IS NOT NULL AND username IS NOT NULL 
> AND metaclass like '%blah%' OR hour = 1").show()
> {code}
> Throws this exception:
> {noformat}
> 16/01/20 21:36:46 WARN TaskSetManager: Lost task 0.0 in stage 13.0 (TID 84, 
> ip-192-168-111-222.ec2.internal): 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
> attribute, tree: metaclass#53
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:86)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:85)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:232)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:232)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at 

[jira] [Commented] (SPARK-18641) Show databases NullPointerException while Sentry turned on

2017-01-09 Thread zhangqw (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814210#comment-15814210
 ] 

zhangqw commented on SPARK-18641:
-

Yes, it seems Sentry does not fully support Spark. I'm now using only HDFS access 
control for Spark.

> Show databases NullPointerException while Sentry turned on
> --
>
> Key: SPARK-18641
> URL: https://issues.apache.org/jira/browse/SPARK-18641
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: CentOS 6.5 / Hive 1.1.0 / Sentry 1.5.1
>Reporter: zhangqw
>
> I've traced into the source code, and it seems that  of 
> Sentry is not set when Spark SQL starts a session. This operation should be 
> done in org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook, which is 
> not called in Spark SQL.
> Edit: I copied hive-site.xml (which turns on Sentry) and all Sentry jars into 
> Spark's classpath.
> Here is the stack:
> ===
> 16/11/30 10:54:50 WARN SentryMetaStoreFilterHook: Error getting DB list
> java.lang.NullPointerException
> at 
> java.util.concurrent.ConcurrentHashMap.hash(ConcurrentHashMap.java:333)
> at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:988)
> at org.apache.hadoop.security.Groups.getGroups(Groups.java:162)
> at 
> org.apache.sentry.provider.common.HadoopGroupMappingService.getGroups(HadoopGroupMappingService.java:60)
> at 
> org.apache.sentry.binding.hive.HiveAuthzBindingHook.getHiveBindingWithPrivilegeCache(HiveAuthzBindingHook.java:956)
> at 
> org.apache.sentry.binding.hive.HiveAuthzBindingHook.filterShowDatabases(HiveAuthzBindingHook.java:826)
> at 
> org.apache.sentry.binding.metastore.SentryMetaStoreFilterHook.filterDb(SentryMetaStoreFilterHook.java:131)
> at 
> org.apache.sentry.binding.metastore.SentryMetaStoreFilterHook.filterDatabases(SentryMetaStoreFilterHook.java:59)
> at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getAllDatabases(HiveMetaStoreClient.java:1031)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
> at com.sun.proxy.$Proxy38.getAllDatabases(Unknown Source)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
> at 
> org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
> at org.apache.hadoop.hive.ql.metadata.Hive.(Hive.java:166)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
> at 
> org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:170)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258)
> at 
> org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359)
> at 
> org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:263)
> at 
> org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39)
> at 
> org.apache.spark.sql.hive.HiveSharedState.metadataHive(HiveSharedState.scala:38)
> at 
> org.apache.spark.sql.hive.HiveSessionState.metadataHive$lzycompute(HiveSessionState.scala:43)
> at 
> org.apache.spark.sql.hive.HiveSessionState.metadataHive(HiveSessionState.scala:43)
> at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:62)
> at 
> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:84)
> at 
> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> 

[jira] [Updated] (SPARK-18641) Show databases NullPointerException while Sentry turned on

2017-01-09 Thread zhangqw (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangqw updated SPARK-18641:

Description: 
I've traced into the source code, and it seems that  of 
Sentry is not set when Spark SQL starts a session. This operation should be done 
in org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook, which is not 
called in Spark SQL.

Edit: I copied hive-site.xml (which turns on Sentry) and all Sentry jars into 
Spark's classpath.

Here is the stack:
{noformat}
16/11/30 10:54:50 WARN SentryMetaStoreFilterHook: Error getting DB list
java.lang.NullPointerException
at 
java.util.concurrent.ConcurrentHashMap.hash(ConcurrentHashMap.java:333)
at 
java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:988)
at org.apache.hadoop.security.Groups.getGroups(Groups.java:162)
at 
org.apache.sentry.provider.common.HadoopGroupMappingService.getGroups(HadoopGroupMappingService.java:60)
at 
org.apache.sentry.binding.hive.HiveAuthzBindingHook.getHiveBindingWithPrivilegeCache(HiveAuthzBindingHook.java:956)
at 
org.apache.sentry.binding.hive.HiveAuthzBindingHook.filterShowDatabases(HiveAuthzBindingHook.java:826)
at 
org.apache.sentry.binding.metastore.SentryMetaStoreFilterHook.filterDb(SentryMetaStoreFilterHook.java:131)
at 
org.apache.sentry.binding.metastore.SentryMetaStoreFilterHook.filterDatabases(SentryMetaStoreFilterHook.java:59)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getAllDatabases(HiveMetaStoreClient.java:1031)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
at com.sun.proxy.$Proxy38.getAllDatabases(Unknown Source)
at 
org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
at 
org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
at org.apache.hadoop.hive.ql.metadata.Hive.(Hive.java:166)
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
at 
org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:170)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258)
at 
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359)
at 
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:263)
at 
org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39)
at 
org.apache.spark.sql.hive.HiveSharedState.metadataHive(HiveSharedState.scala:38)
at 
org.apache.spark.sql.hive.HiveSessionState.metadataHive$lzycompute(HiveSessionState.scala:43)
at 
org.apache.spark.sql.hive.HiveSessionState.metadataHive(HiveSessionState.scala:43)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:62)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:84)
at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}

  was:
I've traced into source code, and it seems that  of 
Sentry not set when spark sql started a session. This operation should be done 
in org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook which is not 
called in spark sql.

Edit: I copyed hive-site.xml(which turns on Sentry) and all sentry jars into 
spark's classpath.

Here is the stack:

[jira] [Commented] (SPARK-19146) Drop more elements when stageData.taskData.size > retainedTasks to reduce the number of times on call drop

2017-01-09 Thread Yuming Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814207#comment-15814207
 ] 

Yuming Wang commented on SPARK-19146:
-

The number of active tasks grows more and more, and then {{dynamicAllocation}} 
loses efficacy if {{SparkListenerTaskEnd}} events cannot be consumed.

!can-not-consume-taskEnd-events.jpg!

> Drop more elements when stageData.taskData.size > retainedTasks to reduce the 
> number of times on call drop
> --
>
> Key: SPARK-19146
> URL: https://issues.apache.org/jira/browse/SPARK-19146
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Yuming Wang
> Attachments: can-not-consume-taskEnd-events.jpg
>
>
> The performance of the 
> [{{drop}}|https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L412]
>   function is bad.
> * Modify the code: 
> {code:java}
>   if (stageData.taskData.size > retainedTasks) {
> val start = System.currentTimeMillis()
> stageData.taskData = stageData.taskData.drop(stageData.taskData.size 
> - retainedTasks)
> logInfo(s"Time consuming: ${System.currentTimeMillis() - start}")
>   }
> {code}
> *  Time consuming
> {noformat}
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 156
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 145
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 148
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 159
> {noformat}
> My opinion is to drop more elements when {{stageData.taskData.size > 
> retainedTasks}} to reduce the number of times {{drop}} is called.






[jira] [Resolved] (SPARK-12911) Cacheing a dataframe causes array comparisons to fail (in filter / where) after 1.6

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-12911.
--
Resolution: Cannot Reproduce

I can't reproduce this issue at the current master. I am resolving this as 
{{Cannot Reproduce}}. Please feel free to reopen this if I was wrong and anyone 
still faces this issue.

> Cacheing a dataframe causes array comparisons to fail (in filter / where) 
> after 1.6
> ---
>
> Key: SPARK-12911
> URL: https://issues.apache.org/jira/browse/SPARK-12911
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
> Environment: OSX 10.11.1, Scala 2.11.7, Spark 1.6.0
>Reporter: Jesse English
>
> When doing a *where* operation on a dataframe and testing for equality on an 
> array type, after 1.6 no valid comparisons are made if the dataframe has been 
> cached.  If it has not been cached, the results are as expected.
> This appears to be related to the underlying unsafe array data types.
> {code:title=test.scala|borderStyle=solid}
> test("test array comparison") {
> val vectors: Vector[Row] =  Vector(
>   Row.fromTuple("id_1" -> Array(0L, 2L)),
>   Row.fromTuple("id_2" -> Array(0L, 5L)),
>   Row.fromTuple("id_3" -> Array(0L, 9L)),
>   Row.fromTuple("id_4" -> Array(1L, 0L)),
>   Row.fromTuple("id_5" -> Array(1L, 8L)),
>   Row.fromTuple("id_6" -> Array(2L, 4L)),
>   Row.fromTuple("id_7" -> Array(5L, 6L)),
>   Row.fromTuple("id_8" -> Array(6L, 2L)),
>   Row.fromTuple("id_9" -> Array(7L, 0L))
> )
> val data: RDD[Row] = sc.parallelize(vectors, 3)
> val schema = StructType(
>   StructField("id", StringType, false) ::
> StructField("point", DataTypes.createArrayType(LongType, false), 
> false) ::
> Nil
> )
> val sqlContext = new SQLContext(sc)
> val dataframe = sqlContext.createDataFrame(data, schema)
> val targetPoint:Array[Long] = Array(0L,9L)
> //Caching is the trigger to cause the error (no caching causes no error)
> dataframe.cache()
> //This is the line where it fails
> //java.util.NoSuchElementException: next on empty iterator
> //However we know that there is a valid match
> val targetRow = dataframe.where(dataframe("point") === 
> array(targetPoint.map(value => lit(value)): _*)).first()
> assert(targetRow != null)
>   }
> {code}






[jira] [Commented] (SPARK-12809) Spark SQL UDF does not work with struct input parameters

2017-01-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814182#comment-15814182
 ] 

Hyukjin Kwon commented on SPARK-12809:
--

Is this a duplicate of SPARK-12823?

> Spark SQL UDF does not work with struct input parameters
> 
>
> Key: SPARK-12809
> URL: https://issues.apache.org/jira/browse/SPARK-12809
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Deenar Toraskar
>
> Spark SQL UDFs don't work with struct input parameters
> def testUDF(expectedExposures: (Float, Float))= {
> (expectedExposures._1 * expectedExposures._2 /expectedExposures._1) 
>   }
> sqlContext.udf.register("testUDF", testUDF _)
> sqlContext.sql("select testUDF(struct(noofmonths,ee)) from netExposureCpty")
> The full stacktrace is given below
> com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: 
> org.apache.spark.sql.AnalysisException: cannot resolve 
> 'UDF(struct(noofmonths,ee))' due to data type mismatch: argument 1 requires 
> struct<_1:float,_2:float> type, however, 'struct(noofmonths,ee)' is of 
> struct type.; line 1 pos 33
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
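A possible workaround (a sketch, not taken from the report): pass the columns to 
the UDF individually instead of wrapping them in {{struct(...)}}, which avoids the 
tuple field-name mismatch ({{_1}}/{{_2}} vs {{noofmonths}}/{{ee}}) in the analyzer.

{code}
def testUDF(noofmonths: Float, ee: Float): Float =
  noofmonths * ee / noofmonths

sqlContext.udf.register("testUDF", testUDF _)
sqlContext.sql("select testUDF(noofmonths, ee) from netExposureCpty").show()
{code}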






[jira] [Resolved] (SPARK-12754) Data type mismatch on two array values when using filter/where

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-12754.
--
Resolution: Cannot Reproduce

I am resolving this as {{Cannot Reproduce}} because this was fixed in the 
master.

> Data type mismatch on two array values when using filter/where
> --
>
> Key: SPARK-12754
> URL: https://issues.apache.org/jira/browse/SPARK-12754
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.0, 1.6.0
> Environment: OSX 10.11.1, Scala 2.11.7, Spark 1.5.0+
>Reporter: Jesse English
>
> The following test produces the error 
> _org.apache.spark.sql.AnalysisException: cannot resolve '(point = 
> array(0,9))' due to data type mismatch: differing types in '(point = 
> array(0,9))' (array and array)_
> This is not the case on 1.4.x, but has been introduced with 1.5+.  Is there a 
> preferred method for making this sort of arbitrarily sized array comparison?
> {code:title=test.scala}
> test("test array comparison") {
> val vectors: Vector[Row] =  Vector(
>   Row.fromTuple("id_1" -> Array(0L, 2L)),
>   Row.fromTuple("id_2" -> Array(0L, 5L)),
>   Row.fromTuple("id_3" -> Array(0L, 9L)),
>   Row.fromTuple("id_4" -> Array(1L, 0L)),
>   Row.fromTuple("id_5" -> Array(1L, 8L)),
>   Row.fromTuple("id_6" -> Array(2L, 4L)),
>   Row.fromTuple("id_7" -> Array(5L, 6L)),
>   Row.fromTuple("id_8" -> Array(6L, 2L)),
>   Row.fromTuple("id_9" -> Array(7L, 0L))
> )
> val data: RDD[Row] = sc.parallelize(vectors, 3)
> val schema = StructType(
>   StructField("id", StringType, false) ::
> StructField("point", DataTypes.createArrayType(LongType), false) ::
> Nil
> )
> val sqlContext = new SQLContext(sc)
> var dataframe = sqlContext.createDataFrame(data, schema)
> val  targetPoint:Array[Long] = Array(0L,9L)
> //This is the line where it fails
> //org.apache.spark.sql.AnalysisException: cannot resolve 
> // '(point = array(0,9))' due to data type mismatch:
> // differing types in '(point = array(0,9))' 
> // (array and array).
> val targetRow = dataframe.where(dataframe("point") === 
> array(targetPoint.map(value => lit(value)): _*)).first()
> assert(targetRow != null)
>   }
> {code}






[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly

2017-01-09 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814165#comment-15814165
 ] 

gagan taneja commented on SPARK-19145:
--

I should be able to work on a proposal for the fix.

> Timestamp to String casting is slowing the query significantly
> --
>
> Key: SPARK-19145
> URL: https://issues.apache.org/jira/browse/SPARK-19145
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> I have a time series table with a timestamp column.
> Following query
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> is significantly SLOWER than 
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> After investigation I found that in the first query the time column is cast to 
> String before applying the filter.
> However, in the second query no such casting is performed and the filter uses 
> the long value directly.
> Below are the generated physical plan for the slower execution, followed by the 
> physical plan for the faster execution.
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3339L])
>  +- *Project
> +- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) 
> >= 2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 
> 19:53:51))
>+- *FileScan parquet default.cstat[time#3314] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: 
> struct
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3287L])
>  +- *Project
> +- *Filter ((isnotnull(time#3262) && (time#3262 >= 
> 148340483100)) && (time#3262 <= 148400963100))
>+- *FileScan parquet default.cstat[time#3262] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time), 
> GreaterThanOrEqual(time,2017-01-02 19:53:51.0), 
> LessThanOrEqual(time,2017-01-09..., ReadSchema: struct
> In Impala both queries run efficiently without any performance difference.
> Spark should be able to parse the date string and convert it to a Long/Timestamp 
> during generation of the optimized logical plan so that both queries would have 
> similar performance.






[jira] [Updated] (SPARK-19146) Drop more elements when stageData.taskData.size > retainedTasks to reduce the number of times on call drop

2017-01-09 Thread Yuming Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-19146:

Attachment: can-not-consume-taskEnd-events.jpg

> Drop more elements when stageData.taskData.size > retainedTasks to reduce the 
> number of times on call drop
> --
>
> Key: SPARK-19146
> URL: https://issues.apache.org/jira/browse/SPARK-19146
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Yuming Wang
> Attachments: can-not-consume-taskEnd-events.jpg
>
>
> The performance of the 
> [{{drop}}|https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L412]
>   function is bad.
> * Modify the code: 
> {code:java}
>   if (stageData.taskData.size > retainedTasks) {
> val start = System.currentTimeMillis()
> stageData.taskData = stageData.taskData.drop(stageData.taskData.size 
> - retainedTasks)
> logInfo(s"Time consuming: ${System.currentTimeMillis() - start}")
>   }
> {code}
> *  Time consuming
> {noformat}
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 156
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 145
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 148
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 159
> {noformat}
> My opinion is to drop more elements when {{stageData.taskData.size > 
> retainedTasks}} to reduce the number of times {{drop}} is called.






[jira] [Commented] (SPARK-19146) Drop more elements when stageData.taskData.size > retainedTasks to reduce the number of times on call drop

2017-01-09 Thread Yuming Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814152#comment-15814152
 ] 

Yuming Wang commented on SPARK-19146:
-

I will create a PR later

> Drop more elements when stageData.taskData.size > retainedTasks to reduce the 
> number of times on call drop
> --
>
> Key: SPARK-19146
> URL: https://issues.apache.org/jira/browse/SPARK-19146
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Yuming Wang
>
> The performance of the 
> [{{drop}}|https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L412]
>   function is bad.
> * Modify the code: 
> {code:java}
>   if (stageData.taskData.size > retainedTasks) {
> val start = System.currentTimeMillis()
> stageData.taskData = stageData.taskData.drop(stageData.taskData.size 
> - retainedTasks)
> logInfo(s"Time consuming: ${System.currentTimeMillis() - start}")
>   }
> {code}
> *  Time consuming
> {noformat}
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 156
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 145
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 148
> 17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 159
> {noformat}
> My opinion is to drop more elements when {{stageData.taskData.size > 
> retainedTasks}} to reduce the number of times {{drop}} is called.






[jira] [Created] (SPARK-19146) Drop more elements when stageData.taskData.size > retainedTasks to reduce the number of times on call drop

2017-01-09 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-19146:
---

 Summary: Drop more elements when stageData.taskData.size > 
retainedTasks to reduce the number of times on call drop
 Key: SPARK-19146
 URL: https://issues.apache.org/jira/browse/SPARK-19146
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: Yuming Wang


The performance of the 
[{{drop}}|https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L412]
  function is bad.

* Modify the code: 
{code:java}
  if (stageData.taskData.size > retainedTasks) {
val start = System.currentTimeMillis()
stageData.taskData = stageData.taskData.drop(stageData.taskData.size - 
retainedTasks)
logInfo(s"Time consuming: ${System.currentTimeMillis() - start}")
  }
{code}
*  Time consuming
{noformat}
17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 156
17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 145
17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 148
17/01/10 14:04:05 INFO JobProgressListener: Time consuming: 159
{noformat}

My opinion is to drop more elements when {{stageData.taskData.size > 
retainedTasks}} to reduce the number of times {{drop}} is called.
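A sketch of that idea only, not an actual patch: trim the map to below the limit 
with some slack, so the (slow) {{drop}} runs far less often. The 10% slack used 
here is an illustrative assumption, not a value taken from Spark.

{code:java}
  if (stageData.taskData.size > retainedTasks) {
    // Keep roughly 90% of the limit so the next trim is not triggered immediately.
    val target = math.max(1, (retainedTasks * 0.9).toInt)
    stageData.taskData = stageData.taskData.drop(stageData.taskData.size - target)
  }
{code}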






[jira] [Created] (SPARK-19145) Timestamp to String casting is slowing the query significantly

2017-01-09 Thread gagan taneja (JIRA)
gagan taneja created SPARK-19145:


 Summary: Timestamp to String casting is slowing the query 
significantly
 Key: SPARK-19145
 URL: https://issues.apache.org/jira/browse/SPARK-19145
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: gagan taneja


I have a time series table with a timestamp column.

Following query
SELECT COUNT(*) AS `count`
   FROM `default`.`table`
   WHERE `time` >= '2017-01-02 19:53:51'
AND `time` <= '2017-01-09 19:53:51' LIMIT 5

is significantly SLOWER than 

SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
HH24:MI:SS−0800')
  AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
HH24:MI:SS−0800') LIMIT 5


After investigation I found that in the first query the time column is cast to 
String before applying the filter.
However, in the second query no such casting is performed and the filter uses the 
long value directly.

Below are the generated physical plan for the slower execution, followed by the 
physical plan for the faster execution.

SELECT COUNT(*) AS `count`
   FROM `default`.`table`
   WHERE `time` >= '2017-01-02 19:53:51'
AND `time` <= '2017-01-09 19:53:51' LIMIT 5

== Physical Plan ==
CollectLimit 5
+- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
   +- Exchange SinglePartition
  +- *HashAggregate(keys=[], functions=[partial_count(1)], 
output=[count#3339L])
 +- *Project
+- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) >= 
2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 19:53:51))
   +- *FileScan parquet default.cstat[time#3314] Batched: true, 
Format: Parquet, Location: 
InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: 
struct

SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
HH24:MI:SS−0800')
  AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
HH24:MI:SS−0800') LIMIT 5

== Physical Plan ==
CollectLimit 5
+- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
   +- Exchange SinglePartition
  +- *HashAggregate(keys=[], functions=[partial_count(1)], 
output=[count#3287L])
 +- *Project
+- *Filter ((isnotnull(time#3262) && (time#3262 >= 
148340483100)) && (time#3262 <= 148400963100))
   +- *FileScan parquet default.cstat[time#3262] Batched: true, 
Format: Parquet, Location: 
InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
PartitionFilters: [], PushedFilters: [IsNotNull(time), 
GreaterThanOrEqual(time,2017-01-02 19:53:51.0), 
LessThanOrEqual(time,2017-01-09..., ReadSchema: struct

In Impala both queries run efficiently without any performance difference.
Spark should be able to parse the date string and convert it to a Long/Timestamp 
during generation of the optimized logical plan so that both queries would have 
similar performance.
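A sketch of the workaround implied by the faster plan above, assuming a 
{{SparkSession}} named {{spark}}; table and column names are the reporter's. 
Casting the literals to timestamps keeps the comparison on the timestamp column 
instead of casting the column to string:

{code}
val fast = spark.sql("""
  SELECT COUNT(*) AS `count`
  FROM `default`.`table`
  WHERE `time` >= CAST('2017-01-02 19:53:51' AS TIMESTAMP)
    AND `time` <= CAST('2017-01-09 19:53:51' AS TIMESTAMP)
  LIMIT 5
""")
fast.explain() // the Filter should compare `time` directly, as in the faster plan
{code}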







[jira] [Updated] (SPARK-14272) Evaluate GaussianMixtureModel with LogLikelihood

2017-01-09 Thread Yanbo Liang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanbo Liang updated SPARK-14272:

Shepherd: Yanbo Liang

> Evaluate GaussianMixtureModel with LogLikelihood
> 
>
> Key: SPARK-14272
> URL: https://issues.apache.org/jira/browse/SPARK-14272
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: zhengruifeng
>Priority: Minor
>
> GMM uses EM to maximize the likelihood of the data, so the likelihood can be a 
> useful metric to evaluate a GaussianMixtureModel.






[jira] [Resolved] (SPARK-12586) Wrong answer with registerTempTable and union sql query

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-12586.
--
Resolution: Not A Problem

I just ran the code you attached and it prints as below:

{code}
+---+
|v_value|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

+---+---+---+---+-+
|row|col|foo|bar|value|
+---+---+---+---+-+
|  3|  1|  1|  1| null|
|  2|  1|  1|  1|3|
|  3|  2|  1|  1| null|
|  3|  3|  1|  1|2|
|  3|  4|  1|  2| null|
+---+---+---+---+-+

Traceback (most recent call last):
  File "/Users/hyukjinkwon/Desktop/workspace/repos/forked/spark/sql_bug.py", 
line 52, in 
result.show()
  File "./python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
  File 
"/Users/hyukjinkwon/Desktop/workspace/repos/forked/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py",
 line 1133, in __call__
  File "./python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Detected cartesian product for INNER 
join between logical plans\nProject [v1#36L]\n+- Filter isnull(v2#37L)\n   +- 
Join LeftOuter, (v1#36L = v2#37L)\n  :- Project [v_value#0L AS v1#36L]\n
  :  +- LogicalRDD [v_value#0L]\n  +- Aggregate [row#7L, col#8L, foo#9L, 
bar#10L, value#28L], [value#28L AS v2#37L]\n +- Union\n:- 
Project [row#7L, col#8L, foo#9L, bar#10L, 8 AS value#28L]\n:  +- 
Filter (((isnotnull(row#7L) && isnotnull(col#8L)) && ((row#7L = 1) && (col#8L = 
2))) && (((isnotnull(bar#10L) && isnotnull(foo#9L)) && (foo#9L = 1)) && 
(bar#10L = 2)))\n: +- LogicalRDD [row#7L, col#8L, foo#9L, 
bar#10L, value#11L]\n+- Filter ((NOT (row#7L = 1) || NOT (col#8L = 
2)) && isnotnull(bar#10L) && isnotnull(foo#9L)) && (foo#9L = 1)) && 
(bar#10L = 2)) && isnotnull(value#11L)))\n   +- LogicalRDD [row#7L, 
col#8L, foo#9L, bar#10L, value#11L]\nand\nAggregate [row#7L, col#8L, foo#9L, 
bar#10L, value#28L], [row#7L, col#8L]\n+- Union\n   :- Project [row#7L, col#8L, 
foo#9L, bar#10L, 8 AS value#28L]\n   :  +- LocalRelation , [row#7L, 
col#8L, foo#9L, bar#10L, value#11L]\n   +- Filter ((NOT (row#7L = 1) || NOT 
(col#8L = 2)) && isnotnull(bar#10L) && isnotnull(foo#9L)) && (foo#9L = 1)) 
&& (bar#10L = 2)) && isnull(value#11L)))\n  +- LogicalRDD [row#7L, col#8L, 
foo#9L, bar#10L, value#11L]\nJoin condition is missing or trivial.\nUse the 
CROSS JOIN syntax to allow cartesian products between these relations.;'
{code}

The behaviour has changed and it seems this JIRA is obsolete. I am resolving 
this as {{Not A Problem}}. Please reopen this if anyone feels this is an 
inappropriate action or I was wrong.
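As a sketch only (not part of the original report): with the behaviour above, the 
same query can be allowed either by writing CROSS JOIN explicitly in the SQL text, 
or by enabling the configuration below for the session ({{sqlText2}} stands for 
the reporter's second query):

{code}
spark.conf.set("spark.sql.crossJoin.enabled", "true")
val result = spark.sql(sqlText2)
result.show()
{code}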


> Wrong answer with registerTempTable and union sql query
> ---
>
> Key: SPARK-12586
> URL: https://issues.apache.org/jira/browse/SPARK-12586
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
> Environment: Windows 7
>Reporter: shao lo
> Attachments: sql_bug.py
>
>
> The following sequence of sql(), registerTempTable() calls gets the wrong 
> answer.
> The correct answer is returned if the temp table is rewritten?
> {code}
> sql_text = """select row, col, foo, bar, value2 value
> from (select row, col, foo, bar, 8 value2 from t0 where row=1 
> and col=2) s1
>   union select row, col, foo, bar, value from t0 where 
> not (row=1 and col=2)"""
> df2 = sqlContext.sql(sql_text)
> df2.registerTempTable("t1")
> # # The following 2 line workaround fixes the problem somehow?
> # df3 = sqlContext.createDataFrame(df2.collect())
> # df3.registerTempTable("t1")
> # # The following 4 line workaround fixes the problem too..but takes way 
> longer
> # filename = "t1.json"
> # df2.write.json(filename, mode='overwrite')
> # df3 = sqlContext.read.json(filename)
> # df3.registerTempTable("t1")
> sql_text2 = """select row, col, v1 value from
> (select v1 from
> (select v_value v1 from values) s1
>   left join
> (select value v2,foo,bar,row,col from t1
>   where foo=1
> and bar=2 and value is not null) s2
>   on v1=v2 where v2 is null
> ) sa join
> (select row, col from t1 where foo=1
> and bar=2 and value is null) sb"""
> result = sqlContext.sql(sql_text2)
> result.show()
> 
> # Expected result
> # +---+---+-+
> # |row|col|value|
> # +---+---+-+
> # |  3|  4|1|
> # |  3|  4|2|
> # |  3|  4|3|
> # |  3|  4|4|
> # +---+---+-+
> # Getting this 

[jira] [Resolved] (SPARK-12484) DataFrame withColumn() does not work in Java

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-12484.
--
Resolution: Invalid

The API works as expected and it is being tested. I don't think just saying 
{{withColumn}} in Java does not work makes sense at all.

Also, according to the email thread you referenced, it seems this is not really 
an issue in Spark.

I think it'd make sense to open another issue, or to reopen this one after 
fixing the title and description, if it is really an issue.

Per the description/title, it does not make sense, and therefore I am resolving 
this as {{Invalid}}.

> DataFrame withColumn() does not work in Java
> 
>
> Key: SPARK-12484
> URL: https://issues.apache.org/jira/browse/SPARK-12484
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.2
> Environment: mac El Cap. 10.11.2
> Java 8
>Reporter: Andrew Davidson
> Attachments: UDFTest.java
>
>
> DataFrame transformerdDF = df.withColumn(fieldName, newCol); raises
>  org.apache.spark.sql.AnalysisException: resolved attribute(s) _c0#2 missing 
> from id#0,labelStr#1 in operator !Project [id#0,labelStr#1,_c0#2 AS 
> transformedByUDF#3];
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
> at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:154)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:49)
> at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
> at 
> org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
> at org.apache.spark.sql.DataFrame.(DataFrame.scala:132)
> at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
> at org.apache.spark.sql.DataFrame.select(DataFrame.scala:691)
> at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1150)
> at com.pws.fantasySport.ml.UDFTest.test(UDFTest.java:75)






[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814001#comment-15814001
 ] 

Saisai Shao commented on SPARK-19090:
-

Are you using the SparkConf API to set configuration at application run-time? 
From the code I can see you did that. This won't work, at least for yarn cluster 
mode.
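An illustrative example only (the application jar name is a placeholder): in yarn 
cluster mode the executor settings need to reach spark-submit (or 
spark-defaults.conf) before the application starts; setting them on SparkConf 
inside the application is too late.

{code}
spark-submit \
  --master yarn --deploy-mode cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.executor.cores=6 \
  your-app.jar
{code}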

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic 
> scheduling is disabled then each executor will have 6 cores, i.e. it 
> respects "spark.executor.cores". I have tested this against Spark 1.5. I 
> think it will be the same behavior with 2.x as well.






[jira] [Commented] (SPARK-14272) Evaluate GaussianMixtureModel with LogLikelihood

2017-01-09 Thread Yanbo Liang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813980#comment-15813980
 ] 

Yanbo Liang commented on SPARK-14272:
-

[~podongfeng]  SPARK-17847 has been merged; please move this forward when you 
have time. Thanks.

> Evaluate GaussianMixtureModel with LogLikelihood
> 
>
> Key: SPARK-14272
> URL: https://issues.apache.org/jira/browse/SPARK-14272
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: zhengruifeng
>Priority: Minor
>
> GMM uses EM to maximize the likelihood of the data, so the likelihood can be a 
> useful metric to evaluate a GaussianMixtureModel.






[jira] [Resolved] (SPARK-17847) Reduce shuffled data size of GaussianMixture & copy the implementation from mllib to ml

2017-01-09 Thread Yanbo Liang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanbo Liang resolved SPARK-17847.
-
   Resolution: Fixed
Fix Version/s: 2.2.0

> Reduce shuffled data size of GaussianMixture & copy the implementation from 
> mllib to ml
> ---
>
> Key: SPARK-17847
> URL: https://issues.apache.org/jira/browse/SPARK-17847
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib
>Reporter: Yanbo Liang
>Assignee: Yanbo Liang
> Fix For: 2.2.0
>
>
> Copy the {{GaussianMixture}} implementation from mllib to ml, then we can add 
> new features to it.
> I left mllib {{GaussianMixture}} untouched, unlike some other algorithms that 
> wrap the ml implementation, for the following reasons:
> * mllib {{GaussianMixture}} allows k == 1, but ml does not.
> * mllib {{GaussianMixture}} supports setting an initial model, but ml does not 
> support this currently. (We will definitely add this feature to ml in the future.)
> Meanwhile, there is a big performance improvement for {{GaussianMixture}} in 
> this task. Since the covariance matrix of a multivariate Gaussian distribution 
> is symmetric, we can store only the upper triangular part of the matrix, which 
> will greatly reduce the shuffled data size.
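An illustration only of the storage trick described above (not Spark's actual 
implementation): a symmetric d x d covariance matrix has d*(d+1)/2 distinct 
entries, so shipping just the upper triangle roughly halves the shuffled data.

{code}
def packUpperTriangle(m: Array[Array[Double]]): Array[Double] = {
  val d = m.length
  val packed = new Array[Double](d * (d + 1) / 2)
  var k = 0
  for (i <- 0 until d; j <- i until d) {
    packed(k) = m(i)(j) // element (i, j) of the upper triangle, row by row
    k += 1
  }
  packed
}
{code}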






[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813973#comment-15813973
 ] 

Saisai Shao commented on SPARK-19090:
-

Spark shell is a real Spark *application*. The underlying SparkSubmit logic is 
the same...

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic 
> scheduling is disabled then each executor will have 6 cores, i.e. it 
> respects "spark.executor.cores". I have tested this against Spark 1.5. I 
> think it will be the same behavior with 2.x as well.






[jira] [Comment Edited] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813962#comment-15813962
 ] 

nirav patel edited comment on SPARK-19090 at 1/10/17 5:39 AM:
--

Oh right, I have them set exclusively. I corrected my previous comment. I 
verified that dynamic allocation was enabled by checking for the following in 
the driver logs:

[spark-dynamic-executor-allocation] org.apache.spark.ExecutorAllocationManager: 
Requesting 4 new executors because tasks are backlogged (new desired total will 
be 6)

If it was not enabled then it should actually have created 6 executors with 5 
cores.

Here's the snippet of code I have:

  if (sparkConfig.dynamicAllocation) {
    sparkConf.set("spark.dynamicAllocation.enabled", "true")
    sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "600s")
    sparkConf.set("spark.dynamicAllocation.initialExecutors", sparkConfig.executorInstances)
    sparkConf.set("spark.dynamicAllocation.minExecutors", String.valueOf((Integer.valueOf(sparkConfig.executorInstances) - 3)))
    sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "300s")
    sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "120")
  } else {
    sparkConf.set("spark.executor.instances", sparkConfig.executorInstances)
  }

  sparkConf.set("spark.executor.cores", sparkConfig.executorCores)


was (Author: tenstriker):
Oh right, I have them set exclusively. I corrected my previous comment. I 
verified that dynamic allocation was enabled by checking for the following in 
the driver logs:

2017-01-04 12:04:11,362 INFO [spark-dynamic-executor-allocation] 
org.apache.spark.ExecutorAllocationManager: Requesting 4 new executors because 
tasks are backlogged (new desired total will be 6)

If it was not enabled then it should have actually created 6 executors with 5 
cores.

Here's the snippet of code I have:

{code}
if (sparkConfig.dynamicAllocation) {
  sparkConf.set("spark.dynamicAllocation.enabled", "true")
  sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "600s")
  sparkConf.set("spark.dynamicAllocation.initialExecutors", sparkConfig.executorInstances)
  sparkConf.set("spark.dynamicAllocation.minExecutors",
    String.valueOf(Integer.valueOf(sparkConfig.executorInstances) - 3))
  sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "300s")
  sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "120")
} else {
  sparkConf.set("spark.executor.instances", sparkConfig.executorInstances)
}

sparkConf.set("spark.executor.cores", sparkConfig.executorCores)
{code}

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19144) Add test for GaussianMixture with distributed decompositions

2017-01-09 Thread Yanbo Liang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813971#comment-15813971
 ] 

Yanbo Liang commented on SPARK-19144:
-

cc [~sethah]

> Add test for GaussianMixture with distributed decompositions
> 
>
> Key: SPARK-19144
> URL: https://issues.apache.org/jira/browse/SPARK-19144
> Project: Spark
>  Issue Type: Test
>  Components: ML
>Reporter: Yanbo Liang
>Priority: Minor
>
> {code}
> test("check distributed decomposition") {
> val k = 5
> val d = decompositionData.head.size
> assert(GaussianMixture.shouldDistributeGaussians(k, d))
> val gmm = new 
> GaussianMixture().setK(k).setSeed(seed).fit(decompositionDataset)
> assert(gmm.getK === k)
> }
> {code}
> In ML {{GaussianMixtureSuite}}, the above test only checks that when we 
> distribute the computation it produces a model, but it does not check that 
> it produces a correct model.
> It should, but that depends on {{GaussianMixture}} supporting an initialModel 
> (SPARK-15785); otherwise, this algorithm seems incapable of learning even 
> this very contrived example. After that is resolved, we can add a 
> correctness test for the above case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813962#comment-15813962
 ] 

nirav patel edited comment on SPARK-19090 at 1/10/17 5:38 AM:
--

Oh right, I have them set exclusively. I corrected my previous comment. I 
verified that dynamic allocation was enabled by checking for the following in 
the driver logs:

2017-01-04 12:04:11,362 INFO [spark-dynamic-executor-allocation] 
org.apache.spark.ExecutorAllocationManager: Requesting 4 new executors because 
tasks are backlogged (new desired total will be 6)

If it was not enabled then it should have actually created 6 executors with 5 
cores.

Here's the snippet of code I have:

{code}
if (sparkConfig.dynamicAllocation) {
  sparkConf.set("spark.dynamicAllocation.enabled", "true")
  sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "600s")
  sparkConf.set("spark.dynamicAllocation.initialExecutors", sparkConfig.executorInstances)
  sparkConf.set("spark.dynamicAllocation.minExecutors",
    String.valueOf(Integer.valueOf(sparkConfig.executorInstances) - 3))
  sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "300s")
  sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "120")
} else {
  sparkConf.set("spark.executor.instances", sparkConfig.executorInstances)
}

sparkConf.set("spark.executor.cores", sparkConfig.executorCores)
{code}


was (Author: tenstriker):
Oh right, I have that set exclusively. I corrected my comment. I verified that 
dynamic allocation was enabled by checking for the following in the driver logs:

2017-01-04 12:04:11,362 INFO [spark-dynamic-executor-allocation] 
org.apache.spark.ExecutorAllocationManager: Requesting 4 new executors because 
tasks are backlogged (new desired total will be 6)

If it was not enabled then it should have actually created 6 executors with 5 
cores.

Here's the snippet of code I have:

{code}
if (sparkConfig.dynamicAllocation) {
  sparkConf.set("spark.dynamicAllocation.enabled", "true")
  sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "600s")
  sparkConf.set("spark.dynamicAllocation.initialExecutors", sparkConfig.executorInstances)
  sparkConf.set("spark.dynamicAllocation.minExecutors",
    String.valueOf(Integer.valueOf(sparkConfig.executorInstances) - 3))
  sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "300s")
  sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "120")
} else {
  sparkConf.set("spark.executor.instances", sparkConfig.executorInstances)
}

sparkConf.set("spark.executor.cores", sparkConfig.executorCores)
{code}

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813969#comment-15813969
 ] 

nirav patel commented on SPARK-19090:
-

Also, you are just invoking spark-shell here and not submitting any real Spark 
application. Those two may have different code paths that use different logic 
to set Spark parameters. 

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12076) countDistinct behaves inconsistently

2017-01-09 Thread Herman van Hovell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813966#comment-15813966
 ] 

Herman van Hovell commented on SPARK-12076:
---

What is the problem? That the plans are different?

> countDistinct behaves inconsistently
> 
>
> Key: SPARK-12076
> URL: https://issues.apache.org/jira/browse/SPARK-12076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1
>Reporter: Paul Zaczkieiwcz
>Priority: Minor
>
> Assume:
> {code:java}
> val slicePlayed:DataFrame = _
> val joinKeys:DataFrame = _
> {code}
> Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} 
> and all columns beginning with "join_" are from {{joinKeys}}.  The following 
> queries can return different values for slice_count_distinct:
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
> $"join_session_id" === $"cdnt_session_id" &&
> $"join_asset_id" === $"cdnt_asset_id" &&
> $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number")
> ).show(false)
> {code}
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
> $"join_session_id" === $"cdnt_session_id" &&
> $"join_asset_id" === $"cdnt_asset_id" &&
> $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
>   min($"cdnt_user_ip").as("slice_played_user_ip"),
>   min($"cdnt_user_agent").as("slice_played_user_agent"),
>   min($"cdnt_referer").as("slice_played_referer"),
>   max($"cdnt_event_time").as("slice_end_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number"),
>   min($"cdnt_is_live").as("is_live")
> ).show(false)
> {code}
> The +only+ difference between the two queries is that I'm adding more 
> columns to the {{agg}} method.
> I can't reproduce by manually creating a dataFrame from 
> {{DataFrame.parallelize}}. The original sources of the dataFrames are parquet 
> files.
> The explain plans for the two queries are slightly different.
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
> functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
>  
> output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
>  
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
>  
> functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)],
>  
> output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>   
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
>  
> functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)],
>  
> output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>TungstenProject 
> [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
> SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
> [join_session_id#41,join_asset_id#42,join_euid#43]
>  TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 
> ASC], false, 0
>   TungstenExchange 
> hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>ConvertToUnsafe
> Scan 
> ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]
>  TungstenSort [join_session_id#41 

[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813962#comment-15813962
 ] 

nirav patel commented on SPARK-19090:
-

Oh right, I have that set exclusively. I corrected my comment. I verified that 
dynamic allocation was enabled by checking for the following in the driver logs:

2017-01-04 12:04:11,362 INFO [spark-dynamic-executor-allocation] 
org.apache.spark.ExecutorAllocationManager: Requesting 4 new executors because 
tasks are backlogged (new desired total will be 6)

If it was not enabled then it should have actually created 6 executors with 5 
cores.

Here's the snippet of code I have:

{code}
if (sparkConfig.dynamicAllocation) {
  sparkConf.set("spark.dynamicAllocation.enabled", "true")
  sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "600s")
  sparkConf.set("spark.dynamicAllocation.initialExecutors", sparkConfig.executorInstances)
  sparkConf.set("spark.dynamicAllocation.minExecutors",
    String.valueOf(Integer.valueOf(sparkConfig.executorInstances) - 3))
  sparkConf.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "300s")
  sparkConf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "120")
} else {
  sparkConf.set("spark.executor.instances", sparkConfig.executorInstances)
}

sparkConf.set("spark.executor.cores", sparkConfig.executorCores)
{code}

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-12307) ParquetFormat options should be exposed through the DataFrameReader/Writer options API

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-12307.
--
Resolution: Duplicate

I believe we can configure this now; for example,

https://github.com/apache/spark/blob/74f5c2176d8449e41f520febd38109edaf3f4172/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala#L464-L476

and 

https://github.com/apache/spark/blob/74f5c2176d8449e41f520febd38109edaf3f4172/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala#L518-L544

I am resolving this as a duplicate. Please reopen this if I misunderstood the 
issue or I was wrong.
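
For reference, a minimal sketch of setting Parquet options per read/write 
through the DataFrameReader/Writer options API; {{df}} and {{spark}} are 
assumed to be an existing DataFrame and SparkSession, and the path is made up.

{code}
// per-write compression, instead of a global SQLContext/SparkSession setting
df.write.option("compression", "snappy").parquet("/tmp/parquet-out")

// per-read schema merging
val merged = spark.read.option("mergeSchema", "true").parquet("/tmp/parquet-out")
{code}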

> ParquetFormat options should be exposed through the DataFrameReader/Writer 
> options API
> --
>
> Key: SPARK-12307
> URL: https://issues.apache.org/jira/browse/SPARK-12307
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: holdenk
>Priority: Trivial
>
> Currently many options for loading/saving Parquet need to be set globally on 
> the SparkContext. It would be useful to also provide support for setting 
> these options through the DataFrameReader/DataFrameWriter.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813689#comment-15813689
 ] 

nirav patel edited comment on SPARK-19090 at 1/10/17 5:33 AM:
--

[~jerryshao] 

"spark.executor.cores" tells the Spark AM how many vcores to request from YARN 
per container. I think the Spark AM makes the correct decision when dynamic 
allocation is off, but when it's on it ignores the spark.executor.cores value. 
I think DRF has nothing to do with this issue. Following are the AM logs from 
two different runs. 


Run 1:
spark.dynamicAllocation.enabled = true
spark.executor.cores = 5 

Dynamic allocation = true

17/01/09 19:05:49 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/09 19:05:49 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 1 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )


Run 2:

spark.dynamicAllocation.enabled = false
spark.executor.instances = 6
spark.executor.cores = 5 

17/01/09 19:01:39 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 5 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )

I verified the same thing via the Spark UI while the job was running: with 
dynamic allocation there is only 1 task running per executor.


was (Author: tenstriker):
[~jerryshao] 

"spark.executor.cores" tells the Spark AM how many vcores to request from YARN 
per container. I think the Spark AM makes the correct decision when dynamic 
allocation is off, but when it's on it ignores the spark.executor.cores value. 
I think DRF has nothing to do with this issue. Following are the AM logs from 
two different runs. 


Run 1:
spark.dynamicAllocation.enabled = true
spark.executor.instances = 6
spark.executor.cores = 5 

Dynamic allocation = true

17/01/09 19:05:49 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/09 19:05:49 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 1 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )


Run 2:

spark.dynamicAllocation.enabled = false
spark.executor.instances = 6
spark.executor.cores = 5 

17/01/09 19:01:39 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 5 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 

[jira] [Commented] (SPARK-12264) Add a typeTag or scalaTypeTag method to DataType

2017-01-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813946#comment-15813946
 ] 

Hyukjin Kwon commented on SPARK-12264:
--

(I just simply changed the title to 

{quote}
add a typeTag or scalaTypeTag method to DataType.
{quote}
)

> Add a typeTag or scalaTypeTag method to DataType
> 
>
> Key: SPARK-12264
> URL: https://issues.apache.org/jira/browse/SPARK-12264
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Andras Nemeth
>Priority: Minor
>
> We are writing code that's dealing with generic DataFrames as inputs and 
> further processes their contents with normal RDD operations (not SQL). We 
> need some mechanism that tells us exactly what Scala types we will find 
> inside a Row of a given DataFrame.
> The schema of the DataFrame contains this information in an abstract sense. 
> But we need to map it to TypeTags, as that's what the rest of the system uses 
> to identify what RDD contains what type of data - quite the natural choice in 
> Scala.
> As far as I can tell, there is no good way to do this today. For now we have 
> a hand coded mapping, but that feels very fragile as spark evolves. Is there 
> a better way I'm missing? And if not, could we create one? Adding a typeTag 
> or scalaTypeTag method to DataType, or at least to AtomicType  seems easy 
> enough.
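
For illustration, a minimal sketch (not from the issue) of the kind of 
hand-coded DataType-to-TypeTag mapping the description refers to; 
{{typeTagOf}} is a hypothetical helper covering only a few atomic types.

{code}
import scala.reflect.runtime.universe._
import org.apache.spark.sql.types._

// hypothetical hand-coded mapping; brittle because it must track DataType changes
def typeTagOf(dt: DataType): TypeTag[_] = dt match {
  case StringType  => typeTag[String]
  case IntegerType => typeTag[Int]
  case LongType    => typeTag[Long]
  case DoubleType  => typeTag[Double]
  case BooleanType => typeTag[Boolean]
  case other       => throw new IllegalArgumentException(s"unsupported type: $other")
}
{code}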



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12264) Add a typeTag or scalaTypeTag method to DataType.

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-12264:
-
Summary: Add a typeTag or scalaTypeTag method to DataType.  (was: Could 
DataType provide a TypeTag?)

> Add a typeTag or scalaTypeTag method to DataType.
> -
>
> Key: SPARK-12264
> URL: https://issues.apache.org/jira/browse/SPARK-12264
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Andras Nemeth
>Priority: Minor
>
> We are writing code that's dealing with generic DataFrames as inputs and 
> further processes their contents with normal RDD operations (not SQL). We 
> need some mechanism that tells us exactly what Scala types we will find 
> inside a Row of a given DataFrame.
> The schema of the DataFrame contains this information in an abstract sense. 
> But we need to map it to TypeTags, as that's what the rest of the system uses 
> to identify what RDD contains what type of data - quite the natural choice in 
> Scala.
> As far as I can tell, there is no good way to do this today. For now we have 
> a hand coded mapping, but that feels very fragile as spark evolves. Is there 
> a better way I'm missing? And if not, could we create one? Adding a typeTag 
> or scalaTypeTag method to DataType, or at least to AtomicType  seems easy 
> enough.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-12264) Add a typeTag or scalaTypeTag method to DataType

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-12264:
-
Summary: Add a typeTag or scalaTypeTag method to DataType  (was: Add a 
typeTag or scalaTypeTag method to DataType.)

> Add a typeTag or scalaTypeTag method to DataType
> 
>
> Key: SPARK-12264
> URL: https://issues.apache.org/jira/browse/SPARK-12264
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Andras Nemeth
>Priority: Minor
>
> We are writing code that's dealing with generic DataFrames as inputs and 
> further processes their contents with normal RDD operations (not SQL). We 
> need some mechanism that tells us exactly what Scala types we will find 
> inside a Row of a given DataFrame.
> The schema of the DataFrame contains this information in an abstract sense. 
> But we need to map it to TypeTags, as that's what the rest of the system uses 
> to identify what RDD contains what type of data - quite the natural choice in 
> Scala.
> As far as I can tell, there is no good way to do this today. For now we have 
> a hand coded mapping, but that feels very fragile as spark evolves. Is there 
> a better way I'm missing? And if not, could we create one? Adding a typeTag 
> or scalaTypeTag method to DataType, or at least to AtomicType  seems easy 
> enough.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813941#comment-15813941
 ] 

Saisai Shao commented on SPARK-19090:
-

{code}
./bin/spark-shell --master yarn-client --conf spark.executor.cores=2
{code}

Please be aware that an explicit executor number 
(--num-executors/spark.executor.instances) and dynamic allocation cannot 
coexist, otherwise dynamic allocation will be turned off implicitly. So in your 
case you also set the executor number, which means dynamic allocation is not 
actually on.
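
For illustration, a minimal sketch (not from this thread) of a configuration 
that keeps dynamic allocation effective: the property names are standard Spark 
on YARN settings, while the concrete values are made up.

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // external shuffle service is required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.executor.cores", "5")               // vcores requested from YARN per executor container
// Do not set spark.executor.instances / --num-executors here; doing so
// implicitly disables dynamic allocation, as noted above.
{code}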

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19144) Add test for GaussianMixture with distributed decompositions

2017-01-09 Thread Yanbo Liang (JIRA)
Yanbo Liang created SPARK-19144:
---

 Summary: Add test for GaussianMixture with distributed 
decompositions
 Key: SPARK-19144
 URL: https://issues.apache.org/jira/browse/SPARK-19144
 Project: Spark
  Issue Type: Test
  Components: ML
Reporter: Yanbo Liang
Priority: Minor


{code}
test("check distributed decomposition") {
val k = 5
val d = decompositionData.head.size
assert(GaussianMixture.shouldDistributeGaussians(k, d))

val gmm = new 
GaussianMixture().setK(k).setSeed(seed).fit(decompositionDataset)
assert(gmm.getK === k)
}
{code}
In ML {{GaussianMixtureSuite}}, the above test only checks that when we 
distribute the computation it produces a model, but it does not check that it 
produces a correct model.
It should, but that depends on {{GaussianMixture}} supporting an initialModel 
(SPARK-15785); otherwise, this algorithm seems incapable of learning even this 
very contrived example. After that is resolved, we can add a correctness test 
for the above case.
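
As a rough sketch only: a correctness check along these lines could be added 
once an initial model can be supplied; {{setInitialModel}} is hypothetical and 
assumed to come from SPARK-15785, and {{decompositionData}}, 
{{decompositionDataset}} and {{seed}} are the existing suite fixtures.

{code}
test("check distributed decomposition produces a correct model") {
  val k = 5
  val d = decompositionData.head.size
  assert(GaussianMixture.shouldDistributeGaussians(k, d))

  val gmm = new GaussianMixture()
    .setK(k)
    .setSeed(seed)
    // .setInitialModel(...)   // hypothetical API, pending SPARK-15785
    .fit(decompositionDataset)

  assert(gmm.getK === k)
  assert(gmm.gaussians.length === k)
  // once an initial model can be provided, also compare the learned means and
  // weights against the parameters used to generate decompositionData
}
{code}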



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12076) countDistinct behaves inconsistently

2017-01-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813923#comment-15813923
 ] 

Hyukjin Kwon commented on SPARK-12076:
--

Could I ask you to narrow down the problem or provide a self-contained 
reproducer? I am willing to help verify this.
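
For illustration, a rough sketch (column names taken from the report, 
everything else assumed) of how the two aggregations could be compared while 
narrowing this down; {{joined}} stands for the inner join of {{slicePlayed}} 
and {{joinKeys}}.

{code}
import org.apache.spark.sql.functions._

val grouped = joined.groupBy(col("cdnt_session_id"), col("cdnt_asset_id"), col("cdnt_euid"))

val distinctOnly = grouped.agg(countDistinct(col("cdnt_slice_number")).as("slice_count_distinct"))
val withExtras = grouped.agg(
  min(col("cdnt_event_time")).as("slice_start_time"),
  countDistinct(col("cdnt_slice_number")).as("slice_count_distinct"))

// the summed distinct counts should agree between the two aggregations
distinctOnly.agg(sum(col("slice_count_distinct"))).show(false)
withExtras.agg(sum(col("slice_count_distinct"))).show(false)
{code}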

> countDistinct behaves inconsistently
> 
>
> Key: SPARK-12076
> URL: https://issues.apache.org/jira/browse/SPARK-12076
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1
>Reporter: Paul Zaczkieiwcz
>Priority: Minor
>
> Assume:
> {code:java}
> val slicePlayed:DataFrame = _
> val joinKeys:DataFrame = _
> {code}
> Also assume that all columns beginning with "cdnt_" are from {{slicePlayed}} 
> and all columns beginning with "join_" are from {{joinKeys}}.  The following 
> queries can return different values for slice_count_distinct:
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
> $"join_session_id" === $"cdnt_session_id" &&
> $"join_asset_id" === $"cdnt_asset_id" &&
> $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number")
> ).show(false)
> {code}
> {code:java}
> slicePlayed.join(
>   joinKeys,
>   ( 
> $"join_session_id" === $"cdnt_session_id" &&
> $"join_asset_id" === $"cdnt_asset_id" &&
> $"join_euid" === $"cdnt_euid"
>   ),
>   "inner"
> ).groupBy(
>   $"cdnt_session_id".as("slice_played_session_id"),
>   $"cdnt_asset_id".as("slice_played_asset_id"),
>   $"cdnt_euid".as("slice_played_euid")
> ).agg(
>   min($"cdnt_event_time").as("slice_start_time"),
>   min($"cdnt_playing_owner_id").as("slice_played_playing_owner_id"),
>   min($"cdnt_user_ip").as("slice_played_user_ip"),
>   min($"cdnt_user_agent").as("slice_played_user_agent"),
>   min($"cdnt_referer").as("slice_played_referer"),
>   max($"cdnt_event_time").as("slice_end_time"),
>   countDistinct($"cdnt_slice_number").as("slice_count_distinct"),
>   count($"cdnt_slice_number").as("slice_count_total"),
>   min($"cdnt_slice_number").as("min_slice_number"),
>   max($"cdnt_slice_number").as("max_slice_number"),
>   min($"cdnt_is_live").as("is_live")
> ).show(false)
> {code}
> The +only+ difference between the two queries is that I'm adding more 
> columns to the {{agg}} method.
> I can't reproduce by manually creating a dataFrame from 
> {{DataFrame.parallelize}}. The original sources of the dataFrames are parquet 
> files.
> The explain plans for the two queries are slightly different.
> {code}
> == Physical Plan ==
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
> functions=[(count(cdnt_slice_number#24L),mode=Final,isDistinct=false),(min(cdnt_slice_number#24L),mode=Final,isDistinct=false),(max(cdnt_slice_number#24L),mode=Final,isDistinct=false),(count(cdnt_slice_number#24L),mode=Complete,isDistinct=true)],
>  
> output=[slice_played_session_id#780,slice_played_asset_id#781,slice_played_euid#782,slice_count_distinct#783L,slice_count_total#784L,min_slice_number#785L,max_slice_number#786L])
>  
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
>  
> functions=[(count(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(min(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false),(max(cdnt_slice_number#24L),mode=PartialMerge,isDistinct=false)],
>  
> output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>   
> TungstenAggregate(key=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L],
>  
> functions=[(count(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(min(cdnt_slice_number#24L),mode=Partial,isDistinct=false),(max(cdnt_slice_number#24L),mode=Partial,isDistinct=false)],
>  
> output=[cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L,currentCount#795L,min#797L,max#799L])
>TungstenProject 
> [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13,cdnt_slice_number#24L]
> SortMergeJoin [cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13], 
> [join_session_id#41,join_asset_id#42,join_euid#43]
>  TungstenSort [cdnt_session_id#23 ASC,cdnt_asset_id#5 ASC,cdnt_euid#13 
> ASC], false, 0
>   TungstenExchange 
> hashpartitioning(cdnt_session_id#23,cdnt_asset_id#5,cdnt_euid#13)
>ConvertToUnsafe
> Scan 
> ParquetRelation[hdfs://hadoop-namenode1:8020/user/hive/warehouse/src_cdn_events][cdnt_slice_number#24L,cdnt_euid#13,cdnt_asset_id#5,cdnt_session_id#23]

[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813918#comment-15813918
 ] 

nirav patel commented on SPARK-19090:
-

I am using the Oozie spark-action to submit the job. I set all Spark parameters 
via SparkConf. Can you tell me which parameters you used with dynamic 
allocation to allow a core count of 2?

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-9502) ArrayTypes incorrect for DataFrames Java API

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-9502.
-
Resolution: Not A Problem

Now it seems to throw a different exception, as below:

{code}
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
java.util.ArrayList is not a valid external type for schema of array
{code}

I am resolving this as {{Not a problem}} because it seems obsolete and, judging 
from [~jodersky]'s comment above, it does not seem to be an actual error.

If I was wrong and this is still a problem, please feel free to reopen with 
some reasonable statements.
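
For reference, a minimal Scala sketch (not from the issue) of the external type 
Spark 2.x expects for an ArrayType column when building rows by hand; {{spark}} 
is assumed to be an existing SparkSession.

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("skills", ArrayType(StringType), nullable = false)))

// a Scala Seq is a valid external type for ArrayType; a java.util.ArrayList
// is not, which matches the exception quoted above
val rows = Seq(Row("Michael", Seq("eating", "sleeping")))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
df.show(false)
{code}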

> ArrayTypes incorrect for DataFrames Java API
> 
>
> Key: SPARK-9502
> URL: https://issues.apache.org/jira/browse/SPARK-9502
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1
>Reporter: Kuldeep
>Priority: Critical
>
> After upgrading to 1.4.1, array types for DataFrames were different in our Java 
> applications. I have modified JavaApplySchemaSuite to show the problem. 
> Mainly, I have added a list field to the Person class.
> {code:java}
>   public static class Person implements Serializable {
> private String name;
> private int age;
> private List<String> skills;
> public String getName() {
>   return name;
> }
> public void setName(String name) {
>   this.name = name;
> }
> public int getAge() {
>   return age;
> }
> public void setAge(int age) {
>   this.age = age;
> }
> public void setSkills(List<String> skills) {
>   this.skills = skills;
> }
> public List<String> getSkills() { return skills; }
>   }
>   @Test
>   public void applySchema() {
> List<Person> personList = new ArrayList<Person>(2);
> List<String> skills = new ArrayList<String>();
> skills.add("eating");
> skills.add("sleeping");
> Person person1 = new Person();
> person1.setName("Michael");
> person1.setAge(29);
> person1.setSkills(skills);
> personList.add(person1);
> Person person2 = new Person();
> person2.setName("Yin");
> person2.setAge(28);
> person2.setSkills(skills);
> personList.add(person2);
> JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map(
>   new Function<Person, Row>() {
> public Row call(Person person) throws Exception {
>   return RowFactory.create(person.getName(), person.getAge(), 
> person.getSkills());
> }
>   });
> List<StructField> fields = new ArrayList<StructField>(2);
> fields.add(DataTypes.createStructField("name", DataTypes.StringType, 
> false));
> fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, 
> false));
> fields.add(DataTypes.createStructField("skills", 
> DataTypes.createArrayType(DataTypes.StringType), false));
> StructType schema = DataTypes.createStructType(fields);
> DataFrame df = sqlContext.applySchema(rowRDD, schema);
> df.registerTempTable("people");
> Row[] actual = sqlContext.sql("SELECT * FROM people").collect();
>   System.out.println(actual[1].get(2).getClass().getName());
>   System.out.println(actual[1].get(2) instanceof List);
> List<Row> expected = new ArrayList<Row>(2);
> expected.add(RowFactory.create("Michael", 29, skills));
> expected.add(RowFactory.create("Yin", 28, skills));
> Assert.assertEquals(expected, Arrays.asList(actual));
>   }
> {code}
> This prints 
> scala.collection.immutable.$colon$colon
> false
> java.lang.AssertionError: 
> Expected :[[Michael,29,[eating, sleeping]], [Yin,28,[eating, sleeping]]]
> Actual   :[[Michael,29,List(eating, sleeping)], [Yin,28,List(eating, 
> sleeping)]]
> Not sure if this would be usable even in scala.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9435) Java UDFs don't work with GROUP BY expressions

2017-01-09 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813832#comment-15813832
 ] 

Hyukjin Kwon commented on SPARK-9435:
-

This still happens in the current master:

{code}
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.IntegerType

val df = Seq((1, 10), (2, 11), (3, 12)).toDF("x", "y")
val udf = new UDF1[Int, Int] {
  override def call(i: Int): Int = i + 1
}

spark.udf.register("inc", udf, IntegerType)
df.createOrReplaceTempView("tmp")
spark.sql("SELECT inc(y) FROM tmp GROUP BY inc(y)").show()
{code}

I tested both the Scala and Java versions, and I believe the above is a simpler 
Scala reproducer for the same issue.

> Java UDFs don't work with GROUP BY expressions
> --
>
> Key: SPARK-9435
> URL: https://issues.apache.org/jira/browse/SPARK-9435
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1
> Environment: All
>Reporter: James Aley
> Attachments: IncMain.java, points.txt
>
>
> If you define a UDF in Java, for example by implementing the UDF1 interface, 
> then try to use that UDF on a column in both the SELECT and GROUP BY clauses 
> of a query, you'll get an error like this:
> {code}
> "SELECT inc(y),COUNT(DISTINCT x) FROM test_table GROUP BY inc(y)"
> org.apache.spark.sql.AnalysisException: expression 'y' is neither present in 
> the group by, nor is it an aggregate function. Add to group by or wrap in 
> first() if you don't care which value you get.
> {code}
> We put together a minimal reproduction in the attached Java file, which makes 
> use of the data in the text file attached.
> I'm guessing there's some kind of issue with the equality implementation, so 
> Spark can't tell that those two expressions are the same maybe? If you do the 
> same thing from Scala, it works fine.
> Note for context: we ran into this issue while working around SPARK-9338.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19143) API in Spark for distributing new delegation tokens (Improve delegation token handling in secure clusters)

2017-01-09 Thread Ruslan Dautkhanov (JIRA)
Ruslan Dautkhanov created SPARK-19143:
-

 Summary: API in Spark for distributing new delegation tokens 
(Improve delegation token handling in secure clusters)
 Key: SPARK-19143
 URL: https://issues.apache.org/jira/browse/SPARK-19143
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core, YARN
Affects Versions: 2.1.0, 2.0.2
Reporter: Ruslan Dautkhanov


Spin off from SPARK-14743 and comments chain in [recent comments| 
https://issues.apache.org/jira/browse/SPARK-5493?focusedCommentId=15802179=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15802179]
 in SPARK-5493.

Spark currently doesn't have a way of distributing new delegation tokens. 
Quoting [~vanzin] from SPARK-5493:
{quote}
IIRC Livy doesn't yet support delegation token renewal. Once it reaches the 
TTL, the session is unusable.
There might be ways to hack support for that without changes in Spark, but I'd 
like to see a proper API in Spark for distributing new delegation tokens. I 
mentioned that in SPARK-14743, but although that bug is closed, that particular 
feature hasn't been implemented yet.
{quote}

Other thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813801#comment-15813801
 ] 

Saisai Shao commented on SPARK-19090:
-

I also tested with Spark 1.5.0 and I don't see an issue here; the core number 
is still what I set:

{noformat}
17/01/10 12:00:31 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/10 12:00:31 INFO yarn.YarnAllocator: Will request 1 executor containers, 
each with 2 cores and 1408 MB memory including 384 MB overhead
17/01/10 12:00:31 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/10 12:00:31 INFO yarn.ApplicationMaster: Started progress reporter thread 
with (heartbeat : 3000, initial allocation : 200) intervals
{noformat}

Can you please tell me how you run the application?




> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813782#comment-15813782
 ] 

Saisai Shao commented on SPARK-19090:
-

I tested with Spark 2.0 and the latest master (2.2.0-SNAPSHOT), and the 
behavior is correct; if there's an issue, it should be a 1.x issue. I'm not 
sure whether we are still maintaining the 1.5 branch to fix the old bug if it 
exists.

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-5511) [SQL] Possible optimisations for predicate pushdowns from Spark SQL to Parquet

2017-01-09 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-5511.
-
Resolution: Invalid

To my knowledge, for 1., unless we are going to rewrite the Parquet filter 
evaluation logic on the Spark side, this is a change purely on the Parquet 
side, as you said.

For 2., there is an issue about the IN filter, SPARK-17091.

I am resolving this as {{Invalid}} because I believe 1. is a non-Spark issue 
and 2. is a duplicate (although this JIRA itself is not an exact duplicate). 
Please feel free to reopen this if anyone thinks this action is inappropriate.
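
For point 2. below, an illustrative sketch (not from the issue) of the rewrite 
that sidesteps the missing IN pushdown, assuming the data source pushes down 
equality and OR filters; {{df}} is assumed to be a Parquet-backed DataFrame 
with a {{country}} column.

{code}
import org.apache.spark.sql.functions.col

// IN predicate, historically not pushed down to Parquet
val viaIn = df.filter(col("country").isin("US", "GB", "DE"))

// equivalent disjunction of equality predicates, which can be pushed down
// as individual eq filters
val viaOr = df.filter(col("country") === "US" || col("country") === "GB" || col("country") === "DE")
{code}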

> [SQL] Possible optimisations for predicate pushdowns from Spark SQL to Parquet
> --
>
> Key: SPARK-5511
> URL: https://issues.apache.org/jira/browse/SPARK-5511
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.2.0
>Reporter: Mick Davies
>Priority: Minor
>
> The following changes could make predicate pushdown more effective under 
> certain conditions, which are not uncommon.
> 1. Parquet predicate evaluation does not use dictionary compression 
> information, furthermore it circumvents dictionary decoding optimisations 
> (https://issues.apache.org/jira/browse/PARQUET-36). This means predicates are 
> re-evaluated repeatedly for the same Strings, and also Binary->String 
> conversions are repeated. This is a change purely on the Parquet side.
> 2. Support IN clauses in predicate pushdown. This requires changes to Parquet 
> and then subsequently in Spark SQL.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813723#comment-15813723
 ] 

nirav patel commented on SPARK-19090:
-

1.5.2 

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11569) StringIndexer transform fails when column contains nulls

2017-01-09 Thread Joseph K. Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813712#comment-15813712
 ] 

Joseph K. Bradley commented on SPARK-11569:
---

Hi all, I'm sorry for not following up on this, but I would like us to do this 
at some point.  However, I will insist that we do some research before adding 
an API based on just a few users' requirements.  Have you looked at other 
libraries?
* scikit-learn
* various R libraries
* pandas
* other more specialized but popular ML libraries

> StringIndexer transform fails when column contains nulls
> 
>
> Key: SPARK-11569
> URL: https://issues.apache.org/jira/browse/SPARK-11569
> Project: Spark
>  Issue Type: Bug
>  Components: ML, PySpark
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Maciej Szymkiewicz
>
> Transforming column containing {{null}} values using {{StringIndexer}} 
> results in {{java.lang.NullPointerException}}
> {code}
> from pyspark.ml.feature import StringIndexer
> df = sqlContext.createDataFrame([("a", 1), (None, 2)], ("k", "v"))
> df.printSchema()
> ## root
> ##  |-- k: string (nullable = true)
> ##  |-- v: long (nullable = true)
> indexer = StringIndexer(inputCol="k", outputCol="kIdx")
> indexer.fit(df).transform(df)
> ##  py4j.protocol.Py4JJavaError: An error occurred while calling o75.json.
> ## : java.lang.NullPointerException
> {code}
> Problem disappears when we drop 
> {code}
> df1 = df.na.drop()
> indexer.fit(df1).transform(df1)
> {code}
> or replace {{nulls}}
> {code}
> from pyspark.sql.functions import col, when
> k = col("k")
> df2 = df.withColumn("k", when(k.isNull(), "__NA__").otherwise(k))
> indexer.fit(df2).transform(df2)
> {code}
> and cannot be reproduced using Scala API
> {code}
> import org.apache.spark.ml.feature.StringIndexer
> val df = sc.parallelize(Seq(("a", 1), (null, 2))).toDF("k", "v")
> df.printSchema
> // root
> //  |-- k: string (nullable = true)
> //  |-- v: integer (nullable = false)
> val indexer = new StringIndexer().setInputCol("k").setOutputCol("kIdx")
> indexer.fit(df).transform(df).count
> // 2
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813704#comment-15813704
 ] 

Saisai Shao commented on SPARK-19090:
-

Thanks for the elaboration. Would you please tell me which version of Spark 
you ran the test against?

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN I see that all executors are using 
> only 1 core even if I set "spark.executor.cores" to 6. If dynamic scheduling 
> is disabled then each executor has 6 cores, i.e. it respects 
> "spark.executor.cores". I have tested this against Spark 1.5. I think the 
> behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15034) Use the value of spark.sql.warehouse.dir as the warehouse location instead of using hive.metastore.warehouse.dir

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813703#comment-15813703
 ] 

nirav patel commented on SPARK-15034:
-

Is this documented in the Spark 2.x docs? I don't see it under Configuration.

> Use the value of spark.sql.warehouse.dir as the warehouse location instead of 
> using hive.metastore.warehouse.dir
> 
>
> Key: SPARK-15034
> URL: https://issues.apache.org/jira/browse/SPARK-15034
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Yin Huai
>Assignee: Yin Huai
>  Labels: release_notes, releasenotes
> Fix For: 2.0.0
>
>
> Starting from Spark 2.0, spark.sql.warehouse.dir will be the conf to set 
> warehouse location. We will not use hive.metastore.warehouse.dir.
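
For reference, a minimal sketch of setting it when building a SparkSession; the 
warehouse path is made up.

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("warehouse-dir-example")
  .config("spark.sql.warehouse.dir", "/apps/spark/warehouse")
  .enableHiveSupport()
  .getOrCreate()
{code}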



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813689#comment-15813689
 ] 

nirav patel edited comment on SPARK-19090 at 1/10/17 3:15 AM:
--

[~jerryshao] 

"spark.executor.cores" tells the Spark AM how many vcores to request from YARN 
per container. I think the Spark AM makes the correct decision when dynamic 
allocation is off, but when it's on it ignores the spark.executor.cores value. 
I think DRF has nothing to do with this issue. Following are the AM logs from 
two different runs. 


Run 1:
spark.dynamicAllocation.enabled = true
spark.executor.instances = 6
spark.executor.cores = 5 

Dynamic allocation = true

17/01/09 19:05:49 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/09 19:05:49 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 1 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )


Run 2:

spark.dynamicAllocation.enabled = false
spark.executor.instances = 6
spark.executor.cores = 5 

17/01/09 19:01:39 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 5 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )

I verified the same thing via the Spark UI while the job was running: with 
dynamic allocation there is only 1 task running per executor.


was (Author: tenstriker):
[~jerryshao] 

"spark.executor.cores" tells the Spark AM how many vcores to request from YARN 
per container. I think the Spark AM makes the correct decision when dynamic 
allocation is off, but when it's on it ignores the spark.executor.cores value. 
I think DRF has nothing to do with this issue. Following are the AM logs from 
two different runs. 


Run 1:
spark.dynamicAllocation.enabled = true
spark.executor.instances = 6
spark.executor.cores = 5 

Dynamic allocation = true

17/01/09 19:05:49 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/09 19:05:49 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 1 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )


Run 2:

spark.dynamicAllocation.enabled = false
spark.executor.instances = 6
spark.executor.cores = 5 

17/01/09 19:01:39 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 5 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: 

[jira] [Comment Edited] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813689#comment-15813689
 ] 

nirav patel edited comment on SPARK-19090 at 1/10/17 3:14 AM:
--

[~jerryshao] 

"spark.executor.cores" is to tell spark AM to request no of vcores from Yarn 
per container. I think spark AM makes correct decision when dynamic allocation 
is off but when its on it ignores spark.executor.cores value. I think DRF has 
nothing to do with this issue. Following are AM logs from two different runs. 


Run 1:
spark.dynamicAllocation.enabled = true
spark.executor.instances = 6
spark.executor.cores = 5 

Dynamic allocation = true

17/01/09 19:05:49 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/09 19:05:49 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 1 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )


Run 2:

spark.dynamicAllocation.enabled = false
spark.executor.instances = 6
spark.executor.cores = 5 

17/01/09 19:01:39 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 5 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )

I can verify the same via the Spark UI while the job is running: with dynamic 
allocation there is only 1 task running per executor.


was (Author: tenstriker):
[~jerryshao] 

"spark.executor.cores" is to tell spark AM to request no of vcores from Yarn 
per container. I think spark AM makes correct decision when dynamic allocation 
is off but when its on it ignores spark.executor.cores value. I think DRF has 
nothing to do with this in my opinion. Following are AM logs from two different 
runs. 


Run 1:
spark.dynamicAllocation.enabled = true
spark.executor.instances = 6
spark.executor.cores = 5 

Dynamic allocation = true

17/01/09 19:05:49 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/09 19:05:49 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 1 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )


Run 2:

spark.dynamicAllocation.enabled = false
spark.executor.instances = 6
spark.executor.cores = 5 

17/01/09 19:01:39 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 5 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: 

[jira] [Commented] (SPARK-19090) Dynamic Resource Allocation not respecting spark.executor.cores

2017-01-09 Thread nirav patel (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813689#comment-15813689
 ] 

nirav patel commented on SPARK-19090:
-

[~jerryshao] 

"spark.executor.cores" is to tell spark AM to request no of vcores from Yarn 
per container. I think spark AM makes correct decision when dynamic allocation 
is off but when its on it ignores spark.executor.cores value. I think DRF has 
nothing to do with this in my opinion. Following are AM logs from two different 
runs. 


Run 1:
spark.dynamicAllocation.enabled = true
spark.executor.instances = 6
spark.executor.cores = 5 

Dynamic allocation = true

17/01/09 19:05:49 INFO yarn.YarnRMClient: Registering the ApplicationMaster
17/01/09 19:05:49 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 1 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:05:49 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )


Run 2:

spark.dynamicAllocation.enabled = false
spark.executor.instances = 6
spark.executor.cores = 5 

17/01/09 19:01:39 INFO yarn.YarnAllocator: Will request 6 executor containers, 
each with 5 cores and 11000 MB memory including 1000 MB overhead
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )
17/01/09 19:01:39 INFO yarn.YarnAllocator: Container request (host: Any, 
capability: )

I can verify the same via the Spark UI while the job is running: with dynamic 
allocation there is only 1 task running per executor.
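
For reference, here is the configuration combination under discussion expressed 
as a SparkConf (a minimal sketch; the app name and memory value are illustrative, 
and spark.shuffle.service.enabled is included because dynamic allocation on YARN 
needs the external shuffle service):
{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch of the settings compared in the two runs above (values illustrative).
// The expectation is that each executor container is still requested with
// spark.executor.cores vcores even when dynamic allocation is enabled.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-cores-check")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // needed by dynamic allocation on YARN
  .set("spark.executor.instances", "6")
  .set("spark.executor.cores", "5")
  .set("spark.executor.memory", "10g")

val spark = SparkSession.builder.config(conf).getOrCreate()
{code}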

> Dynamic Resource Allocation not respecting spark.executor.cores
> ---
>
> Key: SPARK-19090
> URL: https://issues.apache.org/jira/browse/SPARK-19090
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.5.2, 1.6.1, 2.0.1
>Reporter: nirav patel
>
> When enabling dynamic scheduling with YARN, I see that all executors are using 
> only 1 core even if I specify "spark.executor.cores" as 6. If dynamic 
> scheduling is disabled then each executor will have 6 cores, i.e. it 
> respects "spark.executor.cores". I have tested this against Spark 1.5. I 
> think the behavior will be the same with 2.x as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813632#comment-15813632
 ] 

Nan Zhu commented on SPARK-18905:
-

[~zsxwing] If you agree on the conclusion above, I will file a PR

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-19078) hashingTF,ChiSqSelector,IDF,StandardScaler,PCA transform avoid extra vector conversion

2017-01-09 Thread zhengruifeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengruifeng closed SPARK-19078.

Resolution: Duplicate

> hashingTF,ChiSqSelector,IDF,StandardScaler,PCA transform avoid extra vector 
> conversion
> --
>
> Key: SPARK-19078
> URL: https://issues.apache.org/jira/browse/SPARK-19078
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Reporter: zhengruifeng
>Priority: Minor
>
> There are some ML algorithms (hashingTF, ChiSqSelector, IDF, StandardScaler, 
> PCA, LDA) whose {{transform}} method delegates to the old MLlib model; this 
> causes extra conversion of vectors and matrices.
> This PR modifies transform to use ml.linalg directly in 
> {{hashingTF,ChiSqSelector,IDF,StandardScaler,PCA}}.
> I do not touch LDA because it is more complicated than the others.
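
For readers less familiar with the two vector APIs, a small illustrative sketch 
of the per-row round trip the issue describes (fromML/asML are the standard 
converters; the values are arbitrary):
{code}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// The round trip that transform() pays when it delegates to an old MLlib model:
// one copy going into the old API and another copy converting the result back.
val mlVec  = NewVectors.dense(1.0, 0.0, 3.0)
val oldVec = OldVectors.fromML(mlVec) // ml.linalg -> mllib.linalg
val back   = oldVec.asML              // mllib.linalg -> ml.linalg
{code}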



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813560#comment-15813560
 ] 

Nan Zhu commented on SPARK-18905:
-

eat my words...

when we have queued up batches, we do need pendingTime, 

and yes, the original description in the JIRA still holds, 

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18959) invalid resource statistics for standalone cluster

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813510#comment-15813510
 ] 

Apache Spark commented on SPARK-18959:
--

User 'hustfxj' has created a pull request for this issue:
https://github.com/apache/spark/pull/16525

> invalid resource statistics for standalone cluster
> --
>
> Key: SPARK-18959
> URL: https://issues.apache.org/jira/browse/SPARK-18959
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Reporter: hustfxj
>Priority: Minor
> Attachments: 屏幕快照 2016-12-21 11.49.12.png
>
>
> Workers:
> Worker Id                                  Address              State   Cores        Memory
> worker-20161220162751-10.125.6.222-59295   10.125.6.222:59295   ALIVE   4 (-1 Used)  6.8 GB (-1073741824.0 B Used)
> worker-20161220164233-10.218.135.80-10944  10.218.135.80:10944  ALIVE   4 (0 Used)   6.8 GB (0.0 B Used)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813459#comment-15813459
 ] 

Nan Zhu edited comment on SPARK-18905 at 1/10/17 1:16 AM:
--

yeah, but the downTime includes all batches from "checkpoint time" to "restart 
time"

the jobs that have been generated but not completed shall be the first batch in 
downTime...no?


was (Author: codingcat):
yeah, but the downTime including all batches from "checkpoint time" to "restart 
time"

the jobs that have been generated but not completed shall be the first batch in 
downTime...no?

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813459#comment-15813459
 ] 

Nan Zhu commented on SPARK-18905:
-

yeah, but the downTime including all batches from "checkpoint time" to "restart 
time"

the jobs that have been generated but not completed shall be the first batch in 
downTime...no?

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813455#comment-15813455
 ] 

Shixiong Zhu commented on SPARK-18905:
--

[~CodingCat] I think `pendingTime` covers the jobs that have been generated but 
not completed.

I think you are right in the JIRA description. The failed job should not be 
removed, so that it can be included in `getPendingTimes`. Could you submit 
a PR to fix it?
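
A simplified, self-contained sketch of the proposed semantics (illustrative only, 
not the actual JobScheduler source): a batch is dropped from the pending map only 
when every job in it succeeded, so a batch containing a failed job stays visible 
to getPendingTimes and therefore to the checkpoint.
{code}
import scala.collection.mutable
import scala.util.Try

// Illustrative model only; the names mirror the discussion, not Spark's internals.
val jobResults = mutable.Map[Long, Seq[Try[Unit]]]() // batch time -> job results

def handleBatchCompletion(batchTime: Long): Unit =
  if (jobResults.getOrElse(batchTime, Nil).forall(_.isSuccess)) {
    jobResults.remove(batchTime) // fully successful: forget the batch
  }                              // otherwise keep it pending

def getPendingTimes: Seq[Long] = jobResults.keys.toSeq.sorted
{code}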

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813434#comment-15813434
 ] 

Nan Zhu edited comment on SPARK-18905 at 1/10/17 1:05 AM:
--

Hi, [~zsxwing]

Thanks for the reply, 

After testing in our environment a few more times, I feel that this is not a 
problem anymore. The failed job would be recovered 
(https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216),
as it is counted in the downTime.

The question right now is: why do we need pendingTime + downTime in the 
above method?


was (Author: codingcat):
Hi, [~zsxwing]

Thanks for the reply, 

After testing in our environment for more times, I feel that this is not a 
problem anymore. The failed job would be recovered 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216,
 as the downTime

The question right now is, why we need to have pendingTime + downTime in the 
above method, 

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Nan Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813434#comment-15813434
 ] 

Nan Zhu commented on SPARK-18905:
-

Hi, [~zsxwing]

Thanks for the reply, 

After testing in our environment a few more times, I feel that this is not a 
problem anymore. The failed job would be recovered 
(https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L216),
as it is counted in the downTime.

The question right now is: why do we need pendingTime + downTime in the 
above method?
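
A hedged sketch of why both quantities matter on restart (illustrative only, not 
the JobGenerator source): batches that were pending at checkpoint time and batch 
times that fall inside the outage window are merged, de-duplicated and rescheduled.
{code}
// Illustrative only: merge the batches that were generated but unfinished before
// the failure (pending) with the batch times that fall between the checkpoint and
// the restart (down), then reschedule the union.
def timesToReschedule(pendingTimes: Seq[Long],
                      checkpointTime: Long,
                      restartTime: Long,
                      batchDurationMs: Long): Seq[Long] = {
  val downTimes =
    ((checkpointTime + batchDurationMs) to restartTime by batchDurationMs).toSeq
  (pendingTimes ++ downTimes).distinct.sorted
}
{code}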

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18905) Potential Issue of Semantics of BatchCompleted

2017-01-09 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813405#comment-15813405
 ] 

Shixiong Zhu commented on SPARK-18905:
--

Sorry for the late reply. Yeah, good catch. However, even if it doesn't mark the 
job as completed, the failed job won't be run after recovery, right? I think the 
root cause is that a failed job doesn't stop the StreamingContext, and changing 
that might be a huge behavior change.

cc [~tdas]

> Potential Issue of Semantics of BatchCompleted
> --
>
> Key: SPARK-18905
> URL: https://issues.apache.org/jira/browse/SPARK-18905
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0, 2.0.1, 2.0.2
>Reporter: Nan Zhu
>
> the current implementation of Spark streaming considers a batch is completed 
> no matter the results of the jobs 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203)
> Let's consider the following case:
> A micro batch contains 2 jobs and they read from two different kafka topics 
> respectively. One of these jobs is failed due to some problem in the user 
> defined logic, after the other one is finished successfully. 
> 1. The main thread in the Spark streaming application will execute the line 
> mentioned above, 
> 2. and another thread (checkpoint writer) will make a checkpoint file 
> immediately after this line is executed. 
> 3. Then due to the current error handling mechanism in Spark Streaming, 
> StreamingContext will be closed 
> (https://github.com/apache/spark/blob/1169db44bc1d51e68feb6ba2552520b2d660c2c0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L214)
> the user recovers from the checkpoint file, and because the JobSet containing 
> the failed job has been removed (taken as completed) before the checkpoint is 
> constructed, the data being processed by the failed job would never be 
> reprocessed?
> I might have missed something in the checkpoint thread or this 
> handleJobCompletion()or it is a potential bug 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19110) DistributedLDAModel returns different logPrior for original and loaded model

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813315#comment-15813315
 ] 

Apache Spark commented on SPARK-19110:
--

User 'wangmiao1981' has created a pull request for this issue:
https://github.com/apache/spark/pull/16524

> DistributedLDAModel returns different logPrior for original and loaded model
> 
>
> Key: SPARK-19110
> URL: https://issues.apache.org/jira/browse/SPARK-19110
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 1.3.1, 1.4.1, 1.5.2, 1.6.3, 2.0.2, 2.1.0, 2.2.0
>Reporter: Miao Wang
>Assignee: Miao Wang
> Fix For: 2.0.3, 2.1.1, 2.2.0
>
>
> While adding the DistributedLDAModel training summary for SparkR, I found that 
> the logPrior of the original and the loaded model is different.
> For example, in test("read/write DistributedLDAModel"), I add the following check:
> val logPrior = model.asInstanceOf[DistributedLDAModel].logPrior
> val logPrior2 = model2.asInstanceOf[DistributedLDAModel].logPrior
> assert(logPrior === logPrior2)
> The test fails:
> -4.394180878889078 did not equal -4.294290536919573



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19142) spark.kmeans should take seed, initSteps, and tol as parameters

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19142:


Assignee: (was: Apache Spark)

> spark.kmeans should take seed, initSteps, and tol as parameters
> ---
>
> Key: SPARK-19142
> URL: https://issues.apache.org/jira/browse/SPARK-19142
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Miao Wang
>
> spark.kmeans doesn't have an interface to set initSteps, seed and tol. As the 
> Spark KMeans algorithm doesn't take the same set of parameters as R's kmeans, 
> we should maintain a different interface in spark.kmeans.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19142) spark.kmeans should take seed, initSteps, and tol as parameters

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19142:


Assignee: Apache Spark

> spark.kmeans should take seed, initSteps, and tol as parameters
> ---
>
> Key: SPARK-19142
> URL: https://issues.apache.org/jira/browse/SPARK-19142
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Miao Wang
>Assignee: Apache Spark
>
> spark.kmeans doesn't have an interface to set initSteps, seed and tol. As the 
> Spark KMeans algorithm doesn't take the same set of parameters as R's kmeans, 
> we should maintain a different interface in spark.kmeans.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19142) spark.kmeans should take seed, initSteps, and tol as parameters

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813258#comment-15813258
 ] 

Apache Spark commented on SPARK-19142:
--

User 'wangmiao1981' has created a pull request for this issue:
https://github.com/apache/spark/pull/16523

> spark.kmeans should take seed, initSteps, and tol as parameters
> ---
>
> Key: SPARK-19142
> URL: https://issues.apache.org/jira/browse/SPARK-19142
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Miao Wang
>
> spark.kmeans doesn't have an interface to set initSteps, seed and tol. As the 
> Spark KMeans algorithm doesn't take the same set of parameters as R's kmeans, 
> we should maintain a different interface in spark.kmeans.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19142) spark.kmeans should take seed, initSteps, and tol as parameters

2017-01-09 Thread Miao Wang (JIRA)
Miao Wang created SPARK-19142:
-

 Summary: spark.kmeans should take seed, initSteps, and tol as 
parameters
 Key: SPARK-19142
 URL: https://issues.apache.org/jira/browse/SPARK-19142
 Project: Spark
  Issue Type: Bug
  Components: SparkR
Reporter: Miao Wang


spark.kmeans doesn't have an interface to set initSteps, seed and tol. As the 
Spark KMeans algorithm doesn't take the same set of parameters as R's kmeans, we 
should maintain a different interface in spark.kmeans.
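
For context, the underlying Scala estimator already exposes these parameters, so 
the SparkR wrapper would mainly need to forward them. A sketch with illustrative 
values:
{code}
import org.apache.spark.ml.clustering.KMeans

// The Scala estimator that spark.kmeans wraps already takes these params
// (the values here are purely illustrative).
val kmeans = new KMeans()
  .setK(3)
  .setSeed(42L)
  .setInitSteps(5) // used by the k-means|| initialization mode
  .setTol(1e-4)
  .setMaxIter(20)
{code}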



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19137) Garbage left in source tree after SQL tests are run

2017-01-09 Thread Dongjoon Hyun (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-19137:
--
Component/s: Structured Streaming

> Garbage left in source tree after SQL tests are run
> ---
>
> Key: SPARK-19137
> URL: https://issues.apache.org/jira/browse/SPARK-19137
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming, Tests
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> Don't know when this started, but after I run tests in sbt I'm left with 
> garbage files in my source repo:
> {noformat}
> Untracked files:
>   (use "git add ..." to include in what will be committed)
> sql/core/%253Cundefined%253E/
> sql/core/%3Cundefined%3E/
> {noformat}
> Tests should always write things under the "target" directory so that it gets 
> automatically ignored by git and cleaned up by "sbt/mvn clean."
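
A minimal sketch of the convention the report asks for, assuming a module-relative 
target/tmp layout (the directory names are illustrative, not taken from the patch):
{code}
import java.nio.file.{Files, Paths}

// Keep test artifacts under target/ so they are ignored by git and removed by
// "sbt/mvn clean". The subdirectory and prefix names are illustrative.
val testRoot = Paths.get("target", "tmp")
Files.createDirectories(testRoot)
val scratchDir = Files.createTempDirectory(testRoot, "sql-test-")
{code}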



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19137) Garbage left in source tree after SQL tests are run

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19137:


Assignee: Apache Spark

> Garbage left in source tree after SQL tests are run
> ---
>
> Key: SPARK-19137
> URL: https://issues.apache.org/jira/browse/SPARK-19137
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming, Tests
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Assignee: Apache Spark
>Priority: Minor
>
> Don't know when this started, but after I run tests in sbt I'm left with 
> garbage files in my source repo:
> {noformat}
> Untracked files:
>   (use "git add ..." to include in what will be committed)
> sql/core/%253Cundefined%253E/
> sql/core/%3Cundefined%3E/
> {noformat}
> Tests should always write things under the "target" directory so that it gets 
> automatically ignored by git and cleaned up by "sbt/mvn clean."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19137) Garbage left in source tree after SQL tests are run

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19137:


Assignee: (was: Apache Spark)

> Garbage left in source tree after SQL tests are run
> ---
>
> Key: SPARK-19137
> URL: https://issues.apache.org/jira/browse/SPARK-19137
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming, Tests
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> Don't know when this started, but after I run tests in sbt I'm left with 
> garbage files in my source repo:
> {noformat}
> Untracked files:
>   (use "git add ..." to include in what will be committed)
> sql/core/%253Cundefined%253E/
> sql/core/%3Cundefined%3E/
> {noformat}
> Tests should always write things under the "target" directory so that it gets 
> automatically ignored by git and cleaned up by "sbt/mvn clean."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19137) Garbage left in source tree after SQL tests are run

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813230#comment-15813230
 ] 

Apache Spark commented on SPARK-19137:
--

User 'dongjoon-hyun' has created a pull request for this issue:
https://github.com/apache/spark/pull/16522

> Garbage left in source tree after SQL tests are run
> ---
>
> Key: SPARK-19137
> URL: https://issues.apache.org/jira/browse/SPARK-19137
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming, Tests
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> Don't know when this started, but after I run tests in sbt I'm left with 
> garbage files in my source repo:
> {noformat}
> Untracked files:
>   (use "git add ..." to include in what will be committed)
> sql/core/%253Cundefined%253E/
> sql/core/%3Cundefined%3E/
> {noformat}
> Tests should always write things under the "target" directory so that it gets 
> automatically ignored by git and cleaned up by "sbt/mvn clean."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17463) Serialization of accumulators in heartbeats is not thread-safe

2017-01-09 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813205#comment-15813205
 ] 

Shixiong Zhu commented on SPARK-17463:
--

[~sunil.rangwani] could you provide a simple reproducer? I ran the following code 
on both 2.0.2 and the latest master and could not reproduce it.
{code}
import org.apache.spark.util.CollectionAccumulator

val updatedRecordKeysAcc = spark.sparkContext.collectionAccumulator[String]

def processRecord(recordJSON: String,
                  updatedRecordKeysAcc: CollectionAccumulator[String]) {
  updatedRecordKeysAcc.add((recordJSON.toInt % 10).toString)
}

for (_ <- 0 until 100) {
  spark.range(1, 1000).map(_.toString).foreach(processRecord(_, updatedRecordKeysAcc))
}
{code}
This may be an EMR Spark only issue.
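
For completeness, a standalone sketch (independent of Spark and of EMR) of the 
race named in the issue title; whether it actually throws is timing-dependent:
{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.{ArrayList => JArrayList}

// One thread serializes the list, as the heartbeat thread does with accumulator
// values, while another thread keeps adding to it.
val values = new JArrayList[String]()

val writer = new Thread(new Runnable {
  override def run(): Unit = (0 until 1000000).foreach(i => values.add(i.toString))
})
writer.start()

try {
  new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(values)
  println("No exception this time; the race is timing-dependent.")
} catch {
  case e: java.util.ConcurrentModificationException => println(s"Reproduced: $e")
} finally {
  writer.join()
}
{code}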

> Serialization of accumulators in heartbeats is not thread-safe
> --
>
> Key: SPARK-17463
> URL: https://issues.apache.org/jira/browse/SPARK-17463
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0
>Reporter: Josh Rosen
>Assignee: Shixiong Zhu
>Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> Check out the following {{ConcurrentModificationException}}:
> {code}
> 16/09/06 16:10:29 WARN NettyRpcEndpointRef: Error sending message [message = 
> Heartbeat(2,[Lscala.Tuple2;@66e7b6e7,BlockManagerId(2, HOST, 57743))] in 1 
> attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
> at 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
> at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
> at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
> at 
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1862)
> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject(ArrayList.java:766)
> at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> 

[jira] [Commented] (SPARK-19137) Garbage left in source tree after SQL tests are run

2017-01-09 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813192#comment-15813192
 ] 

Dongjoon Hyun commented on SPARK-19137:
---

Hi, [~vanzin].
I'll make a PR for this.

> Garbage left in source tree after SQL tests are run
> ---
>
> Key: SPARK-19137
> URL: https://issues.apache.org/jira/browse/SPARK-19137
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> Don't know when this started, but after I run tests in sbt I'm left with 
> garbage files in my source repo:
> {noformat}
> Untracked files:
>   (use "git add ..." to include in what will be committed)
> sql/core/%253Cundefined%253E/
> sql/core/%3Cundefined%3E/
> {noformat}
> Tests should always write things under the "target" directory so that it gets 
> automatically ignored by git and cleaned up by "sbt/mvn clean."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18866) Codegen fails with cryptic error if regexp_replace() output column is not aliased

2017-01-09 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen resolved SPARK-18866.

   Resolution: Duplicate
Fix Version/s: 2.2.0
   2.1.1

> Codegen fails with cryptic error if regexp_replace() output column is not 
> aliased
> -
>
> Key: SPARK-18866
> URL: https://issues.apache.org/jira/browse/SPARK-18866
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.0.2, 2.1.0
> Environment: Java 8, Python 3.5
>Reporter: Nicholas Chammas
>Priority: Minor
> Fix For: 2.1.1, 2.2.0
>
>
> Here's a minimal repro:
> {code}
> import pyspark
> from pyspark.sql import Column
> from pyspark.sql.functions import regexp_replace, lower, col
> def normalize_udf(column: Column) -> Column:
> normalized_column = (
> regexp_replace(
> column,
> pattern='[\s]+',
> replacement=' ',
> )
> )
> return normalized_column
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> raw_df = spark.createDataFrame(
> [('  ',)],
> ['string'],
> )
> normalized_df = raw_df.select(normalize_udf('string'))
> normalized_df_prime = (
> normalized_df
> .groupBy(sorted(normalized_df.columns))
> .count())
> normalized_df_prime.show()
> {code}
> When I run this I get:
> {code}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 80, Column 130: Invalid escape sequence
> {code}
> Followed by a huge barf of generated Java code, _and then the output I 
> expect_. (So despite the scary error, the code actually works!)
> Can you spot the error in my code?
> It's simple: I just need to alias the output of {{normalize_udf()}} and all 
> is forgiven:
> {code}
> normalized_df = raw_df.select(normalize_udf('string').alias('string'))
> {code}
> Of course, it's impossible to tell that from the current error output. So my 
> *first question* is: Is there some way we can better communicate to the user 
> what went wrong?
> Another interesting thing I noticed is that if I try this:
> {code}
> normalized_df = raw_df.select(lower('string'))
> {code}
> I immediately get a clean error saying:
> {code}
> py4j.protocol.Py4JError: An error occurred while calling 
> z:org.apache.spark.sql.functions.lower. Trace:
> py4j.Py4JException: Method lower([class java.lang.String]) does not exist
> {code}
> I can fix this by building a column object:
> {code}
> normalized_df = raw_df.select(lower(col('string')))
> {code}
> So that raises *a second problem/question*: Why does {{lower()}} require that 
> I build a Column object, whereas {{regexp_replace()}} does not? The 
> inconsistency adds to the confusion here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18866) Codegen fails with cryptic error if regexp_replace() output column is not aliased

2017-01-09 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813172#comment-15813172
 ] 

Josh Rosen commented on SPARK-18866:


Yep, that's it. This should be fixed by Burak's patch.

> Codegen fails with cryptic error if regexp_replace() output column is not 
> aliased
> -
>
> Key: SPARK-18866
> URL: https://issues.apache.org/jira/browse/SPARK-18866
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.0.2, 2.1.0
> Environment: Java 8, Python 3.5
>Reporter: Nicholas Chammas
>Priority: Minor
>
> Here's a minimal repro:
> {code}
> import pyspark
> from pyspark.sql import Column
> from pyspark.sql.functions import regexp_replace, lower, col
> def normalize_udf(column: Column) -> Column:
> normalized_column = (
> regexp_replace(
> column,
> pattern='[\s]+',
> replacement=' ',
> )
> )
> return normalized_column
> if __name__ == '__main__':
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> raw_df = spark.createDataFrame(
> [('  ',)],
> ['string'],
> )
> normalized_df = raw_df.select(normalize_udf('string'))
> normalized_df_prime = (
> normalized_df
> .groupBy(sorted(normalized_df.columns))
> .count())
> normalized_df_prime.show()
> {code}
> When I run this I get:
> {code}
> ERROR CodeGenerator: failed to compile: 
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 80, Column 130: Invalid escape sequence
> {code}
> Followed by a huge barf of generated Java code, _and then the output I 
> expect_. (So despite the scary error, the code actually works!)
> Can you spot the error in my code?
> It's simple: I just need to alias the output of {{normalize_udf()}} and all 
> is forgiven:
> {code}
> normalized_df = raw_df.select(normalize_udf('string').alias('string'))
> {code}
> Of course, it's impossible to tell that from the current error output. So my 
> *first question* is: Is there some way we can better communicate to the user 
> what went wrong?
> Another interesting thing I noticed is that if I try this:
> {code}
> normalized_df = raw_df.select(lower('string'))
> {code}
> I immediately get a clean error saying:
> {code}
> py4j.protocol.Py4JError: An error occurred while calling 
> z:org.apache.spark.sql.functions.lower. Trace:
> py4j.Py4JException: Method lower([class java.lang.String]) does not exist
> {code}
> I can fix this by building a column object:
> {code}
> normalized_df = raw_df.select(lower(col('string')))
> {code}
> So that raises *a second problem/question*: Why does {{lower()}} require that 
> I build a Column object, whereas {{regexp_replace()}} does not? The 
> inconsistency adds to the confusion here.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18952) regex strings not properly escaped in codegen for aggregations

2017-01-09 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen resolved SPARK-18952.

   Resolution: Fixed
 Assignee: Burak Yavuz
Fix Version/s: 2.2.0
   2.1.1

Fixed by Burak's patch for 2.1.1 / 2.2.0.

> regex strings not properly escaped in codegen for aggregations
> --
>
> Key: SPARK-18952
> URL: https://issues.apache.org/jira/browse/SPARK-18952
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Burak Yavuz
>Assignee: Burak Yavuz
> Fix For: 2.1.1, 2.2.0
>
>
> If I use the function regexp_extract and my regex string contains `\` (the 
> escape character), codegen fails, because the `\` character is not properly 
> escaped in the generated code.
> Example stack trace:
> {code}
> /* 059 */ private int maxSteps = 2;
> /* 060 */ private int numRows = 0;
> /* 061 */ private org.apache.spark.sql.types.StructType keySchema = new 
> org.apache.spark.sql.types.StructType().add("date_format(window#325.start, 
> -MM-dd HH:mm)", org.apache.spark.sql.types.DataTypes.StringType)
> /* 062 */ .add("regexp_extract(source#310.description, ([a-zA-Z]+)\[.*, 
> 1)", org.apache.spark.sql.types.DataTypes.StringType);
> /* 063 */ private org.apache.spark.sql.types.StructType valueSchema = new 
> org.apache.spark.sql.types.StructType().add("sum", 
> org.apache.spark.sql.types.DataTypes.LongType);
> /* 064 */ private Object emptyVBase;
> ...
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 62, Column 58: Invalid escape sequence
>   at org.codehaus.janino.Scanner.scanLiteralCharacter(Scanner.java:918)
>   at org.codehaus.janino.Scanner.produce(Scanner.java:604)
>   at org.codehaus.janino.Parser.peekRead(Parser.java:3239)
>   at org.codehaus.janino.Parser.parseArguments(Parser.java:3055)
>   at org.codehaus.janino.Parser.parseSelector(Parser.java:2914)
>   at org.codehaus.janino.Parser.parseUnaryExpression(Parser.java:2617)
>   at 
> org.codehaus.janino.Parser.parseMultiplicativeExpression(Parser.java:2573)
>   at org.codehaus.janino.Parser.parseAdditiveExpression(Parser.java:2552)
> {code}
> In the code-generated expression, the literal should use `\\` instead of `\`.
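
A hedged sketch of the kind of escaping that last sentence refers to; the helper 
below is illustrative and is not Spark's actual codegen API:
{code}
// Illustrative helper (not Spark's API): escape a string so it can be embedded
// as a Java string literal in generated code.
def escapeForJavaLiteral(s: String): String =
  s.map {
    case '\\' => "\\\\"
    case '"'  => "\\\""
    case '\n' => "\\n"
    case c    => c.toString
  }.mkString

// "([a-zA-Z]+)\[.*" becomes "([a-zA-Z]+)\\[.*" when embedded in generated.java
println(escapeForJavaLiteral("""([a-zA-Z]+)\[.*"""))
{code}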



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-19138) Python: new HiveContext will use a stopped SparkContext

2017-01-09 Thread Ryan Blue (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Blue resolved SPARK-19138.
---
Resolution: Duplicate

> Python: new HiveContext will use a stopped SparkContext
> ---
>
> Key: SPARK-19138
> URL: https://issues.apache.org/jira/browse/SPARK-19138
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Reporter: Ryan Blue
>
> We have users that run a notebook cell that creates a new SparkContext to 
> overwrite some of the default initial parameters:
> {code:lang=python}
> if 'sc' in globals():
> #Stop the running SparkContext if there is one running.
> sc.stop()
> conf = SparkConf().setAppName("app")
> #conf.set('spark.sql.shuffle.partitions', '2000')
> sc = SparkContext(conf=conf)
> sqlContext = HiveContext(sc)
> {code}
> In Spark 2.0, this creates an invalid SQLContext that uses the original 
> SparkContext, because the [HiveContext 
> constructor|https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L514]
> uses SparkSession.getOrCreate, which still holds the old SparkContext. A 
> SparkSession should be invalidated and no longer returned by getOrCreate if 
> its SparkContext has been stopped.
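
A simplified sketch of the requested invalidation check (not the actual 
SparkSession implementation; it assumes the SparkContext.isStopped flag):
{code}
import org.apache.spark.sql.SparkSession

// Sketch of the requested behaviour: a cached session is only worth handing back
// from getOrCreate if its SparkContext is still alive.
def reusable(cached: SparkSession): Boolean =
  cached != null && !cached.sparkContext.isStopped
{code}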



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19141) VectorAssembler metadata causing memory issues

2017-01-09 Thread Antonia Oprescu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antonia Oprescu updated SPARK-19141:

Description: 
VectorAssembler produces unnecessary metadata that overflows the Java heap in 
the case of sparse vectors. In the example below, the logical length of the 
vector is 10^6, but the number of non-zero values is only 2.
The problem arises when the vector assembler creates metadata (ML attributes) 
for each of the 10^6 slots, even if this metadata didn't exist upstream (i.e. 
HashingTF doesn't produce metadata per slot). Here is a chunk of metadata it 
produces:

{noformat}
{"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"HashedFeat_0"},{"idx":1,"name":"HashedFeat_1"},{"idx":2,"name":"HashedFeat_2"},{"idx":3,"name":"HashedFeat_3"},{"idx":4,"name":"HashedFeat_4"},{"idx":5,"name":"HashedFeat_5"},{"idx":6,"name":"HashedFeat_6"},{"idx":7,"name":"HashedFeat_7"},{"idx":8,"name":"HashedFeat_8"},{"idx":9,"name":"HashedFeat_9"},...,{"idx":100,"name":"Feat01"}]},"num_attrs":101}}
{noformat}

In this lightweight example, the feature size limit seems to be around 1,000,000
when run locally, but this scales poorly with more complicated pipelines. With a
larger dataset and a learner (say LogisticRegression), it hits the limit at hash
sizes anywhere between 10k and 100k, even on a decent-sized cluster.
I did some digging, and it seems that the only metadata necessary for downstream
learners is the part indicating categorical columns. Thus, I thought of the
following possible solutions:

1. A compact representation of the ML attribute metadata (but this seems to be a
bigger change)
2. Removal of non-categorical tags from the metadata created by the
VectorAssembler
3. An option on the existing VectorAssembler to skip unnecessary ML attributes,
or another transformer altogether

I would be happy to take a stab at any of these solutions, but I need some
direction from the Spark community.

{code:title=VABug.scala |borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}
import org.apache.spark.sql.SparkSession


object VARepro {

  case class Record(Label: Double, Feat01: Double, Feat02: Array[String])

  def main(args: Array[String]) {

val conf = new SparkConf()
  .setAppName("Vector assembler bug")
  .setMaster("local[*]")
val spark = SparkSession.builder.config(conf).getOrCreate()

import spark.implicits._
val df = Seq(Record(1.0, 2.0, Array("4daf")), Record(0.0, 3.0, 
Array("a9ee"))).toDS()

val numFeatures = 1000
val hashingScheme = new 
HashingTF().setInputCol("Feat02").setOutputCol("HashedFeat").setNumFeatures(numFeatures)
val hashedData = hashingScheme.transform(df)

val vectorAssembler = new 
VectorAssembler().setInputCols(Array("HashedFeat","Feat01")).setOutputCol("Features")
val processedData = vectorAssembler.transform(hashedData).select("Label", 
"Features")
processedData.show()
  }
}
{code}

*Stacktrace from the example above:*

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit 
exceeded
at 
org.apache.spark.ml.attribute.NumericAttribute.copy(attributes.scala:272)
at 
org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:215)
at 
org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:195)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:71)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:70)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at 
scala.collection.IterableLike$class.copyToArray(IterableLike.scala:254)
at 
scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
at 
scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
at 
scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
at 
scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at scala.Option.map(Option.scala:146)
at 
org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
at 
org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
at 
org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:95)
at VARepro$.main(VARepro.scala:36)


*Exception when run in conjunction with a learner on a bigger dataset (~10 GB) on 
a cluster.*

: java.lang.OutOfMemoryError: Java heap space
at 

[jira] [Created] (SPARK-19141) VectorAssembler metadata causing memory issues

2017-01-09 Thread Antonia Oprescu (JIRA)
Antonia Oprescu created SPARK-19141:
---

 Summary: VectorAssembler metadata causing memory issues
 Key: SPARK-19141
 URL: https://issues.apache.org/jira/browse/SPARK-19141
 Project: Spark
  Issue Type: Bug
  Components: ML, MLlib
Affects Versions: 2.1.0, 2.0.0, 1.6.0
 Environment: Windows 10, Ubuntu 16.04.1, Scala 2.11.8, Spark 1.6.0, 
2.0.0, 2.1.0
Reporter: Antonia Oprescu


VectorAssembler produces unnecessary metadata that overflows the Java heap in 
the case of sparse vectors. In the example below, the logical length of the 
vector is 10^6, but the number of non-zero values is only 2.
The problem arises when the vector assembler creates metadata (ML attributes) 
for each of the 10^6 slots, even if this metadata didn't exist upstream (i.e. 
HashingTF doesn't produce metadata per slot). Here is a chunk of metadata it 
produces:

{noformat}
{"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"HashedFeat_0"},{"idx":1,"name":"HashedFeat_1"},{"idx":2,"name":"HashedFeat_2"},{"idx":3,"name":"HashedFeat_3"},{"idx":4,"name":"HashedFeat_4"},{"idx":5,"name":"HashedFeat_5"},{"idx":6,"name":"HashedFeat_6"},{"idx":7,"name":"HashedFeat_7"},{"idx":8,"name":"HashedFeat_8"},{"idx":9,"name":"HashedFeat_9"},...,{"idx":100,"name":"Feat01"}]},"num_attrs":101}}
{noformat}

In this lightweight example, the feature size limit seems to be around 1,000,000
when run locally, but this scales poorly with more complicated pipelines. With a
larger dataset and a learner (say LogisticRegression), it hits the limit at hash
sizes anywhere between 10k and 100k, even on a decent-sized cluster.
I did some digging, and it seems that the only metadata necessary for downstream
learners is the part indicating categorical columns. Thus, I thought of the
following possible solutions:

1. A compact representation of the ML attribute metadata (but this seems to be a
bigger change)
2. Removal of non-categorical tags from the metadata created by the
VectorAssembler
3. An option on the existing VectorAssembler to skip unnecessary ML attributes,
or another transformer altogether

I would be happy to take a stab at any of these solutions, but I need some
direction from the Spark community.

{code:title=VABug.scala |borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}
import org.apache.spark.sql.SparkSession


object VARepro {

  case class Record(Label: Double, Feat01: Double, Feat02: Array[String])

  def main(args: Array[String]) {

val conf = new SparkConf()
  .setAppName("Vector assembler bug")
  .setMaster("local[*]")
val spark = SparkSession.builder.config(conf).getOrCreate()

import spark.implicits._
val df = Seq(Record(1.0, 2.0, Array("4daf")), Record(0.0, 3.0, 
Array("a9ee"))).toDS()

val numFeatures = 1000
val hashingScheme = new 
HashingTF().setInputCol("Feat02").setOutputCol("HashedFeat").setNumFeatures(numFeatures)
val hashedData = hashingScheme.transform(df)

val vectorAssembler = new 
VectorAssembler().setInputCols(Array("HashedFeat","Feat01")).setOutputCol("Features")
val processedData = vectorAssembler.transform(hashedData).select("Label", 
"Features")
processedData.show()
  }
}
{code}

Stacktrace from the example above:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit 
exceeded
at 
org.apache.spark.ml.attribute.NumericAttribute.copy(attributes.scala:272)
at 
org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:215)
at 
org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:195)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:71)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:70)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at 
scala.collection.IterableLike$class.copyToArray(IterableLike.scala:254)
at 
scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
at 
scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
at 
scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
at 
scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at scala.Option.map(Option.scala:146)
at 
org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
at 
org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
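
A possible user-side mitigation, sketched here only as an idea and distinct from
the solutions proposed above: re-alias the assembled column with empty metadata
so the generated per-slot ML attributes at least do not propagate into later
stages. Note that this does not avoid the allocation inside
VectorAssembler.transform itself, where the stack trace above fails;
"processedData" and "Features" refer to the repro code above.

{code}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.Metadata

// Re-alias the assembled vector column with empty metadata, dropping the
// per-slot ML attributes before any downstream stage sees them.
val strippedData = processedData.withColumn(
  "Features",
  col("Features").as("Features", Metadata.empty))
{code}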
 

[jira] [Assigned] (SPARK-19138) Python: new HiveContext will use a stopped SparkContext

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19138:


Assignee: (was: Apache Spark)

> Python: new HiveContext will use a stopped SparkContext
> ---
>
> Key: SPARK-19138
> URL: https://issues.apache.org/jira/browse/SPARK-19138
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Reporter: Ryan Blue
>
> We have users that run a notebook cell that creates a new SparkContext to 
> overwrite some of the default initial parameters:
> {code:lang=python}
> if 'sc' in globals():
> #Stop the running SparkContext if there is one running.
> sc.stop()
> conf = SparkConf().setAppName("app")
> #conf.set('spark.sql.shuffle.partitions', '2000')
> sc = SparkContext(conf=conf)
> sqlContext = HiveContext(sc)
> {code}
> In Spark 2.0, this creates an invalid SQLContext that uses the original 
> SparkContext because the [HiveContext 
> constructor|https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L514]
>  uses SparkSession.getOrCreate that has the old SparkContext. A SparkSession 
> should be invalidated and no longer returned by getOrCreate if its 
> SparkContext has been stopped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19139) AES-based authentication mechanism for Spark

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19139:


Assignee: Apache Spark

> AES-based authentication mechanism for Spark
> 
>
> Key: SPARK-19139
> URL: https://issues.apache.org/jira/browse/SPARK-19139
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Assignee: Apache Spark
>
> In SPARK-13331, support for AES encryption was added to the Spark network 
> library. But the authentication of different Spark processes is still 
> performed using SASL's DIGEST-MD5 mechanism. That means the authentication 
> part is the weakest link; since the AES keys are currently encrypted using 
> 3des (strongest cipher supported by SASL), Spark can't really claim to 
> provide the full benefits of using AES for encryption.
> We should add a new auth protocol that doesn't need these disclaimers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19138) Python: new HiveContext will use a stopped SparkContext

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19138:


Assignee: Apache Spark

> Python: new HiveContext will use a stopped SparkContext
> ---
>
> Key: SPARK-19138
> URL: https://issues.apache.org/jira/browse/SPARK-19138
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Reporter: Ryan Blue
>Assignee: Apache Spark
>
> We have users that run a notebook cell that creates a new SparkContext to 
> overwrite some of the default initial parameters:
> {code:lang=python}
> if 'sc' in globals():
> #Stop the running SparkContext if there is one running.
> sc.stop()
> conf = SparkConf().setAppName("app")
> #conf.set('spark.sql.shuffle.partitions', '2000')
> sc = SparkContext(conf=conf)
> sqlContext = HiveContext(sc)
> {code}
> In Spark 2.0, this creates an invalid SQLContext that uses the original 
> SparkContext because the [HiveContext 
> constructor|https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L514]
>  uses SparkSession.getOrCreate that has the old SparkContext. A SparkSession 
> should be invalidated and no longer returned by getOrCreate if its 
> SparkContext has been stopped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19139) AES-based authentication mechanism for Spark

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19139:


Assignee: (was: Apache Spark)

> AES-based authentication mechanism for Spark
> 
>
> Key: SPARK-19139
> URL: https://issues.apache.org/jira/browse/SPARK-19139
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>
> In SPARK-13331, support for AES encryption was added to the Spark network 
> library. But the authentication of different Spark processes is still 
> performed using SASL's DIGEST-MD5 mechanism. That means the authentication 
> part is the weakest link; since the AES keys are currently encrypted using 
> 3des (strongest cipher supported by SASL), Spark can't really claim to 
> provide the full benefits of using AES for encryption.
> We should add a new auth protocol that doesn't need these disclaimers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19139) AES-based authentication mechanism for Spark

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812988#comment-15812988
 ] 

Apache Spark commented on SPARK-19139:
--

User 'vanzin' has created a pull request for this issue:
https://github.com/apache/spark/pull/16521

> AES-based authentication mechanism for Spark
> 
>
> Key: SPARK-19139
> URL: https://issues.apache.org/jira/browse/SPARK-19139
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>
> In SPARK-13331, support for AES encryption was added to the Spark network 
> library. But the authentication of different Spark processes is still 
> performed using SASL's DIGEST-MD5 mechanism. That means the authentication 
> part is the weakest link; since the AES keys are currently encrypted using 
> 3des (strongest cipher supported by SASL), Spark can't really claim to 
> provide the full benefits of using AES for encryption.
> We should add a new auth protocol that doesn't need these disclaimers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19140) Allow update mode for non-aggregation streaming queries

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19140:


Assignee: Apache Spark  (was: Shixiong Zhu)

> Allow update mode for non-aggregation streaming queries
> ---
>
> Key: SPARK-19140
> URL: https://issues.apache.org/jira/browse/SPARK-19140
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Shixiong Zhu
>Assignee: Apache Spark
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19140) Allow update mode for non-aggregation streaming queries

2017-01-09 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-19140:


Assignee: Shixiong Zhu  (was: Apache Spark)

> Allow update mode for non-aggregation streaming queries
> ---
>
> Key: SPARK-19140
> URL: https://issues.apache.org/jira/browse/SPARK-19140
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19140) Allow update mode for non-aggregation streaming queries

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812987#comment-15812987
 ] 

Apache Spark commented on SPARK-19140:
--

User 'zsxwing' has created a pull request for this issue:
https://github.com/apache/spark/pull/16520

> Allow update mode for non-aggregation streaming queries
> ---
>
> Key: SPARK-19140
> URL: https://issues.apache.org/jira/browse/SPARK-19140
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19138) Python: new HiveContext will use a stopped SparkContext

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812984#comment-15812984
 ] 

Apache Spark commented on SPARK-19138:
--

User 'rdblue' has created a pull request for this issue:
https://github.com/apache/spark/pull/16519

> Python: new HiveContext will use a stopped SparkContext
> ---
>
> Key: SPARK-19138
> URL: https://issues.apache.org/jira/browse/SPARK-19138
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Reporter: Ryan Blue
>
> We have users that run a notebook cell that creates a new SparkContext to 
> overwrite some of the default initial parameters:
> {code:lang=python}
> if 'sc' in globals():
> #Stop the running SparkContext if there is one running.
> sc.stop()
> conf = SparkConf().setAppName("app")
> #conf.set('spark.sql.shuffle.partitions', '2000')
> sc = SparkContext(conf=conf)
> sqlContext = HiveContext(sc)
> {code}
> In Spark 2.0, this creates an invalid SQLContext that uses the original 
> SparkContext because the [HiveContext 
> constructor|https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L514]
>  uses SparkSession.getOrCreate that has the old SparkContext. A SparkSession 
> should be invalidated and no longer returned by getOrCreate if its 
> SparkContext has been stopped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19140) Allow update mode for non-aggregation streaming queries

2017-01-09 Thread Shixiong Zhu (JIRA)
Shixiong Zhu created SPARK-19140:


 Summary: Allow update mode for non-aggregation streaming queries
 Key: SPARK-19140
 URL: https://issues.apache.org/jira/browse/SPARK-19140
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Reporter: Shixiong Zhu
Assignee: Shixiong Zhu






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19139) AES-based authentication mechanism for Spark

2017-01-09 Thread Marcelo Vanzin (JIRA)
Marcelo Vanzin created SPARK-19139:
--

 Summary: AES-based authentication mechanism for Spark
 Key: SPARK-19139
 URL: https://issues.apache.org/jira/browse/SPARK-19139
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Marcelo Vanzin


In SPARK-13331, support for AES encryption was added to the Spark network 
library. But the authentication of different Spark processes is still performed 
using SASL's DIGEST-MD5 mechanism. That means the authentication part is the 
weakest link; since the AES keys are currently encrypted using 3des (strongest 
cipher supported by SASL), Spark can't really claim to provide the full 
benefits of using AES for encryption.

We should add a new auth protocol that doesn't need these disclaimers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19137) Garbage left in source tree after SQL tests are run

2017-01-09 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812915#comment-15812915
 ] 

Dongjoon Hyun commented on SPARK-19137:
---

+1

> Garbage left in source tree after SQL tests are run
> ---
>
> Key: SPARK-19137
> URL: https://issues.apache.org/jira/browse/SPARK-19137
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Priority: Minor
>
> Don't know when this started, but after I run tests in sbt I'm left with 
> garbage files in my source repo:
> {noformat}
> Untracked files:
>   (use "git add ..." to include in what will be committed)
> sql/core/%253Cundefined%253E/
> sql/core/%3Cundefined%3E/
> {noformat}
> Tests should always write things under the "target" directory so that it gets 
> automatically ignored by git and cleaned up by "sbt/mvn clean."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19138) Python: new HiveContext will use a stopped SparkContext

2017-01-09 Thread Ryan Blue (JIRA)
Ryan Blue created SPARK-19138:
-

 Summary: Python: new HiveContext will use a stopped SparkContext
 Key: SPARK-19138
 URL: https://issues.apache.org/jira/browse/SPARK-19138
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Reporter: Ryan Blue


We have users that run a notebook cell that creates a new SparkContext to 
overwrite some of the default initial parameters:

{code:lang=python}
if 'sc' in globals():
#Stop the running SparkContext if there is one running.
sc.stop()

conf = SparkConf().setAppName("app")
#conf.set('spark.sql.shuffle.partitions', '2000')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
{code}

In Spark 2.0, this creates an invalid SQLContext that uses the original 
SparkContext because the [HiveContext 
constructor|https://github.com/apache/spark/blob/master/python/pyspark/sql/context.py#L514]
 uses SparkSession.getOrCreate that has the old SparkContext. A SparkSession 
should be invalidated and no longer returned by getOrCreate if its SparkContext 
has been stopped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19137) Garbage left in source tree after SQL tests are run

2017-01-09 Thread Marcelo Vanzin (JIRA)
Marcelo Vanzin created SPARK-19137:
--

 Summary: Garbage left in source tree after SQL tests are run
 Key: SPARK-19137
 URL: https://issues.apache.org/jira/browse/SPARK-19137
 Project: Spark
  Issue Type: Bug
  Components: SQL, Tests
Affects Versions: 2.2.0
Reporter: Marcelo Vanzin
Priority: Minor


Don't know when this started, but after I run tests in sbt I'm left with 
garbage files in my source repo:

{noformat}
Untracked files:
  (use "git add ..." to include in what will be committed)

sql/core/%253Cundefined%253E/
sql/core/%3Cundefined%3E/
{noformat}

Tests should always write things under the "target" directory so that it gets 
automatically ignored by git and cleaned up by "sbt/mvn clean."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18113) Sending AskPermissionToCommitOutput failed, driver enter into task deadloop

2017-01-09 Thread Andrew Ash (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812819#comment-15812819
 ] 

Andrew Ash commented on SPARK-18113:


I've done some more diagnosis on an example I saw, and think there's a failure 
mode that https://issues.apache.org/jira/browse/SPARK-8029 didn't consider.  
Here's the sequence of steps I think causes this issue:

- executor requests a task commit
- coordinator approves commit and sends response message
- executor is preempted by YARN!
- response message goes nowhere
- the OutputCommitCoordinator now has attempt=0 recorded as the authorized 
committer for that stage/partition, so no other attempt will ever succeed :(

Are you running in YARN with preemption enabled? Is there preemption activity 
around the time of the task deadloop?

> Sending AskPermissionToCommitOutput failed, driver enter into task deadloop
> ---
>
> Key: SPARK-18113
> URL: https://issues.apache.org/jira/browse/SPARK-18113
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.0.1
> Environment: # cat /etc/redhat-release 
> Red Hat Enterprise Linux Server release 7.2 (Maipo)
>Reporter: xuqing
>
> The executor's *AskPermissionToCommitOutput* message to the driver fails, so it 
> retries and sends it again. The driver receives 2 AskPermissionToCommitOutput 
> messages and handles both, but the executor ignores the first response (true) 
> and only receives the second response (false). The TaskAttemptNumber for this 
> partition in authorizedCommittersByStage is then locked forever, and the driver 
> enters an infinite loop.
> h4. Driver Log:
> {noformat}
> 16/10/25 05:38:28 INFO TaskSetManager: Starting task 24.0 in stage 2.0 (TID 
> 110, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> 16/10/25 05:39:00 WARN TaskSetManager: Lost task 24.0 in stage 2.0 (TID 110, 
> cwss04.sh01.com): TaskCommitDenied (Driver denied task commit) for job: 2, 
> partition: 24, attemptNumber: 0
> ...
> 16/10/25 05:39:00 INFO OutputCommitCoordinator: Task was denied committing, 
> stage: 2, partition: 24, attempt: 0
> ...
> 16/10/26 15:53:03 INFO TaskSetManager: Starting task 24.1 in stage 2.0 (TID 
> 119, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> 16/10/26 15:53:05 WARN TaskSetManager: Lost task 24.1 in stage 2.0 (TID 119, 
> cwss04.sh01.com): TaskCommitDenied (Driver denied task commit) for job: 2, 
> partition: 24, attemptNumber: 1
> 16/10/26 15:53:05 INFO OutputCommitCoordinator: Task was denied committing, 
> stage: 2, partition: 24, attempt: 1
> ...
> 16/10/26 15:53:05 INFO TaskSetManager: Starting task 24.28654 in stage 2.0 
> (TID 28733, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> {noformat}
> h4. Executor Log:
> {noformat}
> ...
> 16/10/25 05:38:42 INFO Executor: Running task 24.0 in stage 2.0 (TID 110)
> ...
> 16/10/25 05:39:10 WARN NettyRpcEndpointRef: Error sending message [message = 
> AskPermissionToCommitOutput(2,24,0)] in 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at 
> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
> at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
> at 
> org.apache.spark.scheduler.OutputCommitCoordinator.canCommit(OutputCommitCoordinator.scala:95)
> at 
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:73)
> at 
> org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
> at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1212)
> at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:279)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.lang.Thread.run(Thread.java:785)
> Caused by: 

[jira] [Commented] (SPARK-3877) The exit code of spark-submit is still 0 when an yarn application fails

2017-01-09 Thread Joshua Caplan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812752#comment-15812752
 ] 

Joshua Caplan commented on SPARK-3877:
--

I think this fix has created a race condition, which I am encountering about 50% 
of the time using Spark 1.6.3. I have configured YARN not to keep *any* completed 
applications in memory, as some of my jobs get pretty large:

yarn-site.xml: yarn.resourcemanager.max-completed-applications = 0

The once-per-second call to getApplicationReport may thus see the application as 
RUNNING on one poll and then not found on the next, and report a false negative.

(typical) Executor log:
17/01/09 19:31:23 INFO ApplicationMaster: Final app status: SUCCEEDED, 
exitCode: 0
17/01/09 19:31:23 INFO SparkContext: Invoking stop() from shutdown hook
17/01/09 19:31:24 INFO SparkUI: Stopped Spark web UI at http://10.0.0.168:37046
17/01/09 19:31:24 INFO YarnClusterSchedulerBackend: Shutting down all executors
17/01/09 19:31:24 INFO YarnClusterSchedulerBackend: Asking each executor to 
shut down
17/01/09 19:31:24 INFO MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
17/01/09 19:31:24 INFO MemoryStore: MemoryStore cleared
17/01/09 19:31:24 INFO BlockManager: BlockManager stopped
17/01/09 19:31:24 INFO BlockManagerMaster: BlockManagerMaster stopped
17/01/09 19:31:24 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
17/01/09 19:31:24 INFO SparkContext: Successfully stopped SparkContext
17/01/09 19:31:24 INFO ApplicationMaster: Unregistering ApplicationMaster with 
SUCCEEDED
17/01/09 19:31:24 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
17/01/09 19:31:24 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
17/01/09 19:31:24 INFO AMRMClientImpl: Waiting for application to be 
successfully unregistered.
17/01/09 19:31:24 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut 
down.

Client log:
17/01/09 19:31:23 INFO Client: Application report for 
application_1483983939941_0056 (state: RUNNING)
17/01/09 19:31:24 ERROR Client: Application application_1483983939941_0056 not 
found.
Exception in thread "main" org.apache.spark.SparkException: Application 
application_1483983939941_0056 is killed
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1038)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1081)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


> The exit code of spark-submit is still 0 when an yarn application fails
> ---
>
> Key: SPARK-3877
> URL: https://issues.apache.org/jira/browse/SPARK-3877
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.1.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>  Labels: yarn
> Fix For: 1.1.1, 1.2.0
>
>
> When an yarn application fails (yarn-cluster mode), the exit code of 
> spark-submit is still 0. It's hard for people to write some automatic scripts 
> to run spark jobs in yarn because the failure can not be detected in these 
> scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19123) KeyProviderException when reading Azure Blobs from Apache Spark

2017-01-09 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-19123:
--
Target Version/s:   (was: 2.0.0)
  Labels:   (was: newbie)

> KeyProviderException when reading Azure Blobs from Apache Spark
> ---
>
> Key: SPARK-19123
> URL: https://issues.apache.org/jira/browse/SPARK-19123
> Project: Spark
>  Issue Type: Question
>  Components: Input/Output, Java API
>Affects Versions: 2.0.0
> Environment: Apache Spark 2.0.0 running on Azure HDInsight cluster 
> version 3.5 with Hadoop version 2.7.3
>Reporter: Saulo Ricci
>Priority: Minor
>
> I created a Spark job that is intended to read a set of JSON files from an 
> Azure Blob container. I set the key and the reference to my storage account, 
> and I'm reading the files as shown in the snippet below:
> {code:java}
> SparkSession
> sparkSession =
> SparkSession.builder().appName("Pipeline")
> .master("yarn")
> .config("fs.azure", 
> "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
> 
> .config("fs.azure.account.key..blob.core.windows.net","")
> .getOrCreate();
> Dataset txs = sparkSession.read().json("wasb://path_to_files");
> {code}
> The problem is that I'm unfortunately getting an 
> `org.apache.hadoop.fs.azure.KeyProviderException` when reading the blobs from 
> Azure storage. According to the trace shown below, it seems the header is too 
> long, but I'm still trying to figure out what exactly that means:
> {code:java}
> 17/01/07 19:28:39 ERROR ApplicationMaster: User class threw exception: 
> org.apache.hadoop.fs.azure.AzureException: 
> org.apache.hadoop.fs.azure.KeyProviderException: ExitCodeException 
> exitCode=2: Error reading S/MIME message
> 140473279682200:error:0D07207B:asn1 encoding 
> routines:ASN1_get_object:header too long:asn1_lib.c:157:
> 140473279682200:error:0D0D106E:asn1 encoding 
> routines:B64_READ_ASN1:decode error:asn_mime.c:192:
> 140473279682200:error:0D0D40CB:asn1 encoding 
> routines:SMIME_read_ASN1:asn1 parse error:asn_mime.c:517:
> org.apache.hadoop.fs.azure.AzureException: 
> org.apache.hadoop.fs.azure.KeyProviderException: ExitCodeException 
> exitCode=2: Error reading S/MIME message
> 140473279682200:error:0D07207B:asn1 encoding 
> routines:ASN1_get_object:header too long:asn1_lib.c:157:
> 140473279682200:error:0D0D106E:asn1 encoding 
> routines:B64_READ_ASN1:decode error:asn_mime.c:192:
> 140473279682200:error:0D0D40CB:asn1 encoding 
> routines:SMIME_read_ASN1:asn1 parse error:asn_mime.c:517:
>   at 
> org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:953)
>   at 
> org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:450)
>   at 
> org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1209)
>   at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2761)
>   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
>   at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2795)
>   at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2777)
>   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
>   at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:366)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.immutable.List.flatMap(List.scala:344)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
>   at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:294)
>   at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:249)
>   at 
> taka.pipelines.AnomalyTrainingPipeline.main(AnomalyTrainingPipeline.java:35)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 

[jira] [Commented] (SPARK-18952) regex strings not properly escaped in codegen for aggregations

2017-01-09 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812702#comment-15812702
 ] 

Apache Spark commented on SPARK-18952:
--

User 'brkyvz' has created a pull request for this issue:
https://github.com/apache/spark/pull/16518

> regex strings not properly escaped in codegen for aggregations
> --
>
> Key: SPARK-18952
> URL: https://issues.apache.org/jira/browse/SPARK-18952
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Burak Yavuz
>
> If I use the function regexp_extract and my regex string contains `\`, i.e. the 
> escape character, codegen fails, because the `\` character is not properly 
> escaped in the generated code.
> Example stack trace:
> {code}
> /* 059 */ private int maxSteps = 2;
> /* 060 */ private int numRows = 0;
> /* 061 */ private org.apache.spark.sql.types.StructType keySchema = new 
> org.apache.spark.sql.types.StructType().add("date_format(window#325.start, 
> -MM-dd HH:mm)", org.apache.spark.sql.types.DataTypes.StringType)
> /* 062 */ .add("regexp_extract(source#310.description, ([a-zA-Z]+)\[.*, 
> 1)", org.apache.spark.sql.types.DataTypes.StringType);
> /* 063 */ private org.apache.spark.sql.types.StructType valueSchema = new 
> org.apache.spark.sql.types.StructType().add("sum", 
> org.apache.spark.sql.types.DataTypes.LongType);
> /* 064 */ private Object emptyVBase;
> ...
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
> 62, Column 58: Invalid escape sequence
>   at org.codehaus.janino.Scanner.scanLiteralCharacter(Scanner.java:918)
>   at org.codehaus.janino.Scanner.produce(Scanner.java:604)
>   at org.codehaus.janino.Parser.peekRead(Parser.java:3239)
>   at org.codehaus.janino.Parser.parseArguments(Parser.java:3055)
>   at org.codehaus.janino.Parser.parseSelector(Parser.java:2914)
>   at org.codehaus.janino.Parser.parseUnaryExpression(Parser.java:2617)
>   at 
> org.codehaus.janino.Parser.parseMultiplicativeExpression(Parser.java:2573)
>   at org.codehaus.janino.Parser.parseAdditiveExpression(Parser.java:2552)
> {code}
> In the codegen'd expression, the literal should use `\\` instead of `\`.
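
For reference, a minimal standalone sketch of the kind of query described above
(the data and column names are illustrative, not taken from the original job);
on an affected version such as 2.0.2 this is reported to fail with the janino
"Invalid escape sequence" error shown in the stack trace:

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{regexp_extract, sum}

val spark = SparkSession.builder()
  .appName("regexp-codegen-repro")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// The pattern contains an escaped '[': written "\\[" in Scala source, "\[" in the regex.
val df = Seq(("alpha[1]", 1L), ("beta[2]", 2L)).toDF("description", "value")

// Grouping on the regexp_extract expression puts the raw pattern into the
// generated aggregation key schema, which is where the compile error occurs.
val aggregated = df
  .groupBy(regexp_extract($"description", "([a-zA-Z]+)\\[.*", 1).as("prefix"))
  .agg(sum($"value"))

aggregated.show()
{code}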



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19125) Streaming Duration by Count

2017-01-09 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812690#comment-15812690
 ] 

Sean Owen commented on SPARK-19125:
---

Yes, I don't think a distributed system is, in general, a great candidate for 
reproducible results; there are several stochastic elements.

Still, what about queueStream()? You can create a fixed sequence of RDDs to 
pass to streaming for test-like situations like this, for example:
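
A minimal sketch of that idea (assumptions: Scala, local mode, illustrative
sizes and values); each pre-built RDD becomes one micro-batch, so every run sees
exactly the same batch contents:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val conf = new SparkConf().setAppName("fixed-batches").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Pre-build the batches so their contents do not depend on wall-clock arrival.
val queue = mutable.Queue[RDD[Int]]()
for (i <- 0 until 5) {
  queue += ssc.sparkContext.parallelize(i * 100 until (i + 1) * 100)
}

// oneAtATime = true dequeues exactly one RDD per batch interval.
val stream = ssc.queueStream(queue, oneAtATime = true)
stream.foreachRDD(rdd => println(s"batch size = ${rdd.count()}"))

ssc.start()
ssc.awaitTerminationOrTimeout(10 * 1000)
ssc.stop()
{code}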

> Streaming Duration by Count
> ---
>
> Key: SPARK-19125
> URL: https://issues.apache.org/jira/browse/SPARK-19125
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
> Environment: Java
>Reporter: Paulo Cândido
>
> I use Spark Streaming in a scientific setting. In these cases, we have to run 
> the same experiment many times using the same seed to obtain the same result. 
> All random components take the seed as input, so I can control them. However, 
> there is one component that doesn't depend on seeds and that we can't control: 
> the batch size. Regardless of how the stream is fed, the metric used to cut 
> micro-batches is wall time. This is a problem in a scientific environment 
> because if we run the same experiment with the same parameters many times, 
> each run can give a different result, depending on the number of elements read 
> in each batch. The same stream source may generate different batch sizes 
> across executions because of wall time.
> My suggestion is to provide a new Duration metric: count of elements.
> Regardless of the time spent filling a micro-batch, the batches will always be 
> the same size, and when the source uses a seed to generate the same values, 
> independent of throughput, we will be able to replicate experiments with the 
> same results.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-19020) Cardinality estimation of aggregate operator

2017-01-09 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin resolved SPARK-19020.
-
   Resolution: Fixed
 Assignee: Zhenhua Wang
Fix Version/s: 2.2.0

> Cardinality estimation of aggregate operator
> 
>
> Key: SPARK-19020
> URL: https://issues.apache.org/jira/browse/SPARK-19020
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Zhenhua Wang
>Assignee: Zhenhua Wang
> Fix For: 2.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18917) Dataframe - Time Out Issues / Taking long time in append mode on object stores

2017-01-09 Thread Anbu Cheeralan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812608#comment-15812608
 ] 

Anbu Cheeralan commented on SPARK-18917:


I agree the Hadoop fix will reduce the recursive calls, but that solution 
addresses only S3; it does not help Google Storage or Azure.

Further, optionally disabling this code removes one entire stage from the 
execution. I would still like to have this as a feature.

> Dataframe - Time Out Issues / Taking long time in append mode on object stores
> --
>
> Key: SPARK-18917
> URL: https://issues.apache.org/jira/browse/SPARK-18917
> Project: Spark
>  Issue Type: Improvement
>  Components: EC2, SQL, YARN
>Affects Versions: 2.0.2
>Reporter: Anbu Cheeralan
>Priority: Minor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> When using DataFrame write in append mode on object stores (S3 / Google 
> Storage), the writes take a long time or hit read timeouts. This is because 
> dataframe.write lists all leaf folders in the target directory; if there are a 
> lot of subfolders due to partitioning, this takes forever.
> In org.apache.spark.sql.execution.datasources.DataSource.write(), the following 
> code causes a huge number of RPC calls when the file system is an object store 
> (S3, GS):
> if (mode == SaveMode.Append) {
>   val existingPartitionColumns = Try {
>     resolveRelation()
>       .asInstanceOf[HadoopFsRelation]
>       .location
>       .partitionSpec()
>       .partitionColumns
>       .fieldNames
>       .toSeq
>   }.getOrElse(Seq.empty[String])
> There should be a flag to skip the partition match check in append mode. I can 
> work on the patch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18665) Spark ThriftServer jobs where are canceled are still “STARTED”

2017-01-09 Thread cen yuhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

cen yuhai updated SPARK-18665:
--
Affects Version/s: 2.1.0

> Spark ThriftServer jobs where are canceled are still “STARTED”
> --
>
> Key: SPARK-18665
> URL: https://issues.apache.org/jira/browse/SPARK-18665
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.0
>Reporter: cen yuhai
> Attachments: 1179ACF7-3E62-44C5-B01D-CA71C876ECCE.png, 
> 83C5E8AD-59DE-4A85-A483-2BE3FB83F378.png
>
>
> I find that some jobs are canceled, but their state is still "STARTED". I 
> think this bug was introduced by SPARK-6964.
> Here are some logs:
> {code}
> 16/12/01 11:43:34 ERROR SparkExecuteStatementOperation: Error running hive 
> query: 
> org.apache.hive.service.cli.HiveSQLException: Illegal Operation state 
> transition from CLOSED to ERROR
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:91)
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:97)
>   at 
> org.apache.hive.service.cli.operation.Operation.setState(Operation.java:126)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:259)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:166)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:163)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1708)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> org.apache.hive.service.cli.HiveSQLException: Illegal Operation state 
> transition from CANCELED to ERROR
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:91)
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:97)
>   at 
> org.apache.hive.service.cli.operation.Operation.setState(Operation.java:126)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:259)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:166)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:163)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1708)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18665) Spark ThriftServer jobs where are canceled are still “STARTED”

2017-01-09 Thread cen yuhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

cen yuhai updated SPARK-18665:
--
Affects Version/s: 2.0.2

> Spark ThriftServer jobs where are canceled are still “STARTED”
> --
>
> Key: SPARK-18665
> URL: https://issues.apache.org/jira/browse/SPARK-18665
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.3, 2.0.2, 2.1.0
>Reporter: cen yuhai
> Attachments: 1179ACF7-3E62-44C5-B01D-CA71C876ECCE.png, 
> 83C5E8AD-59DE-4A85-A483-2BE3FB83F378.png
>
>
> I find that some jobs are canceled, but their state is still "STARTED". I 
> think this bug was introduced by SPARK-6964.
> Here are some logs:
> {code}
> 16/12/01 11:43:34 ERROR SparkExecuteStatementOperation: Error running hive 
> query: 
> org.apache.hive.service.cli.HiveSQLException: Illegal Operation state 
> transition from CLOSED to ERROR
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:91)
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:97)
>   at 
> org.apache.hive.service.cli.operation.Operation.setState(Operation.java:126)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:259)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:166)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:163)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1708)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> org.apache.hive.service.cli.HiveSQLException: Illegal Operation state 
> transition from CANCELED to ERROR
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:91)
>   at 
> org.apache.hive.service.cli.OperationState.validateTransition(OperationState.java:97)
>   at 
> org.apache.hive.service.cli.operation.Operation.setState(Operation.java:126)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:259)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:166)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:163)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1708)
>   at 
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13450) SortMergeJoin will OOM when join rows have lot of same keys

2017-01-09 Thread Zhan Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812550#comment-15812550
 ] 

Zhan Zhang commented on SPARK-13450:


ExternalAppendOnlyMap estimates the size of the data it stores. In SortMergeJoin, 
I think we can leverage UnsafeExternalSorter to get more accurate and 
controllable behavior.

> SortMergeJoin will OOM when join rows have lot of same keys
> ---
>
> Key: SPARK-13450
> URL: https://issues.apache.org/jira/browse/SPARK-13450
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0, 2.0.2, 2.1.0
>Reporter: Hong Shen
> Attachments: heap-dump-analysis.png
>
>
>   When I run a SQL query with a join, tasks throw java.lang.OutOfMemoryError 
> and the query fails, even though I have set spark.executor.memory to 4096m.
>   SortMergeJoin uses an ArrayBuffer[InternalRow] to store bufferedMatches; if 
> the joined rows share a lot of identical keys, this buffer will throw 
> OutOfMemoryError.
> {code}
>   /** Buffered rows from the buffered side of the join. This is empty if 
> there are no matches. */
>   private[this] val bufferedMatches: ArrayBuffer[InternalRow] = new 
> ArrayBuffer[InternalRow]
> {code}
>   Here is the stackTrace:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy(Native Method)
> org.xerial.snappy.Snappy.arrayCopy(Snappy.java:84)
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:190)
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:163)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> java.io.DataInputStream.readLong(DataInputStream.java:416)
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:71)
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:79)
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:84)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoin.scala:300)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.bufferMatchingRows(SortMergeJoin.scala:329)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoin.scala:229)
> org.apache.spark.sql.execution.joins.SortMergeJoin$$anonfun$doExecute$1$$anon$1.advanceNext(SortMergeJoin.scala:105)
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:741)
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:741)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:337)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:301)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:337)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:301)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> org.apache.spark.scheduler.Task.run(Task.scala:89)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:215)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:744)
> {code}
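
For illustration only (this snippet is not part of the original report, and it 
assumes a Spark 2.x SparkSession named `spark`; on 1.6 the equivalent would go 
through sqlContext), a join in which one key dominates the buffered side 
reproduces the buffering pattern described above:

{code}
// Hypothetical reproduction sketch: one join key dominates the buffered (right) side.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // force a sort merge join
import spark.implicits._

val streamed = Seq((0L, "a"), (0L, "b")).toDF("k", "tag")
val buffered = spark.range(0L, 5000000L).selectExpr("CAST(0 AS BIGINT) AS k", "id AS v")

// Every streamed row matches all 5M buffered rows for key 0, so the matches are
// collected into an in-memory ArrayBuffer before any output rows are produced.
streamed.join(buffered, "k").count()
{code}

With broadcast joins disabled, the sort merge join has to materialize every 
buffered row for key 0 at once, which is exactly the ArrayBuffer growth the 
report describes.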






[jira] [Comment Edited] (SPARK-13450) SortMergeJoin will OOM when join rows have lot of same keys

2017-01-09 Thread Tejas Patil (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15812508#comment-15812508
 ] 

Tejas Patil edited comment on SPARK-13450 at 1/9/17 6:48 PM:
-

I have seen this problem a couple of times in prod while trying out jobs over 
Spark. There have been some discussions on this jira; here are my comments on 
those:

- How to reproduce? As [~shenhong] said, in my case too there were skewed keys 
in the joined relation (a quick per-key skew check is sketched after the stack 
trace below). I was able to grab a heap dump which shows the array buffer 
growing to more than a GB: 
https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png
- [~hvanhovell] had some suggestions about using a Cartesian join or co-group. 
In my case, our users are running Hive SQL queries as-is over Spark. If these 
were one-off cases we could have done that, but with automated migration of 
several jobs we want the queries to just work. OOMs lead to unreliable behavior 
and affect users.
- I looked at `Window.scala`, but it only works for unsafe rows; in sort merge 
join we may or may not have unsafe rows.
- There was a PR associated with this jira 
(https://github.com/apache/spark/pull/11386), but it is inactive. I tried to 
pick it up, but it no longer applies cleanly. It basically copies the 
ExternalAppendOnlyMap code and introduces a `Buffer` version of it for lists. 
Instead of dealing with the merge conflicts, I implemented the buffer version 
by reusing the `ExternalAppendOnlyMap` code. I will submit a fresh PR after 
testing it.

{code}
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.bufferMatchingRows(SortMergeJoinExec.scala:756)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoinExec.scala:660)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1$$anon$1.advanceNext(SortMergeJoinExec.scala:137)
at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:186)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:355)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:103)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:94)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
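
As referenced in the first bullet above, a quick way to check for this kind of 
skew ahead of time is to count rows per join key on the side that will be 
buffered. This is an illustrative snippet, not from the original comment; it 
assumes a DataFrame named `right` that ends up as the buffered side of the 
join, with a join column named `k`:

{code}
// Hypothetical diagnostic: list the most frequent join keys on the buffered side.
// Keys at the top of this output are the ones that inflate bufferedMatches.
import org.apache.spark.sql.functions.desc

right.groupBy("k")
  .count()
  .orderBy(desc("count"))
  .show(20)
{code}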



