Re: Set up Scala 2.12 test build in Jenkins
I agree, we should not work around the testcase but rather understand and fix the root cause. Closure cleaner should have null'ed out the references and allowed it to be serialized. Regards, Mridul On Sun, Aug 5, 2018 at 8:38 PM Wenchen Fan wrote: > > It seems to me that the closure cleaner fails to clean up something. The > failed test case defines a serializable class inside the test case, and the > class doesn't refer to anything in the outer class. Ideally it can be > serialized after cleaning up the closure. > > This is somehow a very weird way to define a class, so I'm not sure how > serious the problem is. > > On Mon, Aug 6, 2018 at 3:41 AM Stavros Kontopoulos > wrote: >> >> Makes sense, not sure if closure cleaning is related to the last one for >> example or others. The last one is a bit weird, unless I am missing >> something about the LegacyAccumulatorWrapper logic. >> >> Stavros >> >> On Sun, Aug 5, 2018 at 10:23 PM, Sean Owen wrote: >>> >>> Yep that's what I did. There are more failures with different resolutions. >>> I'll open a JIRA and PR and ping you, to make sure that the changes are all >>> reasonable, and not an artifact of missing something about closure cleaning >>> in 2.12. >>> >>> In the meantime having a 2.12 build up and running for master will just >>> help catch these things. >>> >>> On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos >>> wrote: Hi Sean, I run a quick build so the failing tests seem to be: - SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stagesstill behave correctly on fetch failures *** FAILED *** A job with one fetch failure should eventually succeed (DAGSchedulerSuite.scala:2422) - LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode *** FAILED *** java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper Serialization stack: - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@3bc5fc8f) The last one can be fixed easily if you set class `MyData(val i: Int) extends Serializable `outside of the test suite. For some reason outers (not removed) are capturing the Scalatest stuff in 2.12. Let me know if we see the same failures. Stavros On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen wrote: > > Shane et al - could we get a test job in Jenkins to test the Scala 2.12 > build? I don't think I have the access or expertise for it, though I > could probably copy and paste a job. I think we just need to clone the, > say, master Maven Hadoop 2.7 job, and add two steps: run > "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to > the profiles that are enabled. > > I can already see two test failures for the 2.12 build right now and will > try to fix those, but this should help verify whether the failures are > 'real' and detect them going forward. > > >> >> >> - To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
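A minimal stand-alone sketch of the failing pattern under discussion (this is an illustration, not the actual Spark test code): `Harness` stands in for the non-serializable ScalaTest suite, and whether the locally defined class really picks up an `$outer` reference to it is exactly the Scala 2.11-vs-2.12 behaviour being debugged in this thread.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    class Harness {                                    // not Serializable, like a test suite
      def runTest(): Unit = {
        class MyData(val i: Int) extends Serializable  // class defined inside the "test"
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        // If the compiler gives MyData an $outer reference to Harness,
        // this throws java.io.NotSerializableException for Harness.
        out.writeObject(new MyData(1))
      }
    }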
Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming
Answering one of the questions I missed earlier:

> I am not sure how were you planning to expose the state key groups at api level and if it would be transparent.

I was thinking about introducing a new configuration. It may look like adding an unnecessary configuration, but I thought it would eventually help elasticity ("adaptive execution" might be the relevant concept in the Spark world). Being exposed as a configuration would make it easier for Spark to modify dynamically: if we let the user call a method in the DSL, Spark should treat it as intentional and try its best to respect the user input. When I thought about the above, I missed the fact that the parallelism of the state operator is also applied to the parallelism of the data writers unless we insert another stage. So it may not be ideal to rearrange such a thing at runtime, and it is better to be more concerned about the parallelism of the data writers, but it is still worth keeping as food for thought.

On Sun, Aug 5, 2018 at 7:28 PM, Jungtaek Lim wrote: > "coalesce" looks like working: I misunderstood it as an efficient version > of "repartition" which does shuffle, so expected it would trigger shuffle. > My proposal would be covered as using "coalesce": thanks Joseph for > correction. Let me abandon the proposal. > > We may still miss for now is documentation for the fact: the number of > partitions for states cannot be changed, so Spark restricts to modify > "spark.sql.shuffle.partitions" once the query is run (only applying to > streaming query, right?). If end users want to have more or less number of > state partitions, the value should be set before running the query at the > first time. Would it be better to add this to "Structured Streaming" doc? > > I agree that decoupling state and partitions would not be simple. I'd try > out (offline) repartition first if the number of partitions would be really > matter for scalability / elasticity. > > Thanks, > Jungtaek Lim (HeartSaVioR) > > On Sat, Aug 4, 2018 at 3:10 AM, Joseph Torres wrote: > >> I'd agree it might make sense to bundle this into an API. We'd have to >> think about whether it's a common enough use case to justify the API >> complexity. >> >> It might be worth exploring decoupling state and partitions, but I >> wouldn't want to start making decisions based on it without a clearer >> design picture. I would expect the decoupling to make it very difficult to >> ensure idempotency of state updates. >> >> Jose >> >> On Fri, Aug 3, 2018 at 10:55 AM, Arun Mahadevan wrote: >> >>> coalesce might work. >>> >>> Say "spark.sql.shuffle.partitions" = 200, and then " >>> input.readStream.map.filter.groupByKey(..).coalesce(2)" would still >>> create 200 instances for state but execute just 2 tasks. >>> >>> However I think further groupByKey operations downstream would need >>> similar coalesce. >>> >>> And this is assuming the user sets the right shuffle partitions upfront. >>> >>> It maybe worth to bundle this pattern as some builtin api so that it can >>> be transparent to the user. I am not sure how were you planning to expose >>> the state key groups at api level and if it would be transparent. >>> >>> IMO, decoupling the state and partitions and making it key based would >>> still be worth exploring to support dynamic state rebalancing. May be the >>> default HDFS based implementation can maintain the state partition wise and >>> not support it, but there could be implementations based on distributed k-v >>> store which supports this. 
>>> >>> Thanks, >>> Arun >>> >>> >>> On 3 August 2018 at 08:21, Joseph Torres >>> wrote: >>> A coalesced RDD will definitely maintain any within-partition invariants that the original RDD maintained. It pretty much just runs its input partitions sequentially. There'd still be some Dataframe API work needed to get the coalesce operation where you want it to be, but this is much simpler than introducing a new concept of state key groups. As far as I can tell, state key groups are just the same thing that we currently call partitions of the aggregate RDD. On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim wrote: > I’m afraid I don’t know about the details on coalesce(), but some > finding resource for coalesce, it looks like helping reducing actual > partitions. > > For streaming aggregation, state for all partitions (by default, 200) > must be initialized and committed even it is being unchanged. Otherwise > error occurred when reading a partition which is excluded in query > previously. Moreover, it can’t find existing row from state or store row > in > wrong partition if partition id doesn’t match the expected id via hashing > function. > > Could you verify coalesce() meets such requirements? > > On Fri, 3 Aug 2018 at 22:23 Joseph Torres < > joseph.tor...@databricks.com> wrote: > >> Scheduling multiple partitions in the same task is basically what >> coalesce() does. Is there a reason that doesn't work here? >> >> On Fri,
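A rough sketch of the coalesce pattern Arun and Joseph describe in this thread, using the untyped API, a placeholder rate source, and a console sink; the numbers are only illustrative:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // spark.sql.shuffle.partitions (200 by default) fixes the number of state
    // partitions the first time the query runs.
    val counts = spark.readStream
      .format("rate").load()
      .groupBy($"value" % 10)   // stateful aggregation: one state instance per shuffle partition
      .count()
      .coalesce(2)              // still 200 state instances, but only 2 tasks execute them

    counts.writeStream
      .format("console")
      .outputMode("complete")
      .start()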
Re: [DISCUSS][SQL] Control the number of output files
Great help from the community! On Sun, Aug 5, 2018 at 6:17 PM Xiao Li wrote: > FYI, the new hints have been merged. They will be available in the > upcoming release (Spark 2.4). > > *John Zhuge*, thanks for your work! Really appreciate it! Please submit > more PRs and help the community improve Spark. : ) > > Xiao > > 2018-08-05 21:06 GMT-04:00 Koert Kuipers : > >> lukas, >> what is the jira ticket for this? i would like to follow it's activity. >> thanks! >> koert >> >> On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec >> wrote: >> >>> Hi, >>> Yes, This feature is planned - Spark should be soon able to repartition >>> output by size. >>> Lukas >>> >>> >>> Dne st 25. 7. 2018 23:26 uživatel Forest Fang >>> napsal: >>> Has there been any discussion to simply support Hive's merge small files configuration? It simply adds one additional stage to inspect size of each output file, recompute the desired parallelism to reach a target size, and runs a map-only coalesce before committing the final files. Since AFAIK SparkSQL already stages the final output commit, it seems feasible to respect this Hive config. https://community.hortonworks.com/questions/106987/hive-multiple-small-files.html On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra wrote: > See some of the related discussion under > https://github.com/apache/spark/pull/21589 > > If feels to me like we need some kind of user code mechanism to signal > policy preferences to Spark. This could also include ways to signal > scheduling policy, which could include things like scheduling pool and/or > barrier scheduling. Some of those scheduling policies operate at > inherently > different levels currently -- e.g. scheduling pools at the Job level > (really, the thread local level in the current implementation) and barrier > scheduling at the Stage level -- so it is not completely obvious how to > unify all of these policy options/preferences/mechanism, or whether it is > possible, but I think it is worth considering such things at a fairly high > level of abstraction and try to unify and simplify before making things > more complex with multiple policy mechanisms. > > On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin > wrote: > >> Seems like a good idea in general. Do other systems have similar >> concepts? In general it'd be easier if we can follow existing convention >> if >> there is any. >> >> >> On Wed, Jul 25, 2018 at 11:50 AM John Zhuge >> wrote: >> >>> Hi all, >>> >>> Many Spark users in my company are asking for a way to control the >>> number of output files in Spark SQL. There are use cases to either >>> reduce >>> or increase the number. The users prefer not to use function >>> *repartition*(n) or *coalesce*(n, shuffle) that require them to >>> write and deploy Scala/Java/Python code. >>> >>> Could we introduce a query hint for this purpose (similar to >>> Broadcast Join Hints)? >>> >>> /*+ *COALESCE*(n, shuffle) */ >>> >>> In general, is query hint is the best way to bring DF functionality >>> to SQL without extending SQL syntax? Any suggestion is highly >>> appreciated. >>> >>> This requirement is not the same as SPARK-6221 that asked for >>> auto-merging output files. >>> >>> Thanks, >>> John Zhuge >>> >> >> > -- John Zhuge
Re: [DISCUSS][SQL] Control the number of output files
https://issues.apache.org/jira/browse/SPARK-24940 The PR has been merged to 2.4.0. On Sun, Aug 5, 2018 at 6:06 PM Koert Kuipers wrote: > lukas, > what is the jira ticket for this? i would like to follow it's activity. > thanks! > koert > > On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec wrote: > >> Hi, >> Yes, This feature is planned - Spark should be soon able to repartition >> output by size. >> Lukas >> >> >> Dne st 25. 7. 2018 23:26 uživatel Forest Fang >> napsal: >> >>> Has there been any discussion to simply support Hive's merge small files >>> configuration? It simply adds one additional stage to inspect size of each >>> output file, recompute the desired parallelism to reach a target size, and >>> runs a map-only coalesce before committing the final files. Since AFAIK >>> SparkSQL already stages the final output commit, it seems feasible to >>> respect this Hive config. >>> >>> >>> https://community.hortonworks.com/questions/106987/hive-multiple-small-files.html >>> >>> >>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra >>> wrote: >>> See some of the related discussion under https://github.com/apache/spark/pull/21589 If feels to me like we need some kind of user code mechanism to signal policy preferences to Spark. This could also include ways to signal scheduling policy, which could include things like scheduling pool and/or barrier scheduling. Some of those scheduling policies operate at inherently different levels currently -- e.g. scheduling pools at the Job level (really, the thread local level in the current implementation) and barrier scheduling at the Stage level -- so it is not completely obvious how to unify all of these policy options/preferences/mechanism, or whether it is possible, but I think it is worth considering such things at a fairly high level of abstraction and try to unify and simplify before making things more complex with multiple policy mechanisms. On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin wrote: > Seems like a good idea in general. Do other systems have similar > concepts? In general it'd be easier if we can follow existing convention > if > there is any. > > > On Wed, Jul 25, 2018 at 11:50 AM John Zhuge wrote: > >> Hi all, >> >> Many Spark users in my company are asking for a way to control the >> number of output files in Spark SQL. There are use cases to either reduce >> or increase the number. The users prefer not to use function >> *repartition*(n) or *coalesce*(n, shuffle) that require them to >> write and deploy Scala/Java/Python code. >> >> Could we introduce a query hint for this purpose (similar to >> Broadcast Join Hints)? >> >> /*+ *COALESCE*(n, shuffle) */ >> >> In general, is query hint is the best way to bring DF functionality >> to SQL without extending SQL syntax? Any suggestion is highly >> appreciated. >> >> This requirement is not the same as SPARK-6221 that asked for >> auto-merging output files. >> >> Thanks, >> John Zhuge >> > > -- John Zhuge
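A sketch of what using the merged hints could look like from SQL, assuming the COALESCE and REPARTITION hint syntax that went in under SPARK-24940; the table name and partition counts are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    spark.range(1000).createOrReplaceTempView("events")   // placeholder data

    // Reduce the number of output files without calling coalesce() in code:
    val fewFiles = spark.sql("SELECT /*+ COALESCE(5) */ * FROM events")

    // Or force a shuffle into a fixed number of partitions:
    val repartitioned = spark.sql("SELECT /*+ REPARTITION(10) */ * FROM events")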
Re: Set up Scala 2.12 test build in Jenkins
It seems to me that the closure cleaner fails to clean up something. The failed test case defines a serializable class inside the test case, and the class doesn't refer to anything in the outer class. Ideally it can be serialized after cleaning up the closure. This is somehow a very weird way to define a class, so I'm not sure how serious the problem is. On Mon, Aug 6, 2018 at 3:41 AM Stavros Kontopoulos < stavros.kontopou...@lightbend.com> wrote: > Makes sense, not sure if closure cleaning is related to the last one for > example or others. The last one is a bit weird, unless I am missing > something about the LegacyAccumulatorWrapper logic. > > Stavros > > On Sun, Aug 5, 2018 at 10:23 PM, Sean Owen wrote: > >> Yep that's what I did. There are more failures with different >> resolutions. I'll open a JIRA and PR and ping you, to make sure that the >> changes are all reasonable, and not an artifact of missing something about >> closure cleaning in 2.12. >> >> In the meantime having a 2.12 build up and running for master will just >> help catch these things. >> >> On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos < >> stavros.kontopou...@lightbend.com> wrote: >> >>> Hi Sean, >>> >>> I run a quick build so the failing tests seem to be: >>> >>> - SPARK-17644: After one stage is aborted for too many failed attempts, >>> subsequent stagesstill behave correctly on fetch failures *** FAILED *** >>> A job with one fetch failure should eventually succeed >>> (DAGSchedulerSuite.scala:2422) >>> >>> >>> - LegacyAccumulatorWrapper with AccumulatorParam that has no >>> equals/hashCode *** FAILED *** >>> java.io.NotSerializableException: >>> org.scalatest.Assertions$AssertionsHelper >>> Serialization stack: >>> - object not serializable (class: >>> org.scalatest.Assertions$AssertionsHelper, value: >>> org.scalatest.Assertions$AssertionsHelper@3bc5fc8f) >>> >>> >>> The last one can be fixed easily if you set class `MyData(val i: Int) >>> extends Serializable `outside of the test suite. For some reason outers >>> (not removed) are capturing >>> the Scalatest stuff in 2.12. >>> >>> Let me know if we see the same failures. >>> >>> Stavros >>> >>> On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen wrote: >>> Shane et al - could we get a test job in Jenkins to test the Scala 2.12 build? I don't think I have the access or expertise for it, though I could probably copy and paste a job. I think we just need to clone the, say, master Maven Hadoop 2.7 job, and add two steps: run "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the profiles that are enabled. I can already see two test failures for the 2.12 build right now and will try to fix those, but this should help verify whether the failures are 'real' and detect them going forward. >>> > > >
Re: [DISCUSS][SQL] Control the number of output files
FYI, the new hints have been merged. They will be available in the upcoming release (Spark 2.4). *John Zhuge*, thanks for your work! Really appreciate it! Please submit more PRs and help the community improve Spark. : ) Xiao 2018-08-05 21:06 GMT-04:00 Koert Kuipers : > lukas, > what is the jira ticket for this? i would like to follow it's activity. > thanks! > koert > > On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec wrote: > >> Hi, >> Yes, This feature is planned - Spark should be soon able to repartition >> output by size. >> Lukas >> >> >> Dne st 25. 7. 2018 23:26 uživatel Forest Fang >> napsal: >> >>> Has there been any discussion to simply support Hive's merge small files >>> configuration? It simply adds one additional stage to inspect size of each >>> output file, recompute the desired parallelism to reach a target size, and >>> runs a map-only coalesce before committing the final files. Since AFAIK >>> SparkSQL already stages the final output commit, it seems feasible to >>> respect this Hive config. >>> >>> https://community.hortonworks.com/questions/106987/hive-mult >>> iple-small-files.html >>> >>> >>> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra >>> wrote: >>> See some of the related discussion under https://github.com/apach e/spark/pull/21589 If feels to me like we need some kind of user code mechanism to signal policy preferences to Spark. This could also include ways to signal scheduling policy, which could include things like scheduling pool and/or barrier scheduling. Some of those scheduling policies operate at inherently different levels currently -- e.g. scheduling pools at the Job level (really, the thread local level in the current implementation) and barrier scheduling at the Stage level -- so it is not completely obvious how to unify all of these policy options/preferences/mechanism, or whether it is possible, but I think it is worth considering such things at a fairly high level of abstraction and try to unify and simplify before making things more complex with multiple policy mechanisms. On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin wrote: > Seems like a good idea in general. Do other systems have similar > concepts? In general it'd be easier if we can follow existing convention > if > there is any. > > > On Wed, Jul 25, 2018 at 11:50 AM John Zhuge wrote: > >> Hi all, >> >> Many Spark users in my company are asking for a way to control the >> number of output files in Spark SQL. There are use cases to either reduce >> or increase the number. The users prefer not to use function >> *repartition*(n) or *coalesce*(n, shuffle) that require them to >> write and deploy Scala/Java/Python code. >> >> Could we introduce a query hint for this purpose (similar to >> Broadcast Join Hints)? >> >> /*+ *COALESCE*(n, shuffle) */ >> >> In general, is query hint is the best way to bring DF functionality >> to SQL without extending SQL syntax? Any suggestion is highly >> appreciated. >> >> This requirement is not the same as SPARK-6221 that asked for >> auto-merging output files. >> >> Thanks, >> John Zhuge >> > >
Re: [DISCUSS][SQL] Control the number of output files
lukas, what is the jira ticket for this? i would like to follow it's activity. thanks! koert On Wed, Jul 25, 2018 at 5:32 PM, lukas nalezenec wrote: > Hi, > Yes, This feature is planned - Spark should be soon able to repartition > output by size. > Lukas > > > Dne st 25. 7. 2018 23:26 uživatel Forest Fang > napsal: > >> Has there been any discussion to simply support Hive's merge small files >> configuration? It simply adds one additional stage to inspect size of each >> output file, recompute the desired parallelism to reach a target size, and >> runs a map-only coalesce before committing the final files. Since AFAIK >> SparkSQL already stages the final output commit, it seems feasible to >> respect this Hive config. >> >> https://community.hortonworks.com/questions/106987/hive- >> multiple-small-files.html >> >> >> On Wed, Jul 25, 2018 at 1:55 PM Mark Hamstra >> wrote: >> >>> See some of the related discussion under https://github.com/ >>> apache/spark/pull/21589 >>> >>> If feels to me like we need some kind of user code mechanism to signal >>> policy preferences to Spark. This could also include ways to signal >>> scheduling policy, which could include things like scheduling pool and/or >>> barrier scheduling. Some of those scheduling policies operate at inherently >>> different levels currently -- e.g. scheduling pools at the Job level >>> (really, the thread local level in the current implementation) and barrier >>> scheduling at the Stage level -- so it is not completely obvious how to >>> unify all of these policy options/preferences/mechanism, or whether it is >>> possible, but I think it is worth considering such things at a fairly high >>> level of abstraction and try to unify and simplify before making things >>> more complex with multiple policy mechanisms. >>> >>> On Wed, Jul 25, 2018 at 1:37 PM Reynold Xin wrote: >>> Seems like a good idea in general. Do other systems have similar concepts? In general it'd be easier if we can follow existing convention if there is any. On Wed, Jul 25, 2018 at 11:50 AM John Zhuge wrote: > Hi all, > > Many Spark users in my company are asking for a way to control the > number of output files in Spark SQL. There are use cases to either reduce > or increase the number. The users prefer not to use function > *repartition*(n) or *coalesce*(n, shuffle) that require them to write > and deploy Scala/Java/Python code. > > Could we introduce a query hint for this purpose (similar to Broadcast > Join Hints)? > > /*+ *COALESCE*(n, shuffle) */ > > In general, is query hint is the best way to bring DF functionality to > SQL without extending SQL syntax? Any suggestion is highly appreciated. > > This requirement is not the same as SPARK-6221 that asked for > auto-merging output files. > > Thanks, > John Zhuge >
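For contrast with the hint being proposed, a sketch of what controlling the file count looks like today from code, i.e. the repartition/coalesce calls John says users would rather not have to write and deploy; paths and counts here are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    val df = spark.read.parquet("/data/input")           // hypothetical input path

    df.repartition(200).write.parquet("/data/out_many")  // more, smaller output files (full shuffle)
    df.coalesce(8).write.parquet("/data/out_few")        // fewer output files (no shuffle)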
Re: Why is SQLImplicits an abstract class rather than a trait?
Hi Assaf,

No idea (and I don't remember ever wondering about it before), but why not do this (untested):

trait MySparkTestTrait {
  lazy val spark: SparkSession = SparkSession.builder().getOrCreate() // <-- you sure you don't need master?
  import spark.implicits._
}

Wouldn't that import work?

Regards,
Jacek Laskowski
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Sun, Aug 5, 2018 at 5:34 PM, assaf.mendelson wrote:
> Hi all,
>
> I have been playing a bit with SQLImplicits and noticed that it is an
> abstract class. I was wondering why that is. It has no constructor.
>
> Because it is an abstract class, a test trait cannot extend it and
> still be a trait.
>
> Consider the following:
>
> trait MySparkTestTrait extends SQLImplicits {
>   lazy val spark: SparkSession = SparkSession.builder().getOrCreate()
>   protected override def _sqlContext: SQLContext = spark.sqlContext
> }
>
> This would mean that I can do something like this:
>
> class MyTestClass extends FunSuite with MySparkTestTrait {
>   test("SomeTest") {
>     // use spark implicits without needing to do import spark.implicits._
>   }
> }
>
> Is there a reason for this being an abstract class?
Re: Set up Scala 2.12 test build in Jenkins
Makes sense, not sure if closure cleaning is related to the last one for example or others. The last one is a bit weird, unless I am missing something about the LegacyAccumulatorWrapper logic. Stavros On Sun, Aug 5, 2018 at 10:23 PM, Sean Owen wrote: > Yep that's what I did. There are more failures with different resolutions. > I'll open a JIRA and PR and ping you, to make sure that the changes are all > reasonable, and not an artifact of missing something about closure cleaning > in 2.12. > > In the meantime having a 2.12 build up and running for master will just > help catch these things. > > On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos lightbend.com> wrote: > >> Hi Sean, >> >> I run a quick build so the failing tests seem to be: >> >> - SPARK-17644: After one stage is aborted for too many failed attempts, >> subsequent stagesstill behave correctly on fetch failures *** FAILED *** >> A job with one fetch failure should eventually succeed >> (DAGSchedulerSuite.scala:2422) >> >> >> - LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode >> *** FAILED *** >> java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper >> Serialization stack: >> - object not serializable (class: >> org.scalatest.Assertions$AssertionsHelper, value: >> org.scalatest.Assertions$AssertionsHelper@3bc5fc8f) >> >> >> The last one can be fixed easily if you set class `MyData(val i: Int) >> extends Serializable `outside of the test suite. For some reason outers >> (not removed) are capturing >> the Scalatest stuff in 2.12. >> >> Let me know if we see the same failures. >> >> Stavros >> >> On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen wrote: >> >>> Shane et al - could we get a test job in Jenkins to test the Scala 2.12 >>> build? I don't think I have the access or expertise for it, though I could >>> probably copy and paste a job. I think we just need to clone the, say, >>> master Maven Hadoop 2.7 job, and add two steps: run >>> "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the >>> profiles that are enabled. >>> >>> I can already see two test failures for the 2.12 build right now and >>> will try to fix those, but this should help verify whether the failures are >>> 'real' and detect them going forward. >>> >>> >>> >>
Re: Set up Scala 2.12 test build in Jenkins
Yep that's what I did. There are more failures with different resolutions. I'll open a JIRA and PR and ping you, to make sure that the changes are all reasonable, and not an artifact of missing something about closure cleaning in 2.12. In the meantime having a 2.12 build up and running for master will just help catch these things. On Sun, Aug 5, 2018 at 2:16 PM Stavros Kontopoulos < stavros.kontopou...@lightbend.com> wrote: > Hi Sean, > > I run a quick build so the failing tests seem to be: > > - SPARK-17644: After one stage is aborted for too many failed attempts, > subsequent stagesstill behave correctly on fetch failures *** FAILED *** > A job with one fetch failure should eventually succeed > (DAGSchedulerSuite.scala:2422) > > > - LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode > *** FAILED *** > java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper > Serialization stack: > - object not serializable (class: > org.scalatest.Assertions$AssertionsHelper, value: > org.scalatest.Assertions$AssertionsHelper@3bc5fc8f) > > > The last one can be fixed easily if you set class `MyData(val i: Int) > extends Serializable `outside of the test suite. For some reason outers > (not removed) are capturing > the Scalatest stuff in 2.12. > > Let me know if we see the same failures. > > Stavros > > On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen wrote: > >> Shane et al - could we get a test job in Jenkins to test the Scala 2.12 >> build? I don't think I have the access or expertise for it, though I could >> probably copy and paste a job. I think we just need to clone the, say, >> master Maven Hadoop 2.7 job, and add two steps: run >> "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the >> profiles that are enabled. >> >> I can already see two test failures for the 2.12 build right now and will >> try to fix those, but this should help verify whether the failures are >> 'real' and detect them going forward. >> >> >> >
Re: Set up Scala 2.12 test build in Jenkins
Hi Sean,

I ran a quick build, so the failing tests seem to be:

- SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stagesstill behave correctly on fetch failures *** FAILED ***
  A job with one fetch failure should eventually succeed (DAGSchedulerSuite.scala:2422)

- LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode *** FAILED ***
  java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
  Serialization stack:
  - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@3bc5fc8f)

The last one can be fixed easily if you define class `MyData(val i: Int) extends Serializable` outside of the test suite. For some reason the outer references (not removed) are capturing the Scalatest stuff in 2.12.

Let me know if we see the same failures.

Stavros

On Sun, Aug 5, 2018 at 5:10 PM, Sean Owen wrote:
> Shane et al - could we get a test job in Jenkins to test the Scala 2.12
> build? I don't think I have the access or expertise for it, though I could
> probably copy and paste a job. I think we just need to clone the, say,
> master Maven Hadoop 2.7 job, and add two steps: run
> "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the
> profiles that are enabled.
>
> I can already see two test failures for the 2.12 build right now and will
> try to fix those, but this should help verify whether the failures are
> 'real' and detect them going forward.
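A stand-alone sketch of the fix Stavros describes, defining the class at the top level so it cannot capture the suite; the serialization round trip below is an illustration, not the actual LegacyAccumulatorWrapper test:

    import java.io._

    class MyData(val i: Int) extends Serializable   // top level: no $outer pointing at a suite

    object SerializationCheck {
      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        new ObjectOutputStream(buf).writeObject(new MyData(1))   // succeeds: nothing else is captured
        val back = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
          .readObject().asInstanceOf[MyData]
        assert(back.i == 1)
      }
    }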
Why is SQLImplicits an abstract class rather than a trait?
Hi all,

I have been playing a bit with SQLImplicits and noticed that it is an abstract class. I was wondering why that is. It has no constructor.

Because it is an abstract class, a test trait cannot extend it and still be a trait.

Consider the following:

trait MySparkTestTrait extends SQLImplicits {
  lazy val spark: SparkSession = SparkSession.builder().getOrCreate()
  protected override def _sqlContext: SQLContext = spark.sqlContext
}

This would mean that I can do something like this:

class MyTestClass extends FunSuite with MySparkTestTrait {
  test("SomeTest") {
    // use spark implicits without needing to do import spark.implicits._
  }
}

Is there a reason for this being an abstract class?
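One workaround sketch, in the spirit of what Spark's own test utilities do: keep the enclosing type a trait and put the SQLImplicits subclass in a nested object that tests import. Only SQLImplicits and _sqlContext come from the question above; the other names are illustrative, and the local master is added only so the sketch runs stand-alone.

    import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession}

    trait MySparkTestTrait {
      lazy val spark: SparkSession =
        SparkSession.builder().master("local[2]").getOrCreate()

      // Nested object instead of extending SQLImplicits directly,
      // so MySparkTestTrait itself can stay a trait.
      object testImplicits extends SQLImplicits {
        protected override def _sqlContext: SQLContext = spark.sqlContext
      }
    }

    class MyTestClass extends MySparkTestTrait {
      import testImplicits._
      // Seq(1, 2, 3).toDS() etc. now work without importing spark.implicits._
    }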
Set up Scala 2.12 test build in Jenkins
Shane et al - could we get a test job in Jenkins to test the Scala 2.12 build? I don't think I have the access or expertise for it, though I could probably copy and paste a job. I think we just need to clone the, say, master Maven Hadoop 2.7 job, and add two steps: run "./dev/change-scala-version.sh 2.12" first, then add "-Pscala-2.12" to the profiles that are enabled. I can already see two test failures for the 2.12 build right now and will try to fix those, but this should help verify whether the failures are 'real' and detect them going forward.
Re: Am I crazy, or does the binary distro not have Kafka integration?
Yes, it's a reasonable argument that putting N more external integration modules on the default spark-submit classpath might bring in more third-party dependencies that clash or something. I think the convenience factor isn't a big deal; users can also just write a dependence on said module in their own app, once. It does seem like we could at least *ship* the binary bits in "external-jars/" or something; they're not even compiled in the binary distro. And it also means users have to make sure the version of spark-kafka they integrate works with their cluster, which means not just making sure their app matches the user-facing API of spark-kafka, but ensuring that the spark-kafka module's interface to Spark works -- whatever internal details there may be there.

On Sat, Aug 4, 2018 at 9:15 PM Matei Zaharia wrote:
> I think that traditionally, the reason *not* to include these has been if they brought additional dependencies that users don't really need, but that might clash with what the users have in their own app. Maybe this used to be the case for Kafka. We could analyze it and include it by default, or perhaps make it easier to add it in spark-submit and spark-shell. I feel that in an IDE, it won't be a huge problem because you just add it once, but it is annoying for spark-submit.
>
> Matei
>
> > On Aug 4, 2018, at 2:19 PM, Sean Owen wrote:
> >
> > Hm OK I am crazy then. I think I never noticed it because I had always used a distro that did actually supply this on the classpath.
> > Well ... I think it would be reasonable to include these things (at least, Kafka integration) by default in the binary distro. I'll update the JIRA to reflect that this is at best a Wish.
> >
> > On Sat, Aug 4, 2018 at 4:17 PM Jacek Laskowski wrote:
> > Hi Sean,
> >
> > It's been for years I'd say that you had to specify --packages to get the Kafka-related jars on the classpath. I simply got used to this annoyance (as did others). Could it be that it's an external package (although an integral part of Spark)?!
> >
> > I'm very glad you've brought it up since I think Kafka data source is so important that it should be included in spark-shell and spark-submit by default. THANKS!
> >
> > Regards,
> > Jacek Laskowski
> >
> > https://about.me/JacekLaskowski
> > Mastering Spark SQL https://bit.ly/mastering-spark-sql
> > Spark Structured Streaming https://bit.ly/spark-structured-streaming
> > Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> > Follow me at https://twitter.com/jaceklaskowski
> >
> > On Sat, Aug 4, 2018 at 9:56 PM, Sean Owen wrote:
> > Let's take this to https://issues.apache.org/jira/browse/SPARK-25026 -- I provisionally marked this a Blocker, as if it's correct, then the release is missing an important piece and we'll want to remedy that ASAP. I still have this feeling I am missing something. The classes really aren't there in the release but ... *nobody* noticed all this time? I guess maybe Spark-Kafka users may be using a vendor distro that does package these bits.
> >
> > On Sat, Aug 4, 2018 at 10:48 AM Sean Owen wrote:
> > I was debugging why a Kafka-based streaming app doesn't seem to find Kafka-related integration classes when run standalone from our latest 2.3.1 release, and noticed that there doesn't seem to be any Kafka-related jars from Spark in the distro.
> > In jars/, I see:
> >
> > spark-catalyst_2.11-2.3.1.jar
> > spark-core_2.11-2.3.1.jar
> > spark-graphx_2.11-2.3.1.jar
> > spark-hive-thriftserver_2.11-2.3.1.jar
> > spark-hive_2.11-2.3.1.jar
> > spark-kubernetes_2.11-2.3.1.jar
> > spark-kvstore_2.11-2.3.1.jar
> > spark-launcher_2.11-2.3.1.jar
> > spark-mesos_2.11-2.3.1.jar
> > spark-mllib-local_2.11-2.3.1.jar
> > spark-mllib_2.11-2.3.1.jar
> > spark-network-common_2.11-2.3.1.jar
> > spark-network-shuffle_2.11-2.3.1.jar
> > spark-repl_2.11-2.3.1.jar
> > spark-sketch_2.11-2.3.1.jar
> > spark-sql_2.11-2.3.1.jar
> > spark-streaming_2.11-2.3.1.jar
> > spark-tags_2.11-2.3.1.jar
> > spark-unsafe_2.11-2.3.1.jar
> > spark-yarn_2.11-2.3.1.jar
> >
> > I checked make-distribution.sh, and it copies a bunch of JARs into the distro, but does not seem to touch the kafka modules.
> >
> > Am I crazy or missing something obvious -- those should be in the release, right?
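As a sketch of the "declare it in your own app" route (the alternative to the --packages flag Jacek mentions), this is roughly what the extra dependency looks like in an sbt build; the coordinates and version are assumed to match the 2.3.1 distro being discussed:

    // build.sbt (sketch)
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"            % "2.3.1" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"
    )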
Re: [Proposal] New feature: reconfigurable number of partitions on stateful operators in Structured Streaming
"coalesce" looks like working: I misunderstood it as an efficient version of "repartition" which does shuffle, so expected it would trigger shuffle. My proposal would be covered as using "coalesce": thanks Joseph for correction. Let me abandon the proposal. We may still miss for now is documentation for the fact: the number of partitions for states cannot be changed, so Spark restricts to modify "spark.sql.shuffle.partitions" once the query is run (only applying to streaming query, right?). If end users want to have more or less number of state partitions, the value should be set before running the query at the first time. Would it be better to add this to "Structured Streaming" doc? I agree that decoupling state and partitions would not be simple. I'd try out (offline) repartition first if the number of partitions would be really matter for scalability / elasticity. Thanks, Jungtaek Lim (HeartSaVioR) 2018년 8월 4일 (토) 오전 3:10, Joseph Torres 님이 작성: > I'd agree it might make sense to bundle this into an API. We'd have to > think about whether it's a common enough use case to justify the API > complexity. > > It might be worth exploring decoupling state and partitions, but I > wouldn't want to start making decisions based on it without a clearer > design picture. I would expect the decoupling to make it very difficult to > ensure idempotency of state updates. > > Jose > > On Fri, Aug 3, 2018 at 10:55 AM, Arun Mahadevan wrote: > >> coalesce might work. >> >> Say "spark.sql.shuffle.partitions" = 200, and then " >> input.readStream.map.filter.groupByKey(..).coalesce(2)" would still >> create 200 instances for state but execute just 2 tasks. >> >> However I think further groupByKey operations downstream would need >> similar coalesce. >> >> And this is assuming the user sets the right shuffle partitions upfront. >> >> It maybe worth to bundle this pattern as some builtin api so that it can >> be transparent to the user. I am not sure how were you planning to expose >> the state key groups at api level and if it would be transparent. >> >> IMO, decoupling the state and partitions and making it key based would >> still be worth exploring to support dynamic state rebalancing. May be the >> default HDFS based implementation can maintain the state partition wise and >> not support it, but there could be implementations based on distributed k-v >> store which supports this. >> >> Thanks, >> Arun >> >> >> On 3 August 2018 at 08:21, Joseph Torres >> wrote: >> >>> A coalesced RDD will definitely maintain any within-partition invariants >>> that the original RDD maintained. It pretty much just runs its input >>> partitions sequentially. >>> >>> There'd still be some Dataframe API work needed to get the coalesce >>> operation where you want it to be, but this is much simpler than >>> introducing a new concept of state key groups. As far as I can tell, >>> state key groups are just the same thing that we currently call partitions >>> of the aggregate RDD. >>> >>> On Fri, Aug 3, 2018 at 8:01 AM, Jungtaek Lim wrote: >>> I’m afraid I don’t know about the details on coalesce(), but some finding resource for coalesce, it looks like helping reducing actual partitions. For streaming aggregation, state for all partitions (by default, 200) must be initialized and committed even it is being unchanged. Otherwise error occurred when reading a partition which is excluded in query previously. 
Moreover, it can’t find existing row from state or store row in wrong partition if partition id doesn’t match the expected id via hashing function. Could you verify coalesce() meets such requirements? On Fri, 3 Aug 2018 at 22:23 Joseph Torres wrote: > Scheduling multiple partitions in the same task is basically what > coalesce() does. Is there a reason that doesn't work here? > > On Fri, Aug 3, 2018 at 5:55 AM, Jungtaek Lim > wrote: > >> Here's a link for Google docs (anyone can comment): >> >> https://docs.google.com/document/d/1DEOW3WQcPUq0YFgazkZx6Ei6EOdj_3pXEsyq4LGpyNs/edit?usp=sharing >> >> Please note that I just copied the content to the google docs, so >> someone could point out lack of details. I would like to start with >> explanation of the concept, and once we are in agreement on going >> forward, >> I could add more detail in doc, or even just start working and detail can >> be shared with POC code or even WIP patch. >> >> Answer inlined for Arun's comments: >> >> 2018년 8월 3일 (금) 오후 5:39, Arun Mahadevan 님이 작성: >> >>> Can you share this in a google doc to make the discussions easier.? >>> >>> >>> >>> Thanks for coming up with ideas to improve upon the current >>> restrictions with the SS state store. >>> >>> >>> >>> If I understood correctly, the plan is to introduce a logical >>> partitioning scheme for state storage
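Tying the earlier point in this thread together, a sketch of fixing the state parallelism up front: the shuffle-partition setting at the first run is what determines the number of state partitions a checkpointed streaming query keeps. The source, sink, and checkpoint path below are placeholders.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "50")  // decides the state partition count on first run
      .getOrCreate()

    val query = spark.readStream
      .format("rate").load()
      .groupBy("value").count()
      .writeStream
      .format("console")
      .outputMode("complete")
      .option("checkpointLocation", "/tmp/state-demo")  // later restarts from this checkpoint
      .start()                                          // must keep the original 50 partitions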