Re: Set up Scala 2.12 test build in Jenkins

2018-08-05 Thread Mridul Muralidharan
I agree, we should not work around the test case but rather understand
and fix the root cause.
The closure cleaner should have nulled out the references and allowed it
to be serialized.

Regards,
Mridul

On Sun, Aug 5, 2018 at 8:38 PM Wenchen Fan  wrote:
>
> It seems to me that the closure cleaner fails to clean up something. The 
> failed test case defines a serializable class inside the test case, and the 
> class doesn't refer to anything in the outer class. Ideally it can be 
> serialized after cleaning up the closure.
>
> This is a somewhat weird way to define a class, so I'm not sure how serious
> the problem is.
>
> On Mon, Aug 6, 2018 at 3:41 AM Stavros Kontopoulos 
>  wrote:
>>
>> Makes sense, not sure if closure cleaning is related to the last one for 
>> example or others. The last one is a bit weird, unless I am missing 
>> something about the LegacyAccumulatorWrapper logic.
>>
>> Stavros
>>
>> On Sun, Aug 5, 2018 at 10:23 PM, Sean Owen  wrote:
>>>
>>> Yep that's what I did. There are more failures with different resolutions. 
>>> I'll open a JIRA and PR and ping you, to make sure that the changes are all 
>>> reasonable, and not an artifact of missing something about closure cleaning 
>>> in 2.12.
>>>
>>> In the meantime having a 2.12 build up and running for master will just 
>>> help catch these things.
>>>
>>> On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos 
>>>  wrote:

 Hi Sean,

 I ran a quick build and the failing tests seem to be:

 - SPARK-17644: After one stage is aborted for too many failed attempts, 
 subsequent stagesstill behave correctly on fetch failures *** FAILED ***
   A job with one fetch failure should eventually succeed 
 (DAGSchedulerSuite.scala:2422)


 - LegacyAccumulatorWrapper with AccumulatorParam that has no 
 equals/hashCode *** FAILED ***
   java.io.NotSerializableException: 
 org.scalatest.Assertions$AssertionsHelper
 Serialization stack:
 - object not serializable (class: 
 org.scalatest.Assertions$AssertionsHelper, value: 
 org.scalatest.Assertions$AssertionsHelper@3bc5fc8f)


 The last one can be fixed easily if you define class `MyData(val i: Int)
 extends Serializable` outside of the test suite. For some reason the outer
 references (which are not removed) are capturing the ScalaTest stuff in 2.12.

 Let me know if we see the same failures.

 Stavros

 On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen  wrote:
>
> Shane et al - could we get a test job in Jenkins to test the Scala 2.12 
> build? I don't think I have the access or expertise for it, though I 
> could probably copy and paste a job. I think we just need to clone the, 
> say, master Maven Hadoop 2.7 job, and add two steps: run 
> "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to 
> the profiles that are enabled.
>
> I can already see two test failures for the 2.12 build right now and will 
> try to fix those, but this should help verify whether the failures are 
> 'real' and detect them going forward.
>
>

>>
>>
>>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-05 Thread Jungtaek Lim
Answering one of the missed questions:

>  I am not sure how you were planning to expose the state key groups at the
>  API level and if it would be transparent.

I was thinking about introducing a new configuration: it may look like adding
unnecessary configuration, but I thought it would eventually help elasticity
("adaptive execution" might be the relevant term in the Spark world). Putting
it in a configuration would make it easier for Spark to modify dynamically: if
we let the user call a method in the DSL, Spark should treat it as intentional
and try its best to respect the user input.

When I thought about the above I missed the fact that the parallelism of the
state operator would also be applied to the parallelism of the data writers
unless we insert another stage. So it may not be ideal to rearrange such a
thing at runtime, and it is better to be more concerned about the parallelism
of the data writers, but it is still worth it as food for thought.

On Sun, Aug 5, 2018 at 7:28 PM, Jungtaek Lim wrote:

> "coalesce" looks like it works: I misunderstood it as an efficient version
> of "repartition" which does a shuffle, so I expected it would trigger a
> shuffle. My proposal would be covered by using "coalesce": thanks Joseph for
> the correction. Let me abandon the proposal.
>
> What we may still be missing for now is documentation for the fact that the
> number of partitions for states cannot be changed, so Spark restricts
> modifying "spark.sql.shuffle.partitions" once the query has run (only
> applying to streaming queries, right?). If end users want more or fewer
> state partitions, the value should be set before running the query for the
> first time. Would it be better to add this to the "Structured Streaming" doc?
>
> I agree that decoupling state and partitions would not be simple. I'd try
> out (offline) repartitioning first if the number of partitions really
> matters for scalability / elasticity.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Sat, Aug 4, 2018 at 3:10 AM, Joseph Torres wrote:
>
>> I'd agree it might make sense to bundle this into an API. We'd have to
>> think about whether it's a common enough use case to justify the API
>> complexity.
>>
>> It might be worth exploring decoupling state and partitions, but I
>> wouldn't want to start making decisions based on it without a clearer
>> design picture. I would expect the decoupling to make it very difficult to
>> ensure idempotency of state updates.
>>
>> Jose
>>
>> On Fri, Aug 3, 2018 at 10:55 AM, Arun Mahadevan  wrote:
>>
>>> coalesce might work.
>>>
>>> Say "spark.sql.shuffle.partitions" = 200, and then "
>>> input.readStream.map.filter.groupByKey(..).coalesce(2)" would still
>>> create 200 instances for state but execute just 2 tasks.
>>>
>>> However I think further groupByKey operations downstream would need
>>> similar coalesce.
>>>
>>> And this is assuming the user sets the right shuffle partitions upfront.
>>>
>>> It may be worth bundling this pattern as some builtin API so that it can
>>> be transparent to the user. I am not sure how you were planning to expose
>>> the state key groups at the API level and if it would be transparent.
>>>
>>> IMO, decoupling the state and partitions and making it key based would
>>> still be worth exploring to support dynamic state rebalancing. Maybe the
>>> default HDFS-based implementation can maintain the state partition-wise
>>> and not support it, but there could be implementations based on a
>>> distributed k-v store which support this.
>>>
>>> Thanks,
>>> Arun
>>>
>>>
>>> On 3 August 2018 at 08:21, Joseph Torres 
>>> wrote:
>>>
 A coalesced RDD will definitely maintain any within-partition
 invariants that the original RDD maintained. It pretty much just runs its
 input partitions sequentially.

 There'd still be some Dataframe API work needed to get the coalesce
 operation where you want it to be, but this is much simpler than
 introducing a new concept of state key groups. As far as I can tell,
 state key groups are just the same thing that we currently call partitions
 of the aggregate RDD.

 On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim  wrote:

> I'm afraid I don't know the details of coalesce(), but from finding some
> resources on coalesce, it looks like it helps reduce the actual number of
> partitions.
>
> For streaming aggregation, state for all partitions (by default, 200) must
> be initialized and committed even if it is unchanged. Otherwise an error
> occurs when reading a partition which was excluded in the query previously.
> Moreover, it can't find an existing row in the state, or it stores the row
> in the wrong partition, if the partition id doesn't match the expected id
> via the hashing function.
>
> Could you verify coalesce() meets such requirements?
>
> On Fri, 3 Aug 2018 at 22:23 Joseph Torres <
> joseph.tor...@databricks.com> wrote:
>
>> Scheduling multiple partitions in the same task is basically what
>> coalesce() does. Is there a reason that doesn't work here?
>>
>> On Fri, 

Re: [DISCUSS][SQL] Control the number of output files

2018-08-05 Thread John Zhuge
Great help from the community!

On Sun, Aug 5, 2018 at 6:17 PM Xiao Li  wrote:

> FYI, the new hints have been merged. They will be available in the
> upcoming release (Spark 2.4).
>
> *John Zhuge*, thanks for your work! Really appreciate it! Please submit
> more PRs and help the community improve Spark. : )
>
> Xiao
>
> 2018-08-05 21:06 GMT-04:00 Koert Kuipers :
>
>> lukas,
>> what is the jira ticket for this? i would like to follow its activity.
>> thanks!
>> koert
>>
>> On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec 
>> wrote:
>>
>>> Hi,
>>> Yes, this feature is planned - Spark should soon be able to repartition
>>> output by size.
>>> Lukas
>>>
>>>
>>> On Wed, Jul 25, 2018 at 11:26 PM, Forest Fang
>>> wrote:
>>>
 Has there been any discussion to simply support Hive's merge small
 files configuration? It simply adds one additional stage to inspect size of
 each output file, recompute the desired parallelism to reach a target size,
 and runs a map-only coalesce before committing the final files. Since AFAIK
 SparkSQL already stages the final output commit, it seems feasible to
 respect this Hive config.


 https://community.hortonworks.com/questions/106987/hive-multiple-small-files.html


 On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra 
 wrote:

> See some of the related discussion under
> https://github.com/apache/spark/pull/21589
>
> It feels to me like we need some kind of user code mechanism to signal
> policy preferences to Spark. This could also include ways to signal
> scheduling policy, which could include things like scheduling pool and/or
> barrier scheduling. Some of those scheduling policies operate at 
> inherently
> different levels currently -- e.g. scheduling pools at the Job level
> (really, the thread local level in the current implementation) and barrier
> scheduling at the Stage level -- so it is not completely obvious how to
> unify all of these policy options/preferences/mechanism, or whether it is
> possible, but I think it is worth considering such things at a fairly high
> level of abstraction and try to unify and simplify before making things
> more complex with multiple policy mechanisms.
>
> On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin 
> wrote:
>
>> Seems like a good idea in general. Do other systems have similar
>> concepts? In general it'd be easier if we can follow existing convention 
>> if
>> there is any.
>>
>>
>> On Wed, Jul 25, 2018 at 11:50 AM John Zhuge 
>> wrote:
>>
>>> Hi all,
>>>
>>> Many Spark users in my company are asking for a way to control the
>>> number of output files in Spark SQL. There are use cases to either 
>>> reduce
>>> or increase the number. The users prefer not to use function
>>> *repartition*(n) or *coalesce*(n, shuffle) that require them to
>>> write and deploy Scala/Java/Python code.
>>>
>>> Could we introduce a query hint for this purpose (similar to
>>> Broadcast Join Hints)?
>>>
>>> /*+ *COALESCE*(n, shuffle) */
>>>
>>> In general, is a query hint the best way to bring DF functionality
>>> to SQL without extending SQL syntax? Any suggestion is highly 
>>> appreciated.
>>>
>>> This requirement is not the same as SPARK-6221 that asked for
>>> auto-merging output files.
>>>
>>> Thanks,
>>> John Zhuge
>>>
>>
>>
>

-- 
John Zhuge


Re: [DISCUSS][SQL] Control the number of output files

2018-08-05 Thread John Zhuge
https://issues.apache.org/jira/browse/SPARK-24940

The PR has been merged to 2.4.0.
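
For anyone finding this thread later, here is a sketch of how the merged hints
can be used (the view name is illustrative, and this reflects a reading of
SPARK-24940 rather than official documentation):

import org.apache.spark.sql.SparkSession

object CoalesceHintSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("hint-sketch").getOrCreate()
    spark.range(0, 1000).createOrReplaceTempView("events")  // illustrative view name

    // COALESCE reduces the number of output partitions without a shuffle:
    println(spark.sql("SELECT /*+ COALESCE(1) */ * FROM events").rdd.getNumPartitions)
    // REPARTITION forces a shuffle into the requested number of partitions:
    println(spark.sql("SELECT /*+ REPARTITION(8) */ * FROM events").rdd.getNumPartitions)

    spark.stop()
  }
}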

On Sun, Aug 5, 2018 at 6:06 PM Koert Kuipers  wrote:

> lukas,
> what is the jira ticket for this? i would like to follow its activity.
> thanks!
> koert
>
> On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec  wrote:
>
>> Hi,
>> Yes, this feature is planned - Spark should soon be able to repartition
>> output by size.
>> Lukas
>>
>>
>> On Wed, Jul 25, 2018 at 11:26 PM, Forest Fang
>> wrote:
>>
>>> Has there been any discussion to simply support Hive's merge small files
>>> configuration? It simply adds one additional stage to inspect size of each
>>> output file, recompute the desired parallelism to reach a target size, and
>>> runs a map-only coalesce before committing the final files. Since AFAIK
>>> SparkSQL already stages the final output commit, it seems feasible to
>>> respect this Hive config.
>>>
>>>
>>> https://community.hortonworks.com/questions/106987/hive-multiple-small-files.html
>>>
>>>
>>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra 
>>> wrote:
>>>
 See some of the related discussion under
 https://github.com/apache/spark/pull/21589

 It feels to me like we need some kind of user code mechanism to signal
 policy preferences to Spark. This could also include ways to signal
 scheduling policy, which could include things like scheduling pool and/or
 barrier scheduling. Some of those scheduling policies operate at inherently
 different levels currently -- e.g. scheduling pools at the Job level
 (really, the thread local level in the current implementation) and barrier
 scheduling at the Stage level -- so it is not completely obvious how to
 unify all of these policy options/preferences/mechanism, or whether it is
 possible, but I think it is worth considering such things at a fairly high
 level of abstraction and try to unify and simplify before making things
 more complex with multiple policy mechanisms.

 On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin 
 wrote:

> Seems like a good idea in general. Do other systems have similar
> concepts? In general it'd be easier if we can follow existing convention 
> if
> there is any.
>
>
> On Wed, Jul 25, 2018 at 11:50 AM John Zhuge  wrote:
>
>> Hi all,
>>
>> Many Spark users in my company are asking for a way to control the
>> number of output files in Spark SQL. There are use cases to either reduce
>> or increase the number. The users prefer not to use function
>> *repartition*(n) or *coalesce*(n, shuffle) that require them to
>> write and deploy Scala/Java/Python code.
>>
>> Could we introduce a query hint for this purpose (similar to
>> Broadcast Join Hints)?
>>
>> /*+ *COALESCE*(n, shuffle) */
>>
>> In general, is a query hint the best way to bring DF functionality
>> to SQL without extending SQL syntax? Any suggestion is highly 
>> appreciated.
>>
>> This requirement is not the same as SPARK-6221 that asked for
>> auto-merging output files.
>>
>> Thanks,
>> John Zhuge
>>
>
>

-- 
John Zhuge


Re: Set up Scala 2.12 test build in Jenkins

2018-08-05 Thread Wenchen Fan
It seems to me that the closure cleaner fails to clean up something. The
failed test case defines a serializable class inside the test case, and the
class doesn't refer to anything in the outer class. Ideally it can be
serialized after cleaning up the closure.

This is a somewhat weird way to define a class, so I'm not sure how serious
the problem is.
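
To make that concrete, here is a minimal, Spark-free sketch of the failure mode
(names are made up, and plain Java serialization stands in for Spark's closure
serializer): a class nested in a non-serializable enclosing instance carries an
implicit $outer reference, so serializing an instance drags the enclosing
object along even though the class uses nothing from it.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Enclosing {                                 // stands in for the test suite
  val helper = new Object                         // like AssertionsHelper: not serializable
  class MyData(val i: Int) extends Serializable   // keeps an implicit $outer reference
}

object ReproSketch {
  def main(args: Array[String]): Unit = {
    val enclosing = new Enclosing
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(new enclosing.MyData(1))      // java.io.NotSerializableException: Enclosing
  }
}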

On Mon, Aug 6, 2018 at 3:41 AM Stavros Kontopoulos <
stavros.kontopou...@lightbend.com> wrote:

> Makes sense, not sure if closure cleaning is related to the last one for
> example or others. The last one is a bit weird, unless I am missing
> something about the LegacyAccumulatorWrapper logic.
>
> Stavros
>
> On Sun, Aug 5, 2018 at 10:23 PM, Sean Owen  wrote:
>
>> Yep that's what I did. There are more failures with different
>> resolutions. I'll open a JIRA and PR and ping you, to make sure that the
>> changes are all reasonable, and not an artifact of missing something about
>> closure cleaning in 2.12.
>>
>> In the meantime having a 2.12 build up and running for master will just
>> help catch these things.
>>
>> On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos <
>> stavros.kontopou...@lightbend.com> wrote:
>>
>>> Hi Sean,
>>>
>>> I ran a quick build and the failing tests seem to be:
>>>
>>> - SPARK-17644: After one stage is aborted for too many failed attempts, 
>>> subsequent stagesstill behave correctly on fetch failures *** FAILED ***
>>>   A job with one fetch failure should eventually succeed 
>>> (DAGSchedulerSuite.scala:2422)
>>>
>>>
>>> - LegacyAccumulatorWrapper with AccumulatorParam that has no 
>>> equals/hashCode *** FAILED ***
>>>   java.io.NotSerializableException: 
>>> org.scalatest.Assertions$AssertionsHelper
>>> Serialization stack:
>>> - object not serializable (class: 
>>> org.scalatest.Assertions$AssertionsHelper, value: 
>>> org.scalatest.Assertions$AssertionsHelper@3bc5fc8f)
>>>
>>>
>>> The last one can be fixed easily if you define class `MyData(val i: Int)
>>> extends Serializable` outside of the test suite. For some reason the outer
>>> references (which are not removed) are capturing the ScalaTest stuff in
>>> 2.12.
>>>
>>> Let me know if we see the same failures.
>>>
>>> Stavros
>>>
>>> On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen  wrote:
>>>
 Shane et al - could we get a test job in Jenkins to test the Scala 2.12
 build? I don't think I have the access or expertise for it, though I could
 probably copy and paste a job. I think we just need to clone the, say,
 master Maven Hadoop 2.7 job, and add two steps: run
 "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the
 profiles that are enabled.

 I can already see two test failures for the 2.12 build right now and
 will try to fix those, but this should help verify whether the failures are
 'real' and detect them going forward.



>>>
>
>
>


Re: [DISCUSS][SQL] Control the number of output files

2018-08-05 Thread Xiao Li
FYI, the new hints have been merged. They will be available in the upcoming
release (Spark 2.4).

*John Zhuge*, thanks for your work! Really appreciate it! Please submit
more PRs and help the community improve Spark. : )

Xiao

2018-08-05 21:06 GMT-04:00 Koert Kuipers :

> lukas,
> what is the jira ticket for this? i would like to follow its activity.
> thanks!
> koert
>
> On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec  wrote:
>
>> Hi,
>> Yes, this feature is planned - Spark should soon be able to repartition
>> output by size.
>> Lukas
>>
>>
>> On Wed, Jul 25, 2018 at 11:26 PM, Forest Fang
>> wrote:
>>
>>> Has there been any discussion to simply support Hive's merge small files
>>> configuration? It simply adds one additional stage to inspect size of each
>>> output file, recompute the desired parallelism to reach a target size, and
>>> runs a map-only coalesce before committing the final files. Since AFAIK
>>> SparkSQL already stages the final output commit, it seems feasible to
>>> respect this Hive config.
>>>
>>> https://community.hortonworks.com/questions/106987/hive-multiple-small-files.html
>>>
>>>
>>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra 
>>> wrote:
>>>
 See some of the related discussion under
 https://github.com/apache/spark/pull/21589

 It feels to me like we need some kind of user code mechanism to signal
 policy preferences to Spark. This could also include ways to signal
 scheduling policy, which could include things like scheduling pool and/or
 barrier scheduling. Some of those scheduling policies operate at inherently
 different levels currently -- e.g. scheduling pools at the Job level
 (really, the thread local level in the current implementation) and barrier
 scheduling at the Stage level -- so it is not completely obvious how to
 unify all of these policy options/preferences/mechanism, or whether it is
 possible, but I think it is worth considering such things at a fairly high
 level of abstraction and try to unify and simplify before making things
 more complex with multiple policy mechanisms.

 On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin 
 wrote:

> Seems like a good idea in general. Do other systems have similar
> concepts? In general it'd be easier if we can follow existing convention 
> if
> there is any.
>
>
> On Wed, Jul 25, 2018 at 11:50 AM John Zhuge  wrote:
>
>> Hi all,
>>
>> Many Spark users in my company are asking for a way to control the
>> number of output files in Spark SQL. There are use cases to either reduce
>> or increase the number. The users prefer not to use function
>> *repartition*(n) or *coalesce*(n, shuffle) that require them to
>> write and deploy Scala/Java/Python code.
>>
>> Could we introduce a query hint for this purpose (similar to
>> Broadcast Join Hints)?
>>
>> /*+ *COALESCE*(n, shuffle) */
>>
>> In general, is a query hint the best way to bring DF functionality
>> to SQL without extending SQL syntax? Any suggestion is highly 
>> appreciated.
>>
>> This requirement is not the same as SPARK-6221 that asked for
>> auto-merging output files.
>>
>> Thanks,
>> John Zhuge
>>
>
>


Re: [DISCUSS][SQL] Control the number of output files

2018-08-05 Thread Koert Kuipers
lukas,
what is the jira ticket for this? i would like to follow its activity.
thanks!
koert

On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec  wrote:

> Hi,
> Yes, this feature is planned - Spark should soon be able to repartition
> output by size.
> Lukas
>
>
> On Wed, Jul 25, 2018 at 11:26 PM, Forest Fang
> wrote:
>
>> Has there been any discussion to simply support Hive's merge small files
>> configuration? It simply adds one additional stage to inspect size of each
>> output file, recompute the desired parallelism to reach a target size, and
>> runs a map-only coalesce before committing the final files. Since AFAIK
>> SparkSQL already stages the final output commit, it seems feasible to
>> respect this Hive config.
>>
>> https://community.hortonworks.com/questions/106987/hive-multiple-small-files.html
>>
>>
>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra 
>> wrote:
>>
>>> See some of the related discussion under
>>> https://github.com/apache/spark/pull/21589
>>>
>>> It feels to me like we need some kind of user code mechanism to signal
>>> policy preferences to Spark. This could also include ways to signal
>>> scheduling policy, which could include things like scheduling pool and/or
>>> barrier scheduling. Some of those scheduling policies operate at inherently
>>> different levels currently -- e.g. scheduling pools at the Job level
>>> (really, the thread local level in the current implementation) and barrier
>>> scheduling at the Stage level -- so it is not completely obvious how to
>>> unify all of these policy options/preferences/mechanism, or whether it is
>>> possible, but I think it is worth considering such things at a fairly high
>>> level of abstraction and try to unify and simplify before making things
>>> more complex with multiple policy mechanisms.
>>>
>>> On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin  wrote:
>>>
 Seems like a good idea in general. Do other systems have similar
 concepts? In general it'd be easier if we can follow existing convention if
 there is any.


 On Wed, Jul 25, 2018 at 11:50 AM John Zhuge  wrote:

> Hi all,
>
> Many Spark users in my company are asking for a way to control the
> number of output files in Spark SQL. There are use cases to either reduce
> or increase the number. The users prefer not to use function
> *repartition*(n) or *coalesce*(n, shuffle) that require them to write
> and deploy Scala/Java/Python code.
>
> Could we introduce a query hint for this purpose (similar to Broadcast
> Join Hints)?
>
> /*+ *COALESCE*(n, shuffle) */
>
> In general, is a query hint the best way to bring DF functionality to
> SQL without extending SQL syntax? Any suggestion is highly appreciated.
>
> This requirement is not the same as SPARK-6221 that asked for
> auto-merging output files.
>
> Thanks,
> John Zhuge
>



Re: Why is SQLImplicits an abstract class rather than a trait?

2018-08-05 Thread Jacek Laskowski
Hi Assaf,

No idea (and I don't remember ever wondering about it before), but why not
do this (untested):

trait MySparkTestTrait {
  lazy val spark: SparkSession = SparkSession.builder().getOrCreate() // <-- you sure you don't need master?
  import spark.implicits._
}

Wouldn't that import work?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Sun, Aug 5, 2018 at 5:34 PM, assaf.mendelson 
wrote:

> Hi all,
>
> I have been playing a bit with SQLImplicits and noticed that it is an
> abstract class. I was wondering why that is? It has no constructor.
>
> Because it is an abstract class, a test helper that extends it cannot be a
> trait.
>
> Consider the following:
>
> trait MySparkTestTrait extends SQLImplicits {
>   lazy val spark: SparkSession = SparkSession.builder().getOrCreate()
>   protected override def _sqlContext: SQLContext = spark.sqlContext
> }
>
>
> This would mean that I can do something like this:
>
>
> class MyTestClass extends FunSuite with MySparkTestTrait {
>   test("SomeTest") {
>     // use spark implicits without needing to do `import spark.implicits._`
>   }
> }
>
> Is there a reason for this being an abstract class?
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Set up Scala 2.12 test build in Jenkins

2018-08-05 Thread Stavros Kontopoulos
Makes sense, not sure if closure cleaning is related to the last one for
example or others. The last one is a bit weird, unless I am missing
something about the LegacyAccumulatorWrapper logic.

Stavros

On Sun, Aug 5, 2018 at 10:23 PM, Sean Owen  wrote:

> Yep that's what I did. There are more failures with different resolutions.
> I'll open a JIRA and PR and ping you, to make sure that the changes are all
> reasonable, and not an artifact of missing something about closure cleaning
> in 2.12.
>
> In the meantime having a 2.12 build up and running for master will just
> help catch these things.
>
> On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos <stavros.kontopou...@lightbend.com> wrote:
>
>> Hi Sean,
>>
>> I ran a quick build and the failing tests seem to be:
>>
>> - SPARK-17644: After one stage is aborted for too many failed attempts, 
>> subsequent stagesstill behave correctly on fetch failures *** FAILED ***
>>   A job with one fetch failure should eventually succeed 
>> (DAGSchedulerSuite.scala:2422)
>>
>>
>> - LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode 
>> *** FAILED ***
>>   java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
>> Serialization stack:
>>  - object not serializable (class: 
>> org.scalatest.Assertions$AssertionsHelper, value: 
>> org.scalatest.Assertions$AssertionsHelper@3bc5fc8f)
>>
>>
>> The last one can be fixed easily if you define class `MyData(val i: Int)
>> extends Serializable` outside of the test suite. For some reason the outer
>> references (which are not removed) are capturing the ScalaTest stuff in
>> 2.12.
>>
>> Let me know if we see the same failures.
>>
>> Stavros
>>
>> On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen  wrote:
>>
>>> Shane et al - could we get a test job in Jenkins to test the Scala 2.12
>>> build? I don't think I have the access or expertise for it, though I could
>>> probably copy and paste a job. I think we just need to clone the, say,
>>> master Maven Hadoop 2.7 job, and add two steps: run
>>> "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the
>>> profiles that are enabled.
>>>
>>> I can already see two test failures for the 2.12 build right now and
>>> will try to fix those, but this should help verify whether the failures are
>>> 'real' and detect them going forward.
>>>
>>>
>>>
>>


Re: Set up Scala 2.12 test build in Jenkins

2018-08-05 Thread Sean Owen
Yep that's what I did. There are more failures with different resolutions.
I'll open a JIRA and PR and ping you, to make sure that the changes are all
reasonable, and not an artifact of missing something about closure cleaning
in 2.12.

In the meantime having a 2.12 build up and running for master will just
help catch these things.

On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos <
stavros.kontopou...@lightbend.com> wrote:

> Hi Sean,
>
> I ran a quick build and the failing tests seem to be:
>
> - SPARK-17644: After one stage is aborted for too many failed attempts, 
> subsequent stagesstill behave correctly on fetch failures *** FAILED ***
>   A job with one fetch failure should eventually succeed 
> (DAGSchedulerSuite.scala:2422)
>
>
> - LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode 
> *** FAILED ***
>   java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
> Serialization stack:
>   - object not serializable (class: 
> org.scalatest.Assertions$AssertionsHelper, value: 
> org.scalatest.Assertions$AssertionsHelper@3bc5fc8f)
>
>
> The last one can be fixed easily if you define class `MyData(val i: Int)
> extends Serializable` outside of the test suite. For some reason the outer
> references (which are not removed) are capturing the ScalaTest stuff in
> 2.12.
>
> Let me know if we see the same failures.
>
> Stavros
>
> On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen  wrote:
>
>> Shane et al - could we get a test job in Jenkins to test the Scala 2.12
>> build? I don't think I have the access or expertise for it, though I could
>> probably copy and paste a job. I think we just need to clone the, say,
>> master Maven Hadoop 2.7 job, and add two steps: run
>> "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the
>> profiles that are enabled.
>>
>> I can already see two test failures for the 2.12 build right now and will
>> try to fix those, but this should help verify whether the failures are
>> 'real' and detect them going forward.
>>
>>
>>
>


Re: Set up Scala 2.12 test build in Jenkins

2018-08-05 Thread Stavros Kontopoulos
Hi Sean,

I ran a quick build and the failing tests seem to be:

- SPARK-17644: After one stage is aborted for too many failed
attempts, subsequent stagesstill behave correctly on fetch failures
*** FAILED ***
  A job with one fetch failure should eventually succeed
(DAGSchedulerSuite.scala:2422)


- LegacyAccumulatorWrapper with AccumulatorParam that has no
equals/hashCode *** FAILED ***
  java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
Serialization stack:
- object not serializable (class:
org.scalatest.Assertions$AssertionsHelper, value:
org.scalatest.Assertions$AssertionsHelper@3bc5fc8f)


The last one can be fixed easily if you define class `MyData(val i: Int)
extends Serializable` outside of the test suite. For some reason the outer
references (which are not removed) are capturing the ScalaTest stuff in 2.12.
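
Concretely, the shape of that fix is just the following (made-up object name;
plain Java serialization keeps the sketch self-contained). With the class at
the top level of the file there is no $outer pointer back to the suite, so
only the Int field gets written:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class MyData(val i: Int) extends Serializable  // top-level: no enclosing instance captured

object FixSketch {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(new MyData(1))             // succeeds
  }
}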

Let me know if we see the same failures.

Stavros

On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen  wrote:

> Shane et al - could we get a test job in Jenkins to test the Scala 2.12
> build? I don't think I have the access or expertise for it, though I could
> probably copy and paste a job. I think we just need to clone the, say,
> master Maven Hadoop 2.7 job, and add two steps: run
> "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the
> profiles that are enabled.
>
> I can already see two test failures for the 2.12 build right now and will
> try to fix those, but this should help verify whether the failures are
> 'real' and detect them going forward.
>
>
>


Why is SQLImplicits an abstract class rather than a trait?

2018-08-05 Thread assaf.mendelson
Hi all,

I have been playing a bit with SQLImplicits and noticed that it is an
abstract class. I was wondering why that is? It has no constructor.

Because it is an abstract class, a test helper that extends it cannot be a
trait.

Consider the following:

trait MySparkTestTrait extends SQLImplicits {
  lazy val spark: SparkSession = SparkSession.builder().getOrCreate()
  protected override def _sqlContext: SQLContext = spark.sqlContext
}


This would mean that I can do something like this:


class MyTestClass extends FunSuite with MySparkTestTrait {
  test("SomeTest") {
    // use spark implicits without needing to do `import spark.implicits._`
  }
}

Is there a reason for this being an abstract class?
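
One workaround that keeps the helper a trait is to hang the implicits off a
nested object, since an object can extend an abstract class even though a
trait cannot. A sketch (untested here, and similar to the pattern Spark's own
SQL test utilities appear to use):

import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession}
import org.scalatest.FunSuite

trait MySparkTestTrait {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[2]").appName("test").getOrCreate()

  // An object may extend the abstract class, so the trait itself does not have to.
  object testImplicits extends SQLImplicits {
    protected override def _sqlContext: SQLContext = spark.sqlContext
  }
}

class MyTestClass extends FunSuite with MySparkTestTrait {
  import testImplicits._

  test("SomeTest") {
    assert(Seq(1, 2, 3).toDS().count() == 3)
  }
}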



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Set up Scala 2.12 test build in Jenkins

2018-08-05 Thread Sean Owen
Shane et al - could we get a test job in Jenkins to test the Scala 2.12
build? I don't think I have the access or expertise for it, though I could
probably copy and paste a job. I think we just need to clone the, say,
master Maven Hadoop 2.7 job, and add two steps: run
"./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the
profiles that are enabled.

I can already see two test failures for the 2.12 build right now and will
try to fix those, but this should help verify whether the failures are
'real' and detect them going forward.


Re: Am I crazy, or does the binary distro not have Kafka integration?

2018-08-05 Thread Sean Owen
Yes, it's a reasonable argument that putting N more external integration
modules on the default spark-submit classpath might bring in more third-party
dependencies that clash or something. I think the convenience factor isn't a
big deal; users can also just write a dependency on said module in their own
app, once. It does seem like we could at least *ship* the binary bits in
"external-jars/" or something; they're not even included in the binary distro.
And it also means users have to make sure the version of spark-kafka they
integrate works with their cluster, which means not just making sure their app
matches the user-facing API of spark-kafka, but ensuring that the spark-kafka
module's interface to Spark works -- whatever internal details there may be
there.
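
For reference, the "write a dependency in their own app" route is a couple of
lines in an sbt build (a sketch; the version matches the 2.3.1 distro discussed
above, and %% appends the Scala binary suffix):

// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql-kafka-0-10"       % "2.3.1",  // Structured Streaming Kafka source
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1"   // DStream Kafka integration
)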

On Sat, Aug 4, 2018 at 9:15 PM Matei Zaharia 
wrote:

> I think that traditionally, the reason *not* to include these has been if
> they brought additional dependencies that users don’t really need, but that
> might clash with what the users have in their own app. Maybe this used to
> be the case for Kafka. We could analyze it and include it by default, or
> perhaps make it easier to add it in spark-submit and spark-shell. I feel
> that in an IDE, it won’t be a huge problem because you just add it once,
> but it is annoying for spark-submit.
>
> Matei
>
> > On Aug 4, 2018, at 2:19 PM, Sean Owen  wrote:
> >
> > Hm OK I am crazy then. I think I never noticed it because I had always
> used a distro that did actually supply this on the classpath.
> > Well ... I think it would be reasonable to include these things (at
> least, Kafka integration) by default in the binary distro. I'll update the
> JIRA to reflect that this is at best a Wish.
> >
> > On Sat, Aug 4, 2018 at 4:17 PM Jacek Laskowski  wrote:
> > Hi Sean,
> >
> > It has been like that for years, I'd say: you had to specify --packages to
> > get the Kafka-related jars on the classpath. I simply got used to this
> > annoyance (as did others). Could it be that it's an external package
> > (although an integral part of Spark)?!
> >
> > I'm very glad you've brought it up since I think Kafka data source is so
> important that it should be included in spark-shell and spark-submit by
> default. THANKS!
> >
> > Pozdrawiam,
> > Jacek Laskowski
> > 
> > https://about.me/JacekLaskowski
> > Mastering Spark SQL https://bit.ly/mastering-spark-sql
> > Spark Structured Streaming https://bit.ly/spark-structured-streaming
> > Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> > Follow me at https://twitter.com/jaceklaskowski
> >
> > On Sat, Aug 4, 2018 at 9:56 PM, Sean Owen  wrote:
> > Let's take this to https://issues.apache.org/jira/browse/SPARK-25026 --
> I provisionally marked this a Blocker, as if it's correct, then the release
> is missing an important piece and we'll want to remedy that ASAP. I still
> have this feeling I am missing something. The classes really aren't there
> in the release but ... *nobody* noticed all this time? I guess maybe
> Spark-Kafka users may be using a vendor distro that does package these bits.
> >
> >
> > On Sat, Aug 4, 2018 at 10:48 AM Sean Owen  wrote:
> > I was debugging why a Kafka-based streaming app doesn't seem to find
> Kafka-related integration classes when run standalone from our latest 2.3.1
> release, and noticed that there doesn't seem to be any Kafka-related jars
> from Spark in the distro. In jars/, I see:
> >
> > spark-catalyst_2.11-2.3.1.jar
> > spark-core_2.11-2.3.1.jar
> > spark-graphx_2.11-2.3.1.jar
> > spark-hive-thriftserver_2.11-2.3.1.jar
> > spark-hive_2.11-2.3.1.jar
> > spark-kubernetes_2.11-2.3.1.jar
> > spark-kvstore_2.11-2.3.1.jar
> > spark-launcher_2.11-2.3.1.jar
> > spark-mesos_2.11-2.3.1.jar
> > spark-mllib-local_2.11-2.3.1.jar
> > spark-mllib_2.11-2.3.1.jar
> > spark-network-common_2.11-2.3.1.jar
> > spark-network-shuffle_2.11-2.3.1.jar
> > spark-repl_2.11-2.3.1.jar
> > spark-sketch_2.11-2.3.1.jar
> > spark-sql_2.11-2.3.1.jar
> > spark-streaming_2.11-2.3.1.jar
> > spark-tags_2.11-2.3.1.jar
> > spark-unsafe_2.11-2.3.1.jar
> > spark-yarn_2.11-2.3.1.jar
> >
> > I checked make-distribution.sh, and it copies a bunch of JARs into the
> distro, but does not seem to touch the kafka modules.
> >
> > Am I crazy or missing something obvious -- those should be in the
> release, right?
> >
>
>


Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming

2018-08-05 Thread Jungtaek Lim
"coalesce" looks like it works: I misunderstood it as an efficient version
of "repartition" which does a shuffle, so I expected it would trigger a
shuffle. My proposal would be covered by using "coalesce": thanks Joseph for
the correction. Let me abandon the proposal.

What we may still be missing for now is documentation for the fact that the
number of partitions for states cannot be changed, so Spark restricts
modifying "spark.sql.shuffle.partitions" once the query has run (only applying
to streaming queries, right?). If end users want more or fewer state
partitions, the value should be set before running the query for the first
time. Would it be better to add this to the "Structured Streaming" doc?
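
A sketch of what that note boils down to in user code (source, sink, and
checkpoint path are illustrative): the "spark.sql.shuffle.partitions" value in
effect on the query's first run fixes the number of state partitions for that
checkpoint, so it has to be chosen up front.

import org.apache.spark.sql.SparkSession

object StatePartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("state-partitions-sketch")
      // Must be set before the query's first run; the number of state
      // partitions is then fixed for the lifetime of the checkpoint.
      .config("spark.sql.shuffle.partitions", "50")
      .getOrCreate()
    import spark.implicits._

    val counts = spark.readStream
      .format("rate").load()              // built-in test source: (timestamp, value)
      .groupBy($"value" % 10).count()     // stateful aggregation across 50 state partitions

    val query = counts.writeStream
      .format("console")
      .outputMode("complete")
      .option("checkpointLocation", "/tmp/state-partitions-sketch")  // illustrative path
      .start()

    query.awaitTermination(30 * 1000)     // run briefly for the sketch
    query.stop()
  }
}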

I agree that decoupling state and partitions would not be simple. I'd try
out (offline) repartitioning first if the number of partitions really
matters for scalability / elasticity.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sat, Aug 4, 2018 at 3:10 AM, Joseph Torres wrote:

> I'd agree it might make sense to bundle this into an API. We'd have to
> think about whether it's a common enough use case to justify the API
> complexity.
>
> It might be worth exploring decoupling state and partitions, but I
> wouldn't want to start making decisions based on it without a clearer
> design picture. I would expect the decoupling to make it very difficult to
> ensure idempotency of state updates.
>
> Jose
>
> On Fri, Aug 3, 2018 at 10:55 AM, Arun Mahadevan  wrote:
>
>> coalesce might work.
>>
>> Say "spark.sql.shuffle.partitions" = 200, and then "
>> input.readStream.map.filter.groupByKey(..).coalesce(2)" would still
>> create 200 instances for state but execute just 2 tasks.
>>
>> However I think further groupByKey operations downstream would need
>> similar coalesce.
>>
>> And this is assuming the user sets the right shuffle partitions upfront.
>>
>> It may be worth bundling this pattern as some builtin API so that it can
>> be transparent to the user. I am not sure how you were planning to expose
>> the state key groups at the API level and if it would be transparent.
>>
>> IMO, decoupling the state and partitions and making it key based would
>> still be worth exploring to support dynamic state rebalancing. Maybe the
>> default HDFS-based implementation can maintain the state partition-wise
>> and not support it, but there could be implementations based on a
>> distributed k-v store which support this.
>>
>> Thanks,
>> Arun
>>
>>
>> On 3 August 2018 at 08:21, Joseph Torres 
>> wrote:
>>
>>> A coalesced RDD will definitely maintain any within-partition invariants
>>> that the original RDD maintained. It pretty much just runs its input
>>> partitions sequentially.
>>>
>>> There'd still be some Dataframe API work needed to get the coalesce
>>> operation where you want it to be, but this is much simpler than
>>> introducing a new concept of state key groups. As far as I can tell,
>>> state key groups are just the same thing that we currently call partitions
>>> of the aggregate RDD.
>>>
>>> On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim  wrote:
>>>
 I'm afraid I don't know the details of coalesce(), but from finding some
 resources on coalesce, it looks like it helps reduce the actual number of
 partitions.

 For streaming aggregation, state for all partitions (by default, 200) must
 be initialized and committed even if it is unchanged. Otherwise an error
 occurs when reading a partition which was excluded in the query previously.
 Moreover, it can't find an existing row in the state, or it stores the row
 in the wrong partition, if the partition id doesn't match the expected id
 via the hashing function.

 Could you verify coalesce() meets such requirements?

 On Fri, 3 Aug 2018 at 22:23 Joseph Torres 
 wrote:

> Scheduling multiple partitions in the same task is basically what
> coalesce() does. Is there a reason that doesn't work here?
>
> On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim 
> wrote:
>
>> Here's a link for Google docs (anyone can comment):
>>
>> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing
>>
>> Please note that I just copied the content to the google docs, so
>> someone could point out lack of details. I would like to start with
>> explanation of the concept, and once we are in agreement on going 
>> forward,
>> I could add more detail in doc, or even just start working and detail can
>> be shared with POC code or even WIP patch.
>>
>> Answer inlined for Arun's comments:
>>
>> On Fri, Aug 3, 2018 at 5:39 PM, Arun Mahadevan wrote:
>>
>>> Can you share this in a google doc to make the discussions easier?
>>>
>>>
>>>
>>> Thanks for coming up with ideas to improve upon the current
>>> restrictions with the SS state store.
>>>
>>>
>>>
>>> If I understood correctly, the plan is to introduce a logical
>>> partitioning scheme for state storage