[jira] [Commented] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic
[ https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813195#comment-16813195 ] Valeria Vasylieva commented on SPARK-20597: --- [~jlaskowski] I kindly ask you to review the PR so that this task does not stall. > KafkaSourceProvider falls back on path as synonym for topic > --- > > Key: SPARK-20597 > URL: https://issues.apache.org/jira/browse/SPARK-20597 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Trivial > Labels: starter > > # {{KafkaSourceProvider}} supports a {{topic}} option that sets the Kafka topic > to save a DataFrame's rows to > # {{KafkaSourceProvider}} can use a {{topic}} column to assign rows to Kafka > topics for writing > What seems like quite an interesting option is to support {{start(path: String)}} > as the lowest-precedence option, in which {{path}} would designate the default > topic when no other options are used. > {code} > df.writeStream.format("kafka").start("topic") > {code} > See > http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html > for discussion -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
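For illustration, a sketch of how the proposed fallback could look from the user side. This is not implemented; it assumes a streaming DataFrame {{df}}, the broker and checkpoint values are placeholders, and the precedence shown (topic column over {{topic}} option over {{path}}) is the behaviour suggested in the linked discussion.
{code:java}
// Hypothetical once SPARK-20597 is implemented: the start() path argument
// acts as the default topic when neither a "topic" option nor a topic column is set.
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder brokers
  .option("checkpointLocation", "/tmp/checkpoints")  // placeholder checkpoint dir
  .start("events")                                   // "events" would become the default topic

// Today the equivalent requires the explicit option:
// df.writeStream.format("kafka")
//   .option("kafka.bootstrap.servers", "host1:9092")
//   .option("topic", "events")
//   .start()
{code}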
[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089 ] Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:22 PM: [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in this comment seems to work in spark streaming. It has an overhead of creating an array, but it resolves the current problem. {code:java} df.select(explode(array(myUdf($"value"))).as("parsed_struct")) .selectExpr("parsed_struct.*"){code} was (Author: nimfadora): [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in [this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15547358&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15547358] comment with seems to work in spark streaming. It have the overhead of creating an array, but it resolves the current problem. {code:java} df.select(explode(array(myUdf($"value"))).as("parsed_struct")) .selectExpr("parsed_struct.*"){code} > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we loose some of the optimizations like push down > predicate features, etc. and its very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
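For reference, a self-contained sketch of the workaround quoted in the comment above. The UDF here ({{parseUdf}}) is only an illustrative stand-in for the expensive parser; the point is that wrapping the call in {{explode(array(...))}} makes the struct be evaluated once per row rather than once per projected field.
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, explode, udf}

val spark = SparkSession.builder().master("local[2]").appName("udf-once").getOrCreate()
import spark.implicits._

// Stand-in for the expensive struct-producing UDF.
val parseUdf = udf((value: String) => (value.length, value.toUpperCase))

val df = Seq("alpha", "beta").toDF("value")

// Wrapping the UDF call in explode(array(...)) prevents the expression from
// being duplicated for every field selected out of the struct.
val parsed = df
  .select(explode(array(parseUdf($"value"))).as("parsed_struct"))
  .selectExpr("parsed_struct.*")

parsed.show()
{code}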
[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089 ] Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:15 PM: [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in [this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15547358&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15547358] comment with seems to work in spark streaming. It have the overhead of creating an array, but it resolves the current problem. {code:java} df.select(explode(array(myUdf($"value"))).as("parsed_struct")) .selectExpr("parsed_struct.*"){code} was (Author: nimfadora): [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in this comment with seems to work in spark streaming. It have the overhead of creating an array, but it resolves the current problem. {code:java} df.select(explode(array(myUdf($"value"))).as("parsed_struct")) .selectExpr("parsed_struct.*"){code} > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we loose some of the optimizations like push down > predicate features, etc. and its very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089 ] Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:14 PM: [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in this comment with seems to work in spark streaming. It have the overhead of creating an array, but it resolves the current problem. {code:java} df.select(explode(array(myUdf($"value"))).as("parsed_struct")) .selectExpr("parsed_struct.*"){code} was (Author: nimfadora): [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in this comment with explode(array(myFunc($id))) semms to work in spark streaming. It have the overhead of creating an array, but it resolves the current problem. df.select(explode(array(myUdf($"value"))).as("parsed_struct")) .selectExpr("parsed_struct.*") > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we loose some of the optimizations like push down > predicate features, etc. and its very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089 ] Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:14 PM: [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in this comment with explode(array(myFunc($id))) semms to work in spark streaming. It have the overhead of creating an array, but it resolves the current problem. df.select(explode(array(myUdf($"value"))).as("parsed_struct")) .selectExpr("parsed_struct.*") was (Author: nimfadora): [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in [this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023] comment with explode(array(myFunc($id))) semms to work in spark streaming. It have the overhead of creating an array, but it resolves the current problem. > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we loose some of the optimizations like push down > predicate features, etc. and its very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089 ] Valeria Vasylieva commented on SPARK-18748: --- [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in [this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023] comment with explode(array(myFunc($id))) seems to work in spark streaming. It has the overhead of creating an array, but it resolves the current problem. > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we loose some of the optimizations like push down > predicate features, etc. and its very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valeria Vasylieva updated SPARK-18748: -- Comment: was deleted (was: [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in [this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023] comment with explode(array(myFunc($id))) works in spark streaming. It have the overhead of creating an array, but it resolves the current problem.) > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we loose some of the optimizations like push down > predicate features, etc. and its very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance
[ https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799077#comment-16799077 ] Valeria Vasylieva commented on SPARK-18748: --- [~uzadude] [~hqb1989] workaround, described in -SPARK-17728- in [this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023] comment with explode(array(myFunc($id))) works in spark streaming. It have the overhead of creating an array, but it resolves the current problem. > UDF multiple evaluations causes very poor performance > - > > Key: SPARK-18748 > URL: https://issues.apache.org/jira/browse/SPARK-18748 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Ohad Raviv >Priority: Major > > We have a use case where we have a relatively expensive UDF that needs to be > calculated. The problem is that instead of being calculated once, it gets > calculated over and over again. > for example: > {quote} > def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\} > hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _) > hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is > not null and c<>''").show > {quote} > with the output: > {quote} > blahblah1 > blahblah1 > blahblah1 > +---+ > | c| > +---+ > |nothing| > +---+ > {quote} > You can see that for each reference of column "c" you will get the println. > that causes very poor performance for our real use case. > This also came out on StackOverflow: > http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns > http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/ > with two problematic work-arounds: > 1. cache() after the first time. e.g. > {quote} > hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not > null and c<>''").show > {quote} > while it works, in our case we can't do that because the table is too big to > cache. > 2. move back and forth to rdd: > {quote} > val df = hiveContext.sql("select veryExpensiveCalc('a') as c") > hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and > c<>''").show > {quote} > which works but then we loose some of the optimizations like push down > predicate features, etc. and its very ugly. > Any ideas on how we can make the UDF get calculated just once in a reasonable > way? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24299) UI for Structured Streaming Monitoring
[ https://issues.apache.org/jira/browse/SPARK-24299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799013#comment-16799013 ] Valeria Vasylieva commented on SPARK-24299: --- [~TomaszGaweda] I want to try to work on this task > UI for Structured Streaming Monitoring > -- > > Key: SPARK-24299 > URL: https://issues.apache.org/jira/browse/SPARK-24299 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming, Web UI >Affects Versions: 2.3.0 >Reporter: Tomasz Gawęda >Priority: Major > > Old Streaming module had a helpful Web UI for monitoring. It would be very > helpful to have such UI for Structured Streaming > > I'm aware that we can read a lot from SQL visualization, but still dedicated > page with metrics for queries will be useful. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26739) Standardized Join Types for DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796045#comment-16796045 ] Valeria Vasylieva commented on SPARK-26739: --- [~slehan] hi! I strongly agree this change will make Dataset/DataFrame API users much happier, me at least. I would like to work on this task. There are several questions though: 1) Will this change be accepted, if implemented? 2) If yes, do we need to mark methods using strings as deprecated? [~hyukjin.kwon], [~r...@databricks.com], [~cloud_fan] could one of you please answer these questions? Thank you! > Standardized Join Types for DataFrames > -- > > Key: SPARK-26739 > URL: https://issues.apache.org/jira/browse/SPARK-26739 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Skyler Lehan >Priority: Minor > Original Estimate: 48h > Remaining Estimate: 48h > > h3. *Q1.* What are you trying to do? Articulate your objectives using > absolutely no jargon. > Currently, in the join functions on > [DataFrames|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset], > the join types are defined via a string parameter called joinType. In order > for a developer to know which joins are possible, they must look up the API > call for join. While this works fine, it can cause the developer to make a > typo resulting in improper joins and/or unexpected errors that aren't evident > at compile time. The objective of this improvement would be to allow > developers to use a common definition for join types (by enum or constants) > called JoinTypes. This would contain the possible joins and remove the > possibility of a typo. It would also allow Spark to alter the names of the > joins in the future without impacting end-users. > h3. *Q2.* What problem is this proposal NOT designed to solve? > The problem this solves is extremely narrow, it would not solve anything > other than providing a common definition for join types. > h3. *Q3.* How is it done today, and what are the limits of current practice? > Currently, developers must join two DataFrames like so: > {code:java} > val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), > "left_outer") > {code} > Where they manually type the join type. As stated above, this: > * Requires developers to manually type in the join > * Can cause possibility of typos > * Restricts renaming of join types as its a literal string > * Does not restrict and/or compile check the join type being used, leading > to runtime errors > h3. *Q4.* What is new in your approach and why do you think it will be > successful? > The new approach would use constants or *more preferably an enum*, something > like this: > {code:java} > val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), > JoinType.LEFT_OUTER) > {code} > This would provide: > * In code reference/definitions of the possible join types > ** This subsequently allows the addition of scaladoc of what each join type > does and how it operates > * Removes possibilities of a typo on the join type > * Provides compile time checking of the join type (only if an enum is used) > To clarify, if JoinType is a constant, it would just fill in the joinType > string parameter for users. If an enum is used, it would restrict the domain > of possible join types to whatever is defined in the future JoinType enum. > The enum is preferred, however it would take longer to implement. > h3. *Q5.* Who cares? 
If you are successful, what difference will it make? > Developers using Apache Spark will care. This will make the join function > easier to wield and lead to less runtime errors. It will save time by > bringing join type validation at compile time. It will also provide in code > reference to the join types, which saves the developer time of having to look > up and navigate the multiple join functions to find the possible join types. > In addition to that, the resulting constants/enum would have documentation on > how that join type works. > h3. *Q6.* What are the risks? > Users of Apache Spark who currently use strings to define their join types > could be impacted if an enum is chosen as the common definition. This risk > can be mitigated by using string constants. The string constants would be the > exact same string as the string literals used today. For example: > {code:java} > JoinType.INNER = "inner" > {code} > If an enum is still the preferred way of defining the join types, new join > functions could be added that take in these enums and the join calls that > contain string parameters for joinType could be deprecated. This would give > developers a chance to change over to the new join types. > h3. *Q7.* How long will
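To make the proposal above concrete, a minimal sketch of the constant-based variant (the simpler of the two options described). The object and member names are illustrative assumptions, not an agreed API; the string values are the join-type literals Spark already accepts, so the existing string-typed {{join}} overloads would keep working unchanged.
{code:java}
// Sketch only: named constants that forward to the existing string joinType parameter.
object JoinType {
  val Inner      = "inner"
  val Cross      = "cross"
  val FullOuter  = "full_outer"
  val LeftOuter  = "left_outer"
  val RightOuter = "right_outer"
  val LeftSemi   = "left_semi"
  val LeftAnti   = "left_anti"
}

// Usage with today's API:
// val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), JoinType.LeftOuter)
{code}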
[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ] Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:59 AM: --- I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. was (Author: nimfadora): I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here |[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! 
> {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T ... array) { > return sparkContext.parallelize(Arrays.asList(array)); > } > @Test > public void filters_apples_and_b
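The behaviour described above can be checked outside Spark with a plain Java-serialization round trip. The helper below is an illustrative sketch (not part of Spark or the JDK) that mirrors what happens when a closure is shipped to an executor: {{writeObject}} goes through the lambda's {{writeReplace}} into {{SerializedLambda}}, and {{readObject}} ends in {{SerializedLambda#readResolve}}, after which the rebuilt function can be invoked again to observe the {{ClassCastException}}.
{code:java}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustrative helper: serialize and deserialize a value the same way task
// closures are shipped, then hand it back so it can be invoked again.
object SerializationRoundTrip {
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)           // lambda -> SerializedLambda via writeReplace
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]  // SerializedLambda#readResolve rebuilds the lambda
  }
}
{code}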
[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ] Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 9:01 AM: --- I have investigated this issue and here what I have found: exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is still unresolved. When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new Banana()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new Banana()){code} I think the issue may be closed as it does not relate to the Spark itself. was (Author: nimfadora): I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new Banana()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new Banana()){code} I think the issue may be closed as it does not relate to the Spark itself. > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! 
> {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T ... array) { > return sparkContext.parallelize(Arrays.asList(array)); > } > @Test > public void filters_apples_and_bananas_with_method_reference() { > JavaRDD appleRdd = toRdd(
[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ] Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 9:00 AM: --- I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new Banana()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new Banana()){code} I think the issue may be closed as it does not relate to the Spark itself. was (Author: nimfadora): I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! 
> {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T ... array) { > return sparkContext.parallelize(Arrays.asList(array)); > } > @Test > public void filters_apples_and_bananas_with_method_reference() { >
[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ] Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:58 AM: --- I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here |[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. was (Author: nimfadora): I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here |[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! 
> {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T ... array) { > return sparkContext.parallelize(Arrays.asLi
[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ] Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:58 AM: --- I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here |[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. was (Author: nimfadora): I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here |[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! 
> {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T ... array) { > return sparkContext.parallelize(Arrays.asList(array)); > } > @Test > public void filters_apples_
[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ] Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:57 AM: --- I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]], that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here |[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]]. And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. was (Author: nimfadora): I have investigated this issue and here what I have found. Exception is caused by this [JDK bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]:],] that is still unresolved: When passed to executors Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here |[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].] And if you debug, than you will see that at this point lambda is working correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code} When lambda is deserialized on executor, the method {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this expression on function after deserialization, you will get {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to the Spark itself. > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! 
> {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T .
[jira] [Commented] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ] Valeria Vasylieva commented on SPARK-9135: -- I have investigated this issue and here is what I have found. The exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is still unresolved. When passed to executors, Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} via the {{writeReplace}} method invoked [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. If you debug, you will see that at this point the lambda still works correctly and this call returns {{true}}: {code:java} closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()){code} When the lambda is deserialized on the executor, {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}}, and if you execute this expression on the function after deserialization, you get a {{ClassCastException}}: {code:java} ((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code} I think the issue may be closed as it does not relate to Spark itself. > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! > {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T ...
array) { > return sparkContext.parallelize(Arrays.asList(array)); > } > @Test > public void filters_apples_and_bananas_with_method_reference() { > JavaRDD appleRdd = toRdd(new Apple()); > JavaRDD bananaRdd = toRdd(new Banana()); > > long redAppleCount = appleRdd.filter(Fruit::isRed).count(); > long redBananaCount = bananaRdd.filter(Fruit::isRed).count(); > assertThat(redAppleCount, equalTo(1L)); > assertThat(redBananaCount, equalTo(0L)); > } > } > {code} > The test above throws: > {code} > 15/07/17 14:10:04 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3) > java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast > to com.doggybites.SparkTest$Apple > at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source) > at > org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78) > at > org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78) > at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390) > at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626) > at org.apache.spark.rdd.RDD$$anonf
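The investigation above can be reproduced outside of Spark with a plain Java-serialization round trip. The sketch below is illustrative only: the {{roundTrip}} helper is not part of Spark or the JDK, and the commented usage assumes the {{Fruit}}/{{Apple}} classes from the test setup in the issue plus a serializable {{org.apache.spark.api.java.function.Function}} built from {{Fruit::isRed}} on the Java side.

{code:scala}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LambdaRoundTrip {
  // Serialize and immediately deserialize a value, the same way a closure is
  // shipped from the driver to an executor.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value) // a serializable Java lambda is replaced by java.lang.invoke.SerializedLambda here
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val result = in.readObject().asInstanceOf[T] // SerializedLambda#readResolve rebuilds the lambda here
    in.close()
    result
  }
}

// Illustrative usage (hypothetical names, mirroring the test in the issue):
//   val isRed: org.apache.spark.api.java.function.Function[Fruit, java.lang.Boolean] = ???
//   isRed.call(new Apple())                             // works before the round trip
//   LambdaRoundTrip.roundTrip(isRed).call(new Apple())  // may throw ClassCastException, per the JDK bug above
{code}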
[jira] [Commented] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic
[ https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775279#comment-16775279 ] Valeria Vasylieva commented on SPARK-20597: --- [~jlaskowski] I have added the PR for this issue, could you please look at it? Thank you. > KafkaSourceProvider falls back on path as synonym for topic > --- > > Key: SPARK-20597 > URL: https://issues.apache.org/jira/browse/SPARK-20597 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Trivial > Labels: starter > > # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic > to save a DataFrame's rows to > # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka > topics for writing > What seems a quite interesting option is to support {{start(path: String)}} > as the least precedence option in which {{path}} would designate the default > topic when no other options are used. > {code} > df.writeStream.format("kafka").start("topic") > {code} > See > http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html > for discussion -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
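To make the proposal concrete, here is a rough Scala sketch of the precedence the description asks for. It is only an illustration, not code from {{KafkaSourceProvider}} or the PR; it assumes the existing behavior that the {{topic}} option overrides a per-row {{topic}} column, and adds {{start(path)}} as the final fallback.

{code:scala}
// Illustrative only: none of these names come from KafkaSourceProvider.
def resolveTopic(
    topicOption: Option[String],      // value of the "topic" write option, if set
    topicColumnValue: Option[String], // per-row value of a "topic" column, if the DataFrame has one
    path: Option[String]              // the argument of start(path), proposed as the last resort
): String =
  topicOption
    .orElse(topicColumnValue)
    .orElse(path)
    .getOrElse(throw new IllegalArgumentException(
      "Topic must be given via the 'topic' option, a 'topic' column, or start(path)"))

// resolveTopic(None, None, Some("topic")) == "topic"  -- the fallback the issue proposes
{code}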
[jira] [Commented] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method
[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773067#comment-16773067 ] Valeria Vasylieva commented on SPARK-9135: -- I would like to work on it > Filter fails when filtering with a method reference to overloaded method > > > Key: SPARK-9135 > URL: https://issues.apache.org/jira/browse/SPARK-9135 > Project: Spark > Issue Type: Bug > Components: Java API >Affects Versions: 1.4.0 >Reporter: Mateusz Michalowski >Priority: Major > > Filter fails when filtering with a method reference to overloaded method. > In the example below we filter by Fruit::isRed, which is overloaded by > Apple::isRed and Banana::isRed. > {code} > apples.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //throws! > {code} > Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a > result. > However if we filter more generic rdd first - all works fine > {code} > fruit.filter(Fruit::isRed) > bananas.filter(Fruit::isRed) //works fine! > {code} > It also works well if we use lambda instead of the method reference > {code} > apples.filter(f -> f.isRed()) > bananas.filter(f -> f.isRed()) //works fine! > {code} > I attach a test setup below: > {code:java} > package com.doggybites; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.junit.After; > import org.junit.Before; > import org.junit.Test; > import java.io.Serializable; > import java.util.Arrays; > import static org.hamcrest.CoreMatchers.equalTo; > import static org.junit.Assert.assertThat; > public class SparkTest { > static abstract class Fruit implements Serializable { > abstract boolean isRed(); > } > static class Banana extends Fruit { > @Override > boolean isRed() { > return false; > } > } > static class Apple extends Fruit { > @Override > boolean isRed() { > return true; > } > } > private JavaSparkContext sparkContext; > @Before > public void setUp() throws Exception { > SparkConf sparkConf = new > SparkConf().setAppName("test").setMaster("local[2]"); > sparkContext = new JavaSparkContext(sparkConf); > } > @After > public void tearDown() throws Exception { > sparkContext.stop(); > } > private JavaRDD toRdd(T ... 
array) { > return sparkContext.parallelize(Arrays.asList(array)); > } > @Test > public void filters_apples_and_bananas_with_method_reference() { > JavaRDD appleRdd = toRdd(new Apple()); > JavaRDD bananaRdd = toRdd(new Banana()); > > long redAppleCount = appleRdd.filter(Fruit::isRed).count(); > long redBananaCount = bananaRdd.filter(Fruit::isRed).count(); > assertThat(redAppleCount, equalTo(1L)); > assertThat(redBananaCount, equalTo(0L)); > } > } > {code} > The test above throws: > {code} > 15/07/17 14:10:04 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3) > java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast > to com.doggybites.SparkTest$Apple > at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source) > at > org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78) > at > org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78) > at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390) > at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626) > at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) > at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) > at org.apache.spark.scheduler.Task.run(Task.scala:70) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 15/07/17 14:10:04 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3, > localhost): java.lang.ClassCastException: com.doggybites.SparkTest$Banana > cannot be cast to com.doggybites.SparkTest$Apple > at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source) > at > org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scal
[jira] [Commented] (SPARK-25810) Spark structured streaming logs auto.offset.reset=earliest even though startingOffsets is set to latest
[ https://issues.apache.org/jira/browse/SPARK-25810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773044#comment-16773044 ] Valeria Vasylieva commented on SPARK-25810: --- I suppose the cause is here: [KafkaSourceProvider:521|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L517] {code:java} // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial // offsets by itself instead of counting on KafkaConsumer. .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") {code} But I do not really think that it should be fixed: Spark defines a custom algorithm for offset checking and fetching in [KafkaSource|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala], which lets it avoid Kafka errors on offsets that do not exist, etc. [ConsumerConfig|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java] belongs to Kafka, so we cannot just change the logging here. > Spark structured streaming logs auto.offset.reset=earliest even though > startingOffsets is set to latest > --- > > Key: SPARK-25810 > URL: https://issues.apache.org/jira/browse/SPARK-25810 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.1 >Reporter: ANUJA BANTHIYA >Priority: Trivial > > I have an issue when I'm trying to read data from Kafka using Spark > Structured Streaming. > Versions : spark-core_2.11 : 2.3.1, spark-sql_2.11 : 2.3.1, > spark-sql-kafka-0-10_2.11 : 2.3.1, kafka-client :0.11.0.0 > The issue I am facing is that the Spark job always logs auto.offset.reset = > earliest during application startup, even though the latest option is > specified in the code. > Code to reproduce: > {code:java} > package com.informatica.exec > import org.apache.spark.sql.SparkSession > object kafkaLatestOffset { > def main(s: Array[String]) { > val spark = SparkSession > .builder() > .appName("Spark Offset basic example") > .master("local[*]") > .getOrCreate() > val df = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", "localhost:9092") > .option("subscribe", "topic1") > .option("startingOffsets", "latest") > .load() > val query = df.writeStream > .outputMode("complete") > .format("console") > .start() > query.awaitTermination() > } > } > {code} > > As mentioned in the Structured Streaming doc, {{startingOffsets}} needs to be set > instead of auto.offset.reset. > [https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html] > * *auto.offset.reset*: Set the source option {{startingOffsets}} to specify > where to start instead. Structured Streaming manages which offsets are > consumed internally, rather than rely on the kafka Consumer to do it. This > will ensure that no data is missed when new topics/partitions are dynamically > subscribed. Note that {{startingOffsets}} only applies when a new streaming > query is started, and that resuming will always pick up from where the query > left off. > At runtime, Kafka messages are picked up from the latest offset, so > functionally it works as expected; only the log is misleading, as it prints > auto.offset.reset = *earliest*. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
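A practical way to confirm that the query really starts from the latest offsets, despite the misleading {{auto.offset.reset = earliest}} log line, is to look at the offsets reported in the streaming progress. The snippet below is only a sketch: the bootstrap server and topic are placeholders, and it reuses the local-mode setup from the code in the description.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("verify-startingOffsets")
  .master("local[*]")
  .getOrCreate()

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "topic1")                       // placeholder
  .option("startingOffsets", "latest")
  .load()
  .writeStream
  .format("console")
  .start()

// Wait for the first micro-batch, then inspect the source offsets Spark actually used.
// With startingOffsets=latest, startOffset should reflect the log-end offsets at query
// start (a JSON map of topic -> partition -> offset), not offset 0, regardless of what
// ConsumerConfig prints in the logs.
query.processAllAvailable()
Option(query.lastProgress).foreach(p => println(p.sources.head.startOffset))
query.stop()
{code}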
[jira] [Commented] (SPARK-26869) UDF with struct requires to have _1 and _2 as struct field names
[ https://issues.apache.org/jira/browse/SPARK-26869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16772988#comment-16772988 ] Valeria Vasylieva commented on SPARK-26869: --- [~anddonram] you are trying to treat a struct as a tuple in the UDF, but even if you use a Row or a case class instead, it will also fail, as this is not supported yet. Take a look at [SPARK-12823|https://issues.apache.org/jira/browse/SPARK-12823], it seems to be related. Hope it helps. > UDF with struct requires to have _1 and _2 as struct field names > > > Key: SPARK-26869 > URL: https://issues.apache.org/jira/browse/SPARK-26869 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 > Environment: Ubuntu 18.04.1 LTS >Reporter: Andrés Doncel Ramírez >Priority: Minor > > When using a UDF which has a Seq of tuples as input, the struct field names > need to match "_1" and "_2". The following code illustrates this: > > {code:java} > val df = sc.parallelize(Array( > ("1",3.0), > ("2",4.5), > ("5",2.0) > ) > ).toDF("c1","c2") > val df1=df.agg(collect_list(struct("c1","c2")).as("c3")) > // Changing column names to _1 and _2 when creating the struct > val > df2=df.agg(collect_list(struct(col("c1").as("_1"),col("c2").as("_2"))).as("c3")) > def takeUDF = udf({ (xs: Seq[(String, Double)]) => > xs.take(2) > }) > df1.printSchema > df2.printSchema > df1.withColumn("c4",takeUDF(col("c3"))).show() // this fails > df2.withColumn("c4",takeUDF(col("c3"))).show() // this works > {code} > The first one returns the following exception: > org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(c3)' due to data > type mismatch: argument 1 requires array> type, > however, '`c3`' is of array> type.;; > The second works as expected and prints the result. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
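Another workaround that may help when the struct column already exists and cannot easily be rebuilt with renamed fields: cast it to the same shape with {{_1}}/{{_2}} field names. The sketch below reuses {{df1}} and {{takeUDF}} from the code in the description and relies on struct casts matching fields by position; treat it as a suggestion to verify, not documented behaviour.

{code:scala}
import org.apache.spark.sql.functions.col

// Rename the nested struct fields by casting:
//   array<struct<c1:string,c2:double>>  ->  array<struct<_1:string,_2:double>>
// The cast matches struct fields by position, so the values are untouched.
val df3 = df1.withColumn("c3", col("c3").cast("array<struct<_1:string,_2:double>>"))

df3.printSchema()
df3.withColumn("c4", takeUDF(col("c3"))).show() // resolves the same way df2 does in the example
{code}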
[jira] [Updated] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic
[ https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valeria Vasylieva updated SPARK-20597: -- Attachment: Jacek Laskowski.url > KafkaSourceProvider falls back on path as synonym for topic > --- > > Key: SPARK-20597 > URL: https://issues.apache.org/jira/browse/SPARK-20597 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Trivial > Labels: starter > > # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic > to save a DataFrame's rows to > # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka > topics for writing > What seems a quite interesting option is to support {{start(path: String)}} > as the least precedence option in which {{path}} would designate the default > topic when no other options are used. > {code} > df.writeStream.format("kafka").start("topic") > {code} > See > http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html > for discussion -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic
[ https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765060#comment-16765060 ] Valeria Vasylieva commented on SPARK-20597: --- Hi! [~Satyajit], are you still working on this task? If not, [~jlaskowski], can I give it a try? > KafkaSourceProvider falls back on path as synonym for topic > --- > > Key: SPARK-20597 > URL: https://issues.apache.org/jira/browse/SPARK-20597 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Trivial > Labels: starter > > # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic > to save a DataFrame's rows to > # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka > topics for writing > What seems a quite interesting option is to support {{start(path: String)}} > as the least precedence option in which {{path}} would designate the default > topic when no other options are used. > {code} > df.writeStream.format("kafka").start("topic") > {code} > See > http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html > for discussion -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic
[ https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valeria Vasylieva updated SPARK-20597: -- Attachment: (was: Jacek Laskowski.url) > KafkaSourceProvider falls back on path as synonym for topic > --- > > Key: SPARK-20597 > URL: https://issues.apache.org/jira/browse/SPARK-20597 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Trivial > Labels: starter > > # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic > to save a DataFrame's rows to > # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka > topics for writing > What seems a quite interesting option is to support {{start(path: String)}} > as the least precedence option in which {{path}} would designate the default > topic when no other options are used. > {code} > df.writeStream.format("kafka").start("topic") > {code} > See > http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html > for discussion -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org