[jira] [Commented] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic

2019-04-09 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813195#comment-16813195
 ] 

Valeria Vasylieva commented on SPARK-20597:
---

[~jlaskowski] I kindly ask you to review the PR so that this task does not 
stall.

> KafkaSourceProvider falls back on path as synonym for topic
> ---
>
> Key: SPARK-20597
> URL: https://issues.apache.org/jira/browse/SPARK-20597
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>  Labels: starter
>
> # {{KafkaSourceProvider}} supports a {{topic}} option that sets the Kafka topic 
> to save a DataFrame's rows to
> # {{KafkaSourceProvider}} can use a {{topic}} column to assign rows to Kafka 
> topics for writing
> What seems quite an interesting option is to support {{start(path: String)}} 
> as the lowest-precedence option, in which {{path}} would designate the default 
> topic when no other options are used.
> {code}
> df.writeStream.format("kafka").start("topic")
> {code}
> See 
> http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html
>  for discussion
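
To make the proposed precedence concrete, here is a hedged sketch (a hypothetical helper, not the actual {{KafkaSourceProvider}} code) of how the sink could resolve the target topic:
{code}
// Hedged sketch of the proposed precedence: the "topic" option keeps its
// current behaviour of overriding a per-row "topic" column, and the path
// passed to start() is used only as a last resort.
def resolveTopic(topicOption: Option[String],
                 rowTopicColumn: Option[String],
                 path: Option[String]): String =
  topicOption.orElse(rowTopicColumn).orElse(path).getOrElse(
    throw new IllegalArgumentException(
      "Set the topic option, provide a topic column, or pass a path to start()"))
{code}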



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089
 ] 

Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:22 PM:


[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in this comment 
seems to work in Spark Streaming. It has the overhead of creating an array, but 
it resolves the current problem.
{code:java}
df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
 .selectExpr("parsed_struct.*"){code}
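
For context, a minimal self-contained sketch of this workaround; the session setup, UDF, and schema here are hypothetical, added only for illustration:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, explode, udf}

// Hypothetical session and struct-returning UDF standing in for the real,
// expensive parser.
val spark = SparkSession.builder().appName("udf-once-sketch").master("local[*]").getOrCreate()
import spark.implicits._

case class Parsed(upper: String, reversed: String)
val myUdf = udf((value: String) => Parsed(value.toUpperCase, value.reverse))

val df = Seq("hello", "world").toDF("value")

// Wrapping the UDF result in array() and exploding it is the trick from the
// comment above: the UDF output is materialized once per row, and the struct
// fields are then expanded with selectExpr.
df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
  .selectExpr("parsed_struct.*")
  .show()
{code}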


was (Author: nimfadora):
[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in 
[this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15547358&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15547358]
 comment seems to work in Spark Streaming. It has the overhead of creating an 
array, but it resolves the current problem.
{code:java}
df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
 .selectExpr("parsed_struct.*"){code}

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference to column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> While it works, in our case we can't do that because the table is too big to 
> cache.
> 2. Move back and forth to RDD:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?
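
For readers trying this on a recent Spark version, a hedged reproduction sketch of the behaviour described above, using the SparkSession API instead of hiveContext (application name and master URL are illustrative):
{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-repro-sketch").master("local[*]").getOrCreate()

// Same expensive UDF as in the description; the println marks each invocation.
spark.udf.register("veryExpensiveCalc", (s: String) => { println("blahblah1"); "nothing" })

// Each reference to column c in the outer query can trigger another evaluation,
// so "blahblah1" is typically printed more than once.
spark.sql("select * from (select veryExpensiveCalc('a') c) z " +
  "where c is not null and c <> ''").show()
{code}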



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089
 ] 

Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:15 PM:


[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in 
[this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15547358&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15547358]
 comment seems to work in Spark Streaming. It has the overhead of 
creating an array, but it resolves the current problem.
{code:java}
df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
 .selectExpr("parsed_struct.*"){code}


was (Author: nimfadora):
[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in this comment 
seems to work in Spark Streaming. It has the overhead of creating an 
array, but it resolves the current problem.
{code:java}
df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
 .selectExpr("parsed_struct.*"){code}

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference to column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> While it works, in our case we can't do that because the table is too big to 
> cache.
> 2. Move back and forth to RDD:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089
 ] 

Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:14 PM:


[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in this comment 
seems to work in Spark Streaming. It has the overhead of creating an 
array, but it resolves the current problem.
{code:java}
df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
 .selectExpr("parsed_struct.*"){code}


was (Author: nimfadora):
[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in this comment, 
with explode(array(myFunc($id))), seems to work in Spark Streaming. It has the 
overhead of creating an array, but it resolves the current problem.

df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
 .selectExpr("parsed_struct.*")

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference to column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> While it works, in our case we can't do that because the table is too big to 
> cache.
> 2. Move back and forth to RDD:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089
 ] 

Valeria Vasylieva edited comment on SPARK-18748 at 3/22/19 3:14 PM:


[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in this comment, 
with explode(array(myFunc($id))), seems to work in Spark Streaming. It has the 
overhead of creating an array, but it resolves the current problem.

df.select(explode(array(myUdf($"value"))).as("parsed_struct"))
 .selectExpr("parsed_struct.*")


was (Author: nimfadora):
[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in 
[this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023]
 comment, with explode(array(myFunc($id))), seems to work in Spark Streaming. It 
has the overhead of creating an array, but it resolves the current problem.

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference to column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> While it works, in our case we can't do that because the table is too big to 
> cache.
> 2. Move back and forth to RDD:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799089#comment-16799089
 ] 

Valeria Vasylieva commented on SPARK-18748:
---

[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in 
[this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023]
 comment, with explode(array(myFunc($id))), seems to work in Spark Streaming. It 
has the overhead of creating an array, but it resolves the current problem.

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference to column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> While it works, in our case we can't do that because the table is too big to 
> cache.
> 2. Move back and forth to RDD:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Valeria Vasylieva (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeria Vasylieva updated SPARK-18748:
--
Comment: was deleted

(was: [~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in 
[this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023]
 comment, with explode(array(myFunc($id))), works in Spark Streaming. It has the 
overhead of creating an array, but it resolves the current problem.)

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference to column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> While it works, in our case we can't do that because the table is too big to 
> cache.
> 2. Move back and forth to RDD:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18748) UDF multiple evaluations causes very poor performance

2019-03-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799077#comment-16799077
 ] 

Valeria Vasylieva commented on SPARK-18748:
---

[~uzadude] [~hqb1989] the workaround described in -SPARK-17728- in 
[this|https://issues.apache.org/jira/browse/SPARK-17728?focusedCommentId=15535023&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15535023]
 comment, with explode(array(myFunc($id))), works in Spark Streaming. It has the 
overhead of creating an array, but it resolves the current problem.

> UDF multiple evaluations causes very poor performance
> -
>
> Key: SPARK-18748
> URL: https://issues.apache.org/jira/browse/SPARK-18748
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> We have a use case where we have a relatively expensive UDF that needs to be 
> calculated. The problem is that instead of being calculated once, it gets 
> calculated over and over again.
> For example:
> {quote}
> def veryExpensiveCalc(str:String) = \{println("blahblah1"); "nothing"\}
> hiveContext.udf.register("veryExpensiveCalc", veryExpensiveCalc _)
> hiveContext.sql("select * from (select veryExpensiveCalc('a') c)z where c is 
> not null and c<>''").show
> {quote}
> with the output:
> {quote}
> blahblah1
> blahblah1
> blahblah1
> +---+
> |  c|
> +---+
> |nothing|
> +---+
> {quote}
> You can see that for each reference to column "c" you will get the println.
> That causes very poor performance for our real use case.
> This also came out on StackOverflow:
> http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns
> http://stackoverflow.com/questions/34587596/trying-to-turn-a-blob-into-multiple-columns-in-spark/
> with two problematic work-arounds:
> 1. cache() after the first time. e.g.
> {quote}
> hiveContext.sql("select veryExpensiveCalc('a') as c").cache().where("c is not 
> null and c<>''").show
> {quote}
> While it works, in our case we can't do that because the table is too big to 
> cache.
> 2. Move back and forth to RDD:
> {quote}
> val df = hiveContext.sql("select veryExpensiveCalc('a') as c")
> hiveContext.createDataFrame(df.rdd, df.schema).where("c is not null and 
> c<>''").show
> {quote}
> which works, but then we lose some of the optimizations, like predicate 
> pushdown, etc., and it's very ugly.
> Any ideas on how we can make the UDF get calculated just once in a reasonable 
> way?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24299) UI for Structured Streaming Monitoring

2019-03-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16799013#comment-16799013
 ] 

Valeria Vasylieva commented on SPARK-24299:
---

[~TomaszGaweda] I would like to try to work on this task.

> UI for Structured Streaming Monitoring
> --
>
> Key: SPARK-24299
> URL: https://issues.apache.org/jira/browse/SPARK-24299
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming, Web UI
>Affects Versions: 2.3.0
>Reporter: Tomasz Gawęda
>Priority: Major
>
> The old Streaming module had a helpful web UI for monitoring. It would be very 
> helpful to have such a UI for Structured Streaming.
>  
> I'm aware that we can read a lot from the SQL visualization, but a dedicated 
> page with metrics for queries would still be useful.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26739) Standardized Join Types for DataFrames

2019-03-19 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796045#comment-16796045
 ] 

Valeria Vasylieva commented on SPARK-26739:
---

[~slehan] Hi! I strongly agree that this change would make Dataset/DataFrame API 
users much happier, myself included.

I would like to work on this task.

 

There are several questions though:

1) Will this change be accepted, if implemented?

2) If yes, do we need to mark methods using strings as deprecated?

 

[~hyukjin.kwon], [~r...@databricks.com], [~cloud_fan] could one of you please 
answer these questions?

Thank you!
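
For illustration, here is a hedged sketch of what a constants-based JoinType could look like (hypothetical names, not an existing Spark API); a sealed trait or enum variant would add the compile-time checking discussed in the proposal below:
{code:java}
// Hypothetical sketch: string constants that simply fill in today's joinType
// parameter, so existing string-based calls keep working unchanged.
object JoinType {
  val Inner = "inner"
  val Cross = "cross"
  val LeftOuter = "left_outer"
  val RightOuter = "right_outer"
  val FullOuter = "full_outer"
  val LeftSemi = "left_semi"
  val LeftAnti = "left_anti"
}

// Usage mirrors the example from the proposal:
// leftDF.join(rightDF, col("ID") === col("RightID"), JoinType.LeftOuter)
{code}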

> Standardized Join Types for DataFrames
> --
>
> Key: SPARK-26739
> URL: https://issues.apache.org/jira/browse/SPARK-26739
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Skyler Lehan
>Priority: Minor
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> h3. *Q1.* What are you trying to do? Articulate your objectives using 
> absolutely no jargon.
> Currently, in the join functions on 
> [DataFrames|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset],
>  the join types are defined via a string parameter called joinType. In order 
> for a developer to know which joins are possible, they must look up the API 
> call for join. While this works fine, it can cause the developer to make a 
> typo resulting in improper joins and/or unexpected errors that aren't evident 
> at compile time. The objective of this improvement would be to allow 
> developers to use a common definition for join types (by enum or constants) 
> called JoinTypes. This would contain the possible joins and remove the 
> possibility of a typo. It would also allow Spark to alter the names of the 
> joins in the future without impacting end-users.
> h3. *Q2.* What problem is this proposal NOT designed to solve?
> The problem this solves is extremely narrow, it would not solve anything 
> other than providing a common definition for join types.
> h3. *Q3.* How is it done today, and what are the limits of current practice?
> Currently, developers must join two DataFrames like so:
> {code:java}
> val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), 
> "left_outer")
> {code}
> Where they manually type the join type. As stated above, this:
>  * Requires developers to manually type in the join
>  * Can cause possibility of typos
>  * Restricts renaming of join types, as it's a literal string
>  * Does not restrict and/or compile check the join type being used, leading 
> to runtime errors
> h3. *Q4.* What is new in your approach and why do you think it will be 
> successful?
> The new approach would use constants or *more preferably an enum*, something 
> like this:
> {code:java}
> val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), 
> JoinType.LEFT_OUTER)
> {code}
> This would provide:
>  * In code reference/definitions of the possible join types
>  ** This subsequently allows the addition of scaladoc of what each join type 
> does and how it operates
>  * Removes possibilities of a typo on the join type
>  * Provides compile time checking of the join type (only if an enum is used)
> To clarify, if JoinType is a constant, it would just fill in the joinType 
> string parameter for users. If an enum is used, it would restrict the domain 
> of possible join types to whatever is defined in the future JoinType enum. 
> The enum is preferred; however, it would take longer to implement.
> h3. *Q5.* Who cares? If you are successful, what difference will it make?
> Developers using Apache Spark will care. This will make the join function 
> easier to wield and lead to less runtime errors. It will save time by 
> bringing join type validation at compile time. It will also provide in code 
> reference to the join types, which saves the developer time of having to look 
> up and navigate the multiple join functions to find the possible join types. 
> In addition to that, the resulting constants/enum would have documentation on 
> how that join type works.
> h3. *Q6.* What are the risks?
> Users of Apache Spark who currently use strings to define their join types 
> could be impacted if an enum is chosen as the common definition. This risk 
> can be mitigated by using string constants. The string constants would be the 
> exact same string as the string literals used today. For example:
> {code:java}
> JoinType.INNER = "inner"
> {code}
> If an enum is still the preferred way of defining the join types, new join 
> functions could be added that take in these enums and the join calls that 
> contain string parameters for joinType could be deprecated. This would give 
> developers a chance to change over to the new join types.
> h3. *Q7.* How long will 

[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-28 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274
 ] 

Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:59 AM:
---

I have investigated this issue and here is what I have found.

The exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is 
still unresolved.

When passed to executors, Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} by invoking the {{writeReplace}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
If you debug, you will see that at this point the lambda still works correctly 
and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()){code}
When the lambda is deserialized on the executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from 
{{java.io.ObjectInputStream#readOrdinaryObject}}, and if you try to execute this 
expression on the function after deserialization, you will get a 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}

I think the issue may be closed, as it does not relate to Spark itself.
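
For completeness, the failure described above can be reproduced on the driver by round-tripping the closure through plain Java serialization, which roughly mimics shipping it to an executor. A minimal sketch, assuming only the JDK (the roundTrip helper is hypothetical, not Spark code):
{code:java}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Serialize and immediately deserialize a value, as happens when a closure is
// shipped from the driver to an executor. Invoking the deserialized copy of the
// method-reference closure is what reportedly triggers the ClassCastException.
def roundTrip[T](value: T): T = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}
{code}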


was (Author: nimfadora):
I have investigated this issue and here what I have found.

Exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is 
still unresolved:

When passed to executors Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here 
|[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]].
 And if you debug, than you will see that at this point lambda is working 
correctly and this call returns {{true}}: 
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code}
When lambda is deserialized on executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from  
{{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this 
expression on function after deserialization, you will get 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}
 

I think the issue may be closed as it does not relate to the Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However, if we filter the more generic RDD first, all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use a lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private <T> JavaRDD<T> toRdd(T... array) {
> return sparkContext.parallelize(Arrays.asList(array));
> }
> @Test
> public void filters_apples_and_b

[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-28 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274
 ] 

Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 9:01 AM:
---

I have investigated this issue and here is what I have found: the exception is 
caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is 
still unresolved.

When passed to executors, Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} by invoking the {{writeReplace}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
If you debug, you will see that at this point the lambda still works correctly 
and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new Banana()){code}
When the lambda is deserialized on the executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from 
{{java.io.ObjectInputStream#readOrdinaryObject}}, and if you try to execute this 
expression on the function after deserialization, you will get a 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new Banana()){code}

I think the issue may be closed, as it does not relate to Spark itself.


was (Author: nimfadora):
I have investigated this issue and here what I have found.

Exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is 
still unresolved:

When passed to executors Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
 And if you debug, than you will see that at this point lambda is working 
correctly and this call returns {{true}}: 
{code:java}
closure.asInstanceOf[Function].apply(new Banana()).{code}
When lambda is deserialized on executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from  
{{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this 
expression on function after deserialization, you will get 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new Banana()){code}
 

I think the issue may be closed as it does not relate to the Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However, if we filter the more generic RDD first, all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use a lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private <T> JavaRDD<T> toRdd(T... array) {
> return sparkContext.parallelize(Arrays.asList(array));
> }
> @Test
> public void filters_apples_and_bananas_with_method_reference() {
> JavaRDD appleRdd = toRdd(

[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-28 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274
 ] 

Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 9:00 AM:
---

I have investigated this issue and here is what I have found.

The exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is 
still unresolved.

When passed to executors, Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} by invoking the {{writeReplace}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
If you debug, you will see that at this point the lambda still works correctly 
and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new Banana()){code}
When the lambda is deserialized on the executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from 
{{java.io.ObjectInputStream#readOrdinaryObject}}, and if you try to execute this 
expression on the function after deserialization, you will get a 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new Banana()){code}

I think the issue may be closed, as it does not relate to Spark itself.


was (Author: nimfadora):
I have investigated this issue and here what I have found.

Exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], that is 
still unresolved:

When passed to executors Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
 And if you debug, than you will see that at this point lambda is working 
correctly and this call returns {{true}}: 
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code}
When lambda is deserialized on executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from  
{{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this 
expression on function after deserialization, you will get 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}
 

I think the issue may be closed as it does not relate to the Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However, if we filter the more generic RDD first, all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use a lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private <T> JavaRDD<T> toRdd(T... array) {
> return sparkContext.parallelize(Arrays.asList(array));
> }
> @Test
> public void filters_apples_and_bananas_with_method_reference() {
> 

[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-28 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274
 ] 

Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:58 AM:
---

I have investigated this issue and here is what I have found.

The exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is 
still unresolved.

When passed to executors, Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} by invoking the {{writeReplace}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
If you debug, you will see that at this point the lambda still works correctly 
and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()){code}
When the lambda is deserialized on the executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from 
{{java.io.ObjectInputStream#readOrdinaryObject}}, and if you try to execute this 
expression on the function after deserialization, you will get a 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}

I think the issue may be closed, as it does not relate to Spark itself.


was (Author: nimfadora):
I have investigated this issue and here what I have found.

Exception is caused by this [JDK 
bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]],
 that is still unresolved:

When passed to executors Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here 
|[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]].
 And if you debug, than you will see that at this point lambda is working 
correctly and this call returns {{true}}: 
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code}
When lambda is deserialized on executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from  
{{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this 
expression on function after deserialization, you will get 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}
 

I think the issue may be closed as it does not relate to the Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However, if we filter the more generic RDD first, all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use a lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private  JavaRDD toRdd(T ... array) {
> return sparkContext.parallelize(Arrays.asLi

[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-28 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274
 ] 

Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:58 AM:
---

I have investigated this issue and here is what I have found.

The exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is 
still unresolved.

When passed to executors, Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} by invoking the {{writeReplace}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
If you debug, you will see that at this point the lambda still works correctly 
and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()){code}
When the lambda is deserialized on the executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from 
{{java.io.ObjectInputStream#readOrdinaryObject}}, and if you try to execute this 
expression on the function after deserialization, you will get a 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}

I think the issue may be closed, as it does not relate to Spark itself.


was (Author: nimfadora):
I have investigated this issue and here what I have found.

Exception is caused by this [JDK 
bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]], that is 
still unresolved:

When passed to executors Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here 
|[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]].
 And if you debug, than you will see that at this point lambda is working 
correctly and this call returns {{true}}: 
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code}
When lambda is deserialized on executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from  
{{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this 
expression on function after deserialization, you will get 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}
 

I think the issue may be closed as it does not relate to the Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However, if we filter the more generic RDD first, all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use a lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private <T> JavaRDD<T> toRdd(T... array) {
> return sparkContext.parallelize(Arrays.asList(array));
> }
> @Test
> public void filters_apples_

[jira] [Comment Edited] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-28 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274
 ] 

Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:57 AM:
---

I have investigated this issue and here is what I have found.

The exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is 
still unresolved.

When passed to executors, Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} by invoking the {{writeReplace}} method 
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].
If you debug, you will see that at this point the lambda still works correctly 
and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()){code}
When the lambda is deserialized on the executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from 
{{java.io.ObjectInputStream#readOrdinaryObject}}, and if you try to execute this 
expression on the function after deserialization, you will get a 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}

I think the issue may be closed, as it does not relate to Spark itself.


was (Author: nimfadora):
I have investigated this issue and here what I have found.

Exception is caused by this [JDK 
bug|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]|[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236]:],]
 that is still unresolved:

When passed to executors Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} invoking {{writeResolve}} method [here 
|[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198].]
 And if you debug, than you will see that at this point lambda is working 
correctly and this call returns {{true}}: 
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()).{code}
When lambda is deserialized on executor, the method 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from  
{{java.io.ObjectInputStream#readOrdinaryObject}} and if you try to execute this 
expression on function after deserialization, you will get 
{{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}
 

I think the issue may be closed as it does not relate to the Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However, if we filter the more generic RDD first, all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use a lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private  JavaRDD toRdd(T .

[jira] [Commented] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-28 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274
 ] 

Valeria Vasylieva commented on SPARK-9135:
--

I have investigated this issue and here is what I found.

The exception is caused by this [JDK 
bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is 
still unresolved.

When passed to executors, Java lambdas are serialized to 
{{java.lang.invoke.SerializedLambda}} by invoking their {{writeReplace}} method 
([here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]).
 If you debug, you will see that at this point the lambda still works 
correctly and this call returns {{true}}: 
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()){code}
When the lambda is deserialized on the executor, 
{{java.lang.invoke.SerializedLambda#readResolve}} is called from 
{{java.io.ObjectInputStream#readOrdinaryObject}}, and evaluating the same kind 
of expression on the deserialized function throws a {{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}
 

I think the issue may be closed, as it does not relate to Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However if we filter more generic rdd first - all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private <T> JavaRDD<T> toRdd(T ... array) {
> return sparkContext.parallelize(Arrays.asList(array));
> }
> @Test
> public void filters_apples_and_bananas_with_method_reference() {
> JavaRDD<Apple> appleRdd = toRdd(new Apple());
> JavaRDD<Banana> bananaRdd = toRdd(new Banana());
> 
> long redAppleCount = appleRdd.filter(Fruit::isRed).count();
> long redBananaCount = bananaRdd.filter(Fruit::isRed).count();
> assertThat(redAppleCount, equalTo(1L));
> assertThat(redBananaCount, equalTo(0L));
> }
> }
> {code}
> The test above throws:
> {code}
> 15/07/17 14:10:04 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast 
> to com.doggybites.SparkTest$Apple
>   at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source)
>   at 
> org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>   at 
> org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
>   at org.apache.spark.rdd.RDD$$anonf

[jira] [Commented] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic

2019-02-22 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16775279#comment-16775279
 ] 

Valeria Vasylieva commented on SPARK-20597:
---

[~jlaskowski] I have opened a PR for this issue; could you please take a look? 
Thank you.

> KafkaSourceProvider falls back on path as synonym for topic
> ---
>
> Key: SPARK-20597
> URL: https://issues.apache.org/jira/browse/SPARK-20597
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>  Labels: starter
>
> # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic 
> to save a DataFrame's rows to
> # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka 
> topics for writing
> What seems a quite interesting option is to support {{start(path: String)}} 
> as the least precedence option in which {{path}} would designate the default 
> topic when no other options are used.
> {code}
> df.writeStream.format("kafka").start("topic")
> {code}
> See 
> http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html
>  for discussion



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9135) Filter fails when filtering with a method reference to overloaded method

2019-02-20 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773067#comment-16773067
 ] 

Valeria Vasylieva commented on SPARK-9135:
--

I would like to work on it

> Filter fails when filtering with a method reference to overloaded method
> 
>
> Key: SPARK-9135
> URL: https://issues.apache.org/jira/browse/SPARK-9135
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.4.0
>Reporter: Mateusz Michalowski
>Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by 
> Apple::isRed and Banana::isRed. 
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a 
> result.
> However if we filter more generic rdd first - all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
> static abstract class Fruit implements Serializable {
> abstract boolean isRed();
> }
> static class Banana extends Fruit {
> @Override
> boolean isRed() {
> return false;
> }
> }
> static class Apple extends Fruit {
> @Override
> boolean isRed() {
> return true;
> }
> }
> private JavaSparkContext sparkContext;
> @Before
> public void setUp() throws Exception {
> SparkConf sparkConf = new 
> SparkConf().setAppName("test").setMaster("local[2]");
> sparkContext = new JavaSparkContext(sparkConf);
> }
> @After
> public void tearDown() throws Exception {
> sparkContext.stop();
> }
> private <T> JavaRDD<T> toRdd(T ... array) {
> return sparkContext.parallelize(Arrays.asList(array));
> }
> @Test
> public void filters_apples_and_bananas_with_method_reference() {
> JavaRDD<Apple> appleRdd = toRdd(new Apple());
> JavaRDD<Banana> bananaRdd = toRdd(new Banana());
> 
> long redAppleCount = appleRdd.filter(Fruit::isRed).count();
> long redBananaCount = bananaRdd.filter(Fruit::isRed).count();
> assertThat(redAppleCount, equalTo(1L));
> assertThat(redBananaCount, equalTo(0L));
> }
> }
> {code}
> The test above throws:
> {code}
> 15/07/17 14:10:04 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast 
> to com.doggybites.SparkTest$Apple
>   at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source)
>   at 
> org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>   at 
> org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 15/07/17 14:10:04 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3, 
> localhost): java.lang.ClassCastException: com.doggybites.SparkTest$Banana 
> cannot be cast to com.doggybites.SparkTest$Apple
>   at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source)
>   at 
> org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scal

[jira] [Commented] (SPARK-25810) Spark structured streaming logs auto.offset.reset=earliest even though startingOffsets is set to latest

2019-02-20 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773044#comment-16773044
 ] 

Valeria Vasylieva commented on SPARK-25810:
---

I suppose the cause is here: 
[KafkaSourceProvider:521|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L517]
{code:java}
// Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
// offsets by itself instead of counting on KafkaConsumer.
.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
{code}
I do not really think this should be fixed, though: Spark defines its own 
algorithm for checking and fetching offsets in 
[KafkaSource|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala],
 which lets it avoid Kafka errors for offsets that do not exist, etc.

[ConsumerConfig|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java]
 belongs to Kafka, so we cannot simply change the logging on the Spark side.
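
To see where the misleading line actually comes from, here is a small 
stand-alone sketch (plain Kafka client code, not Spark source, and the class 
name is mine): constructing a consumer with this hard-coded property is what 
makes Kafka's {{ConsumerConfig}} log {{auto.offset.reset = earliest}}, and the 
Spark-level {{startingOffsets}} option is never part of that map.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetResetLogDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // The property KafkaSourceProvider hard-codes for its internal consumer;
        // Spark resolves the actual starting position itself.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // With an SLF4J binding on the classpath, constructing the consumer is
        // enough to print the "ConsumerConfig values:" block that lists
        // auto.offset.reset = earliest; no broker connection is needed for this.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Nothing else to do; the constructor already produced the log entry.
        }
    }
}
{code}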

> Spark structured streaming logs auto.offset.reset=earliest even though 
> startingOffsets is set to latest
> ---
>
> Key: SPARK-25810
> URL: https://issues.apache.org/jira/browse/SPARK-25810
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: ANUJA BANTHIYA
>Priority: Trivial
>
> I have an issue when I'm trying to read data from Kafka using Spark 
> Structured Streaming. 
> Versions: spark-core_2.11 : 2.3.1, spark-sql_2.11 : 2.3.1, 
> spark-sql-kafka-0-10_2.11 : 2.3.1, kafka-client : 0.11.0.0
> The issue I am facing is that the Spark job always logs auto.offset.reset = 
> earliest even though "latest" is specified in the code at application 
> startup.
> Code to reproduce: 
> {code:java}
> package com.informatica.exec
> import org.apache.spark.sql.SparkSession
> object kafkaLatestOffset {
>  def main(s: Array[String]) {
>  val spark = SparkSession
>  .builder()
>  .appName("Spark Offset basic example")
>  .master("local[*]")
>  .getOrCreate()
>  val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "topic1")
>  .option("startingOffsets", "latest")
>  .load()
>  val query = df.writeStream
>  .outputMode("complete")
>  .format("console")
>  .start()
>  query.awaitTermination()
>  }
> }
> {code}
>  
> As mentioned in the Structured Streaming doc, {{startingOffsets}} needs to be 
> set instead of auto.offset.reset.
> [https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html]
>  * *auto.offset.reset*: Set the source option {{startingOffsets}} to specify 
> where to start instead. Structured Streaming manages which offsets are 
> consumed internally, rather than rely on the kafka Consumer to do it. This 
> will ensure that no data is missed when new topics/partitions are dynamically 
> subscribed. Note that {{startingOffsets}} only applies when a new streaming 
> query is started, and that resuming will always pick up from where the query 
> left off.
> At runtime, Kafka messages are picked up from the latest offset, so 
> functionally it works as expected. Only the log is misleading, as it logs 
> auto.offset.reset = *earliest*.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26869) UDF with struct requires to have _1 and _2 as struct field names

2019-02-20 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16772988#comment-16772988
 ] 

Valeria Vasylieva commented on SPARK-26869:
---

[~anddonram] you are trying to treat a struct as a tuple in the UDF, but even if 
you use a Row or a case class instead, it will still fail, as that is not 
supported yet.
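
A quick way to see where the {{_1}}/{{_2}} requirement comes from (a small 
sketch using the public {{Encoders}} API; the class name is mine): the schema 
Spark derives for a Scala tuple names its struct fields {{_1}} and {{_2}}, and 
the UDF's input column is resolved against that schema, which is why only the 
renamed {{df2}} variant works.
{code:java}
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import scala.Tuple2;

public class TupleSchemaDemo {
    public static void main(String[] args) {
        // Encoder for a (String, Double) tuple, the element type the UDF expects.
        Encoder<Tuple2<String, Double>> enc =
                Encoders.tuple(Encoders.STRING(), Encoders.DOUBLE());

        // Prints a struct whose fields are named _1 and _2 -- the names the
        // failing call expects to find inside column c3.
        enc.schema().printTreeString();
    }
}
{code}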

Take a look at [SPARK-12823|https://issues.apache.org/jira/browse/SPARK-12823]; 
it seems to be related. Hope it helps.

 

> UDF with struct requires to have _1 and _2 as struct field names
> 
>
> Key: SPARK-26869
> URL: https://issues.apache.org/jira/browse/SPARK-26869
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
> Environment: Ubuntu 18.04.1 LTS
>Reporter: Andrés Doncel Ramírez
>Priority: Minor
>
> When using a UDF which has a Seq of tuples as input, the struct field names 
> need to match "_1" and "_2". The following code illustrates this:
>  
> {code:java}
> val df = sc.parallelize(Array(
>   ("1",3.0),
>   ("2",4.5),
>   ("5",2.0)
> )
> ).toDF("c1","c2")
> val df1=df.agg(collect_list(struct("c1","c2")).as("c3"))
> // Changing column names to _1 and _2 when creating the struct
> val 
> df2=df.agg(collect_list(struct(col("c1").as("_1"),col("c2").as("_2"))).as("c3"))
> def takeUDF = udf({ (xs: Seq[(String, Double)]) =>
>   xs.take(2)
> })
> df1.printSchema
> df2.printSchema
> df1.withColumn("c4",takeUDF(col("c3"))).show() // this fails
> df2.withColumn("c4",takeUDF(col("c3"))).show() // this works
> {code}
> The first one returns the following exception:
> org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(c3)' due to data 
> type mismatch: argument 1 requires array<struct<_1:string,_2:double>> type, 
> however, '`c3`' is of array<struct<c1:string,c2:double>> type.;;
> While the second works as expected and prints the result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic

2019-02-11 Thread Valeria Vasylieva (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeria Vasylieva updated SPARK-20597:
--
Attachment: Jacek Laskowski.url

> KafkaSourceProvider falls back on path as synonym for topic
> ---
>
> Key: SPARK-20597
> URL: https://issues.apache.org/jira/browse/SPARK-20597
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>  Labels: starter
>
> # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic 
> to save a DataFrame's rows to
> # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka 
> topics for writing
> What seems a quite interesting option is to support {{start(path: String)}} 
> as the least precedence option in which {{path}} would designate the default 
> topic when no other options are used.
> {code}
> df.writeStream.format("kafka").start("topic")
> {code}
> See 
> http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html
>  for discussion



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic

2019-02-11 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765060#comment-16765060
 ] 

Valeria Vasylieva commented on SPARK-20597:
---

Hi! 
[~Satyajit] are you still working on this task? If not, [~jlaskowski], may I 
give it a try?

> KafkaSourceProvider falls back on path as synonym for topic
> ---
>
> Key: SPARK-20597
> URL: https://issues.apache.org/jira/browse/SPARK-20597
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>  Labels: starter
>
> # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic 
> to save a DataFrame's rows to
> # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka 
> topics for writing
> What seems a quite interesting option is to support {{start(path: String)}} 
> as the least precedence option in which {{path}} would designate the default 
> topic when no other options are used.
> {code}
> df.writeStream.format("kafka").start("topic")
> {code}
> See 
> http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html
>  for discussion



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20597) KafkaSourceProvider falls back on path as synonym for topic

2019-02-11 Thread Valeria Vasylieva (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-20597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeria Vasylieva updated SPARK-20597:
--
Attachment: (was: Jacek Laskowski.url)

> KafkaSourceProvider falls back on path as synonym for topic
> ---
>
> Key: SPARK-20597
> URL: https://issues.apache.org/jira/browse/SPARK-20597
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>  Labels: starter
>
> # {{KafkaSourceProvider}} supports {{topic}} option that sets the Kafka topic 
> to save a DataFrame's rows to
> # {{KafkaSourceProvider}} can use {{topic}} column to assign rows to Kafka 
> topics for writing
> What seems a quite interesting option is to support {{start(path: String)}} 
> as the least precedence option in which {{path}} would designate the default 
> topic when no other options are used.
> {code}
> df.writeStream.format("kafka").start("topic")
> {code}
> See 
> http://apache-spark-developers-list.1001551.n3.nabble.com/KafkaSourceProvider-Why-topic-option-and-column-without-reverting-to-path-as-the-least-priority-td21458.html
>  for discussion



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org