Re: Is there a mutable dataframe spark structured streaming 2.3.0?

2018-03-23 Thread kant kodali
Quick question regarding the same topic. If I construct the data frame *df*
in the following way:

key  | value   | topic| offset | partition | timestamp

foo1 | value1 | topic1 | ...
foo2 | value2 | topic2 | ...

what happens when I do df.writeStream().format("kafka").option("topic",
"hello_topic").start()? Will it use the hello_topic specified in the
option or the topic from the row? Sure, I don't need to specify the topic as an
option if I have it in each row, but I wonder what happens internally when both
are present.
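
For concreteness, here are the two variants I mean (a sketch in Scala; the broker
address, checkpoint paths and "hello_topic" are placeholders):

// (a) no "topic" option: each row carries its own target in the "topic" column
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("checkpointLocation", "/tmp/checkpoints/per-row-topic")
  .start()

// (b) the case I'm asking about: the rows still carry a "topic" column *and*
// the "topic" option is set to "hello_topic"
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "hello_topic")
  .option("checkpointLocation", "/tmp/checkpoints/fixed-topic")
  .start()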

Thanks!



On Thu, Mar 22, 2018 at 7:48 AM, kant kodali  wrote:

> Thanks all!
>
> On Thu, Mar 22, 2018 at 2:08 AM, Jorge Machado  wrote:
>
>> DataFrames are not mutable.
>>
>> Jorge Machado
>>
>>
>> On 22 Mar 2018, at 10:07, Aakash Basu  wrote:
>>
>> Hey,
>>
>> I faced the same issue a couple of days back. Kindly go through the mail
>> chain with "*Multiple Kafka Spark Streaming Dataframe Join query*" as the
>> subject; TD and Chris have cleared my doubts, and it would help you too.
>>
>> Thanks,
>> Aakash.
>>
>> On Thu, Mar 22, 2018 at 7:50 AM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> Is there a mutable dataframe in Spark Structured Streaming 2.3.0? I am
>>> currently reading from Kafka, and if I cannot parse the messages that I get
>>> from Kafka I want to write them to, say, some "dead_queue" topic.
>>>
>>> I wonder what is the best way to do this?
>>>
>>> Thanks!
>>>
>>
>>
>>
>


Re: strange behavior of joining dataframes

2018-03-23 Thread Shiyuan
Here is a simple example that reproduces the problem. This code fails with a
missing attribute ('kk') error. Is it a bug? Note that if the `select`
on line B is removed, this code runs.

import pyspark.sql.functions as F

df = spark.createDataFrame([{'score': 1.0, 'ID': 'abc', 'LABEL': True, 'k': 2},
                            {'score': 1.0, 'ID': 'abc', 'LABEL': False, 'k': 3}])

df = df.withColumnRenamed("k", "kk")\
       .select("ID", "score", "LABEL", "kk")  # line B

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])

On Tue, Mar 20, 2018 at 9:58 PM, Shiyuan  wrote:

> Hi Spark-users:
> I have a dataframe "df_t" which was generated from other dataframes by
> several transformations. And then I  did something very simple,  just
> counting the rows, that is the following code:
>
> (A)
> df_t_1 = df_t.groupby(["Id","key"]).count().withColumnRenamed("count", "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t_3 = df_t_1.join(df_t_2, ["Id"])
> df_t.join(df_t_3, ["Id","key"])
>
> When I run this query, I get the error that "key" is missing during the
> join. However, the column "key" is clearly in the dataframe df_t. What is
> strange is that if I first do this:
>
>  data = df_t.collect(); df_t = spark.createDataFrame(data);  (B)
>
> then (A) can run without error. However, the code (B) should not change
> the dataframe df_t at all. Why can snippet (A) run with (B) but
> fail without (B)? Also, a different joining sequence can complete
> without error:
>
> (C)
> df_t_1 = df_t.groupby(["Id","key"]).count().withColumnRenamed("count", "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t.join(df_t_1, ["Id","key"]).join(df_t_2, ["Id"])
>
> But (A) and (C) are conceptually the same and should produce the same
> result. What could possibly go wrong here? Any hints to track down
> the problem are appreciated. I am using Spark 2.1.
>
>
>
>
>


Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Takeshi Yamamuro
Can you file a jira if this is a bug?
Thanks!

On Sat, Mar 24, 2018 at 1:23 AM, Michael Shtelma  wrote:

> Hi Maropu,
>
> the problem seems to be in FilterEstimation.scala on lines 50 and 52:
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala?utf8=✓#L50-L52
>
> val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(1.0)
> val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
>
> The problem is that filterSelectivity gets a NaN value in my case, and
> NaN cannot be converted to BigDecimal.
> I can try adding a simple if that checks for NaN and test whether this helps.
> I will also try to understand why I am getting NaN in my case.
>
> Best,
> Michael
>
>
> On Fri, Mar 23, 2018 at 1:51 PM, Takeshi Yamamuro 
> wrote:
> > hi,
> >
> > What's a query to reproduce this?
> > It seems when casting double to BigDecimal, it throws the exception.
> >
> > // maropu
> >
> > On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma 
> wrote:
> >>
> >> Hi all,
> >>
> >> I am using Spark 2.3 with activated cost-based optimizer and a couple
> >> of hive tables, that were analyzed previously.
> >>
> >> I am getting the following exception for different queries:
> >>
> >> java.lang.NumberFormatException
> >>
> >> at java.math.BigDecimal.(BigDecimal.java:494)
> >>
> >> at java.math.BigDecimal.(BigDecimal.java:824)
> >>
> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
> >>
> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
> >>
> >> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> FilterEstimation.estimate(FilterEstimation.scala:52)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.
> visit(LogicalPlanVisitor.scala:30)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
> >>
> >> at scala.Option.getOrElse(Option.scala:121)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.
> stats(LogicalPlan.scala:30)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
> >>
> >> at
> >> scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(
> IndexedSeqOptimized.scala:38)
> >>
> >> at
> >> scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.
> scala:43)
> >>
> >> at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> ProjectEstimation$.estimate(ProjectEstimation.scala:27)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.
> visit(LogicalPlanVisitor.scala:37)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
> >>
> >> at scala.Option.getOrElse(Option.scala:121)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
> >>
> >> at
> >> 

Re: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Case class

2018-03-23 Thread Yong Zhang
I am still stuck with this. Does anyone know the correct way to use a custom
Aggregator for a case class with agg?


I would like to use the Dataset API, but it looks like in aggregation Spark loses the
type and falls back to GenericRowWithSchema instead of my case class. Is that right?


Thanks



From: Yong Zhang 
Sent: Thursday, March 22, 2018 10:08 PM
To: user@spark.apache.org
Subject: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to Case class


I am trying to research a custom Aggregator implementation, and following the 
example in the Spark sample code here:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala


But I cannot use it in the agg function, and I got an error like
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
to my case class. If I don't use the group by, then it works the same way as in
the sample code. To make it work with group by, what do I need to change?


This is on Spark 2.2, as shown below. Following the Spark example, I can run

rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)

without any issue, but with

rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)

I get the cast exception. But I want to apply my custom Aggregator
implementation per group. How do I fix this?

Thanks


scala> spark.version
res31: String = 2.2.1

case class FlagChangeLog(date: String, old_flag: Boolean, new_flag: Boolean)
case class DeriveRecord(domain: String, date: String, flag: Boolean, isDelta: Boolean,
  flag_changelog: scala.collection.mutable.ListBuffer[FlagChangeLog])

val rawDS = Seq(
  DeriveRecord("abc.com", "2017-01-09", true, false, ListBuffer.empty),
  DeriveRecord("123.com", "2015-01-01", false, false, ListBuffer.empty),
  DeriveRecord("abc.com", "2018-01-09", false, true, ListBuffer.empty),
  DeriveRecord("123.com", "2017-01-09", true, true, ListBuffer.empty),
  DeriveRecord("xyz.com", "2018-03-09", false, true, ListBuffer.empty)
).toDS

scala> rawDS.show(false)
+-------+----------+-----+-------+--------------+
|domain |date      |flag |isDelta|flag_changelog|
+-------+----------+-----+-------+--------------+
|abc.com|2017-01-09|true |false  |[]            |
|123.com|2015-01-01|false|false  |[]            |
|abc.com|2018-01-09|false|true   |[]            |
|123.com|2017-01-09|true |true   |[]            |
|xyz.com|2018-03-09|false|true   |[]            |
+-------+----------+-----+-------+--------------+

object ChangeLogAggregator
    extends Aggregator[DeriveRecord, DeriveRecord, DeriveRecord] {
  def zero: DeriveRecord = ??? // omitted

  def reduce(buffer: DeriveRecord, curr: DeriveRecord): DeriveRecord = {
    ??? // omitted
  }

  def merge(b1: DeriveRecord, b2: DeriveRecord): DeriveRecord = {
    ??? // omitted
  }

  def finish(output: DeriveRecord): DeriveRecord = {
    ??? // omitted
  }

  def bufferEncoder: Encoder[DeriveRecord] = Encoders.product
  def outputEncoder: Encoder[DeriveRecord] = Encoders.product
}

scala> rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
+-------+----------+-----+-------+---------------------------------------------------+
|domain |date      |flag |isDelta|flag_changelog                                     |
+-------+----------+-----+-------+---------------------------------------------------+
|abc.com|2018-01-09|false|true   |[[2015-01-01,true,false], [2018-01-09,false,false]]|
+-------+----------+-----+-------+---------------------------------------------------+

scala> 
rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)
18/03/22 22:04:44 ERROR Executor: Exception in task 1.0 in stage 36.0 (TID 48)
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to $line15.$read$$iw$$iw$DeriveRecord
at 
$line110.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ChangeLogAggregator$.reduce(:31)
at 
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.update(TypedAggregateExpression.scala:239)
at 
org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:524)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
at 
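
PS: One thing I plan to try next (just a sketch, not verified) is keeping the
aggregation typed end to end via groupByKey, in case the untyped groupBy is what
hands the aggregator a GenericRowWithSchema instead of a DeriveRecord:

// sketch only: typed grouping, so reduce() should receive DeriveRecord values
rawDS
  .groupByKey(_.domain)
  .agg(ChangeLogAggregator.toColumn.name("change_log"))
  .show(false)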

Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-23 Thread Fawze Abujaber
Quick question:

How do I add --jars /path/to/sparklens_2.11-0.1.0.jar to spark-defaults.conf?
Should I use

spark.driver.extraClassPath /path/to/sparklens_2.11-0.1.0.jar

or the spark.jars option? Could anyone give an example of how it should look,
and say whether the path to the jar should be an HDFS path, since I'm using it
in cluster mode?
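
For reference, the two spark-defaults.conf variants I'm weighing (the paths below
are placeholders, not tested):

# 1) ship the jar with the application -- an HDFS path should be fine in cluster mode
spark.jars                    hdfs:///path/to/sparklens_2.11-0.1.0.jar

# 2) driver classpath only -- this needs a local path on the node running the driver
spark.driver.extraClassPath   /local/path/to/sparklens_2.11-0.1.0.jar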




On Fri, Mar 23, 2018 at 6:33 AM, Fawze Abujaber  wrote:

> Hi Shmuel,
>
> Did you compile the code against the right branch for Spark 1.6?
>
> I tested it and it looks to be working, and now I'm testing the branch more
> widely. Please use the branch for Spark 1.6.
>
> On Fri, Mar 23, 2018 at 12:43 AM, Shmuel Blitz <
> shmuel.bl...@similarweb.com> wrote:
>
>> Hi Rohit,
>>
>> Thanks for sharing this great tool.
>> I tried running a Spark job with the tool, but it failed with an
>> *IncompatibleClassChangeError* exception.
>>
>> I have opened an issue on GitHub (https://github.com/qubole/sparklens/issues/1).
>>
>> Shmuel
>>
>> On Thu, Mar 22, 2018 at 5:05 PM, Shmuel Blitz <
>> shmuel.bl...@similarweb.com> wrote:
>>
>>> Thanks.
>>>
>>> We will give this a try and report back.
>>>
>>> Shmuel
>>>
>>> On Thu, Mar 22, 2018 at 4:22 PM, Rohit Karlupia 
>>> wrote:
>>>
 Thanks everyone!
 Please share how it works and how it doesn't. Both help.

 Fawaze, I just made a few changes to make this work with Spark 1.6. Can you
 please try building from branch *spark_1.6*?

 thanks,
 rohitk



 On Thu, Mar 22, 2018 at 10:18 AM, Fawze Abujaber 
 wrote:

> It's super amazing. I see it was tested on Spark 2.0.0 and above;
> what about Spark 1.6, which is still part of Cloudera's main versions?
>
> We have a vast number of Spark applications on version 1.6.0.
>
> On Thu, Mar 22, 2018 at 6:38 AM, Holden Karau 
> wrote:
>
>> Super exciting! I look forward to digging through it this weekend.
>>
>> On Wed, Mar 21, 2018 at 9:33 PM ☼ R Nair (रविशंकर नायर) <
>> ravishankar.n...@gmail.com> wrote:
>>
>>> Excellent. You filled a missing link.
>>>
>>> Best,
>>> Passion
>>>
>>> On Wed, Mar 21, 2018 at 11:36 PM, Rohit Karlupia 
>>> wrote:
>>>
 Hi,

 Happy to announce the availability of Sparklens as open source
 project. It helps in understanding the  scalability limits of spark
 applications and can be a useful guide on the path towards tuning
 applications for lower runtime or cost.

 Please clone from here: https://github.com/qubole/sparklens
 Old blogpost: https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/

 thanks,
 rohitk

 PS: Thanks for the patience. It took a couple of months to get back
 on this.





>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>

>>>
>>>
>>> --
>>> Shmuel Blitz
>>> Big Data Developer
>>> Email: shmuel.bl...@similarweb.com
>>> www.similarweb.com
>>> 
>>> 
>>> 
>>>
>>
>>
>>
>> --
>> Shmuel Blitz
>> Big Data Developer
>> Email: shmuel.bl...@similarweb.com
>> www.similarweb.com
>> 
>> 
>> 
>>
>
>


Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Michael Shtelma
Hi Maropu,

the problem seems to be in FilterEstimation.scala on lines 50 and 52:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala?utf8=✓#L50-L52

val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(1.0)
val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)

The problem is that filterSelectivity gets a NaN value in my case, and
NaN cannot be converted to BigDecimal.
I can try adding a simple if that checks for NaN and test whether this helps.
I will also try to understand why I am getting NaN in my case.
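
For illustration, a standalone sketch of what I mean (not the actual Spark code,
just the failure and the kind of guard I have in mind):

val selectivity: Double = Double.NaN       // what I suspect calculateFilterSelectivity yields here
// BigDecimal(selectivity) would throw java.lang.NumberFormatException, as in the stack trace below
val guarded: Double = if (selectivity.isNaN) 1.0 else selectivity
val rowCount: BigInt = BigInt(1000)        // placeholder for childStats.rowCount.get
val filteredRowCount: BigInt =
  (BigDecimal(rowCount) * guarded).setScale(0, BigDecimal.RoundingMode.CEILING).toBigInt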

Best,
Michael


On Fri, Mar 23, 2018 at 1:51 PM, Takeshi Yamamuro  wrote:
> hi,
>
> What's a query to reproduce this?
> It seems when casting double to BigDecimal, it throws the exception.
>
> // maropu
>
> On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma  wrote:
>>
>> Hi all,
>>
>> I am using Spark 2.3 with activated cost-based optimizer and a couple
>> of hive tables, that were analyzed previously.
>>
>> I am getting the following exception for different queries:
>>
>> java.lang.NumberFormatException
>>
>> at java.math.BigDecimal.(BigDecimal.java:494)
>>
>> at java.math.BigDecimal.(BigDecimal.java:824)
>>
>> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
>>
>> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
>>
>> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation.estimate(FilterEstimation.scala:52)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:30)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>>
>> at scala.Option.getOrElse(Option.scala:121)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
>>
>> at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ProjectEstimation$.estimate(ProjectEstimation.scala:27)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:37)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>>
>> at scala.Option.getOrElse(Option.scala:121)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)
>>
>> at
>> org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$2.apply(CostBasedJoinReorder.scala:64)
>>
>> at
>> org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$2.apply(CostBasedJoinReorder.scala:64)
>>
>> at
>> scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83)
>>
>> at 

Apache Spark Structured Streaming - How to keep executor alive.

2018-03-23 Thread M Singh
Hi:
I am working on Spark Structured Streaming (2.2.1) with Kafka and want 100
executors to stay alive. I set spark.executor.instances to 100. The process
starts running with 100 executors, but after some time only a few remain, which
causes a backlog of events from Kafka.
I thought I saw a setting to keep the executors from being killed. However, I
am not able to find that configuration in the Spark docs. If anyone knows that
setting, please let me know.
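
For reference, these are the settings I was looking at (I'm not sure they are the
right ones):

spark.dynamicAllocation.enabled        false
# or, if dynamic allocation has to stay on:
spark.dynamicAllocation.minExecutors   100
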
Thanks


[Spark Core] details of persisting RDDs

2018-03-23 Thread Stefano Pettini
Hi,

couple of questions about the internals of the persist mechanism (RDD, but
maybe applicable also to DS/DF).

Data is processed stage by stage. So what actually runs in worker nodes is
the calculation of the partitions of the result of a stage, not the single
RDDs. Operations on all the RDDs that form a stage are run together. That at
least is how I interpret the UI and the logs.

Then, what does "persisting an RDD" that is in the middle of a stage
actually mean? Let's say the result of a map that comes before another map,
which comes before a reduce: persisting A inside the stage A -> B -> C.

Also the hint "persist an RDD if it's used more than once and you don't
want it to be calculated twice" is not precise. For example, if inside a
stage we have:

A -> B -> C -> E -> F
     |         ^
     +--> D ---+

So basically a diamond, where B is used twice, as input to C and D, but
then the workflow re-joins at E, all inside the same stage, with no shuffling.
I tested this and B is not calculated twice. And again the original question:
what actually happens when B is marked to be persisted?
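
In code, a toy version of the shape I mean (just to make the question concrete;
zip is only one way to re-join the branches without a shuffle):

val a = sc.parallelize(1 to 100)
val b = a.map(_ * 2)                  // the RDD I would mark with persist()
val c = b.map(_ + 1)
val d = b.map(_ - 1)
val e = c.zip(d)                      // narrow dependency: c and d share b's partitioning
val f = e.map { case (x, y) => x + y }
f.count()                             // one job, one stage, no shuffle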

Regards,
Stefano


ORC native in Spark 2.3, with zlib, gives java.nio.BufferUnderflowException during read

2018-03-23 Thread Eirik Thorsnes
Hi all,

I'm trying the new ORC native in Spark 2.3
(org.apache.spark.sql.execution.datasources.orc).

I've compiled Spark 2.3 from the git branch-2.3 as of March 20th.
I also get the same error for the Spark 2.2 from Hortonworks HDP 2.6.4.

*NOTE*: the error only occurs with zlib compression, and I see that with
Snappy I get an extra log-line saying "OrcCodecPool: Got brand-new codec
SNAPPY". Perhaps zlib codec is never loaded/triggered in the new code?

I can write using the new native codepath without errors, but when *reading*
zlib-compressed ORC, either the newly written ORC files *or* older
ORC files written with Spark 2.2/1.6, I get the following exception.

=== cut =
2018-03-23 10:36:08,249 INFO FileScanRDD: Reading File path:
hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc,
range: 0-134217728, partition values: [1999]
2018-03-23 10:36:08,326 INFO ReaderImpl: Reading ORC rows from
hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc
with {include: [true, true, true, true, true, true, true, true, true],
offset: 0, length: 134217728}
2018-03-23 10:36:08,326 INFO RecordReaderImpl: Reader schema not
provided -- using file schema
struct

2018-03-23 10:36:08,824 ERROR Executor: Exception in task 0.0 in stage
1.0 (TID 1)
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:500)
at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
at
org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:248)
at
org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:58)
at
org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
at
org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.nextVector(TreeReaderFactory.java:976)
at
org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:1815)
at
org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1184)
at
org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextBatch(OrcColumnarBatchReader.scala:186)
at
org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextKeyValue(OrcColumnarBatchReader.scala:114)
at
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
=== cut =

I have the following set in spark-defaults.conf:

spark.sql.hive.convertMetastoreOrc true
spark.sql.orc.char.enabled true
spark.sql.orc.enabled true
spark.sql.orc.filterPushdown true
spark.sql.orc.impl native
spark.sql.orc.enableVectorizedReader true


If I set these to false and use the old hive reader (or specify the full
classname for the old hive reader in the spark-shell) I get results OK
with both new and old orc-files.

If I use Snappy compression it works with the new reader without error.
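
For reference, roughly the shape of what I'm doing (sketch with placeholder paths;
I have not checked whether this minimal version reproduces the problem):

// writing with zlib through the native path works for me
spark.range(0, 1000).selectExpr("id", "current_timestamp() as ts")
  .write.mode("overwrite").option("compression", "zlib").orc("hdfs:///tmp/orc_zlib_test")

// reading zlib files back (new or old) is what fails with BufferUnderflowException
spark.read.orc("hdfs:///tmp/orc_zlib_test").show()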

NOTE: I'm running on Hortonworks HDP 2.6.4 (Hadoop 2.7.3) and I also get
the same error for the Spark 2.2 there which I 

Re: how to use lit() in spark-java

2018-03-23 Thread Anthony, Olufemi
You can use a static import to import it directly:

import static org.apache.spark.sql.functions.lit;

Femi
From: 崔苗 
Date: Friday, March 23, 2018 at 8:34 AM
To: "user@spark.apache.org" 
Subject: how to use lit() in spark-java

Hi Guys,

I want to add a constant column to a dataset with the lit function in Java, like this:
 dataset.withColumn(columnName, lit("constant"))
but it seems that the IDE couldn't find the lit() function, so how do I use the lit()
function in Java?

thanks for any reply








Re: how to use lit() in spark-java

2018-03-23 Thread Anil Langote
You have to import functions:

dataset.withColumn(columnName,functions.lit("constant"))

Thank you
Anil Langote

Sent from my iPhone
_
From: 崔苗 
Sent: Friday, March 23, 2018 8:33 AM
Subject: how to use lit() in spark-java
To: 


Hi Guys,

I want to add a constant column to a dataset with the lit function in Java, like this:
 dataset.withColumn(columnName, lit("constant"))
but it seems that the IDE couldn't find the lit() function, so how do I use the lit()
function in Java?

thanks for any reply






Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Takeshi Yamamuro
hi,

What's a query to reproduce this?
It seems that when casting a double to BigDecimal, it throws the exception.

// maropu

On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma  wrote:

> Hi all,
>
> I am using Spark 2.3 with activated cost-based optimizer and a couple
> of hive tables, that were analyzed previously.
>
> I am getting the following exception for different queries:
>
> java.lang.NumberFormatException
>
> at java.math.BigDecimal.(BigDecimal.java:494)
>
> at java.math.BigDecimal.(BigDecimal.java:824)
>
> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
>
> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
>
> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> FilterEstimation.estimate(FilterEstimation.scala:52)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
>
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.
> visit(LogicalPlanVisitor.scala:30)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>
> at scala.Option.getOrElse(Option.scala:121)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.
> stats(LogicalPlan.scala:30)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>
> at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(
> IndexedSeqOptimized.scala:38)
>
> at scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.
> scala:43)
>
> at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> ProjectEstimation$.estimate(ProjectEstimation.scala:27)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)
>
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.
> visit(LogicalPlanVisitor.scala:37)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>
> at scala.Option.getOrElse(Option.scala:121)
>
> at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.
> stats(LogicalPlan.scala:30)
>
> at org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$
> 2.apply(CostBasedJoinReorder.scala:64)
>
> at org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$
> 2.apply(CostBasedJoinReorder.scala:64)
>
> at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.
> scala:83)
>
> at scala.collection.immutable.List.forall(List.scala:84)
>
> at org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$.org$
> apache$spark$sql$catalyst$optimizer$CostBasedJoinReorder$$reorder(
> CostBasedJoinReorder.scala:64)
>
> at org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$
> 1.applyOrElse(CostBasedJoinReorder.scala:46)
>
> at org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$
> 1.applyOrElse(CostBasedJoinReorder.scala:43)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.
> apply(TreeNode.scala:267)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.
> apply(TreeNode.scala:267)
>
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.
> withOrigin(TreeNode.scala:70)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
> TreeNode.scala:266)
>
> at 

how to use lit() in spark-java

2018-03-23 Thread 崔苗
Hi Guys, 

I want to add a constant column to a dataset with the lit function in Java, like this:
 dataset.withColumn(columnName, lit("constant"))
but it seems that the IDE couldn't find the lit() function, so how do I use the lit()
function in Java?

thanks for any reply









Re: Spark and Accumulo Delegation tokens

2018-03-23 Thread Saisai Shao
It is in the yarn module:
"org.apache.spark.deploy.yarn.security.ServiceCredentialProvider".

2018-03-23 15:10 GMT+08:00 Jorge Machado :

> Hi Jerry,
>
> Where do you see that class in Spark? I only found HadoopDelegationTokenManager,
> and I don't see any way to add my provider into it.
>
> private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
>   val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
>     new HiveDelegationTokenProvider,
>     new HBaseDelegationTokenProvider)
>
>   // Filter out providers for which spark.security.credentials.{service}.enabled is false.
>   providers
>     .filter { p => isServiceEnabled(p.serviceName) }
>     .map { p => (p.serviceName, p) }
>     .toMap
> }
>
>
> If you could give me a tip, that would be great.
> Thanks
>
> Jorge Machado
>
>
>
>
>
> On 23 Mar 2018, at 07:38, Saisai Shao  wrote:
>
> I think you can build your own Accumulo credential provider, similar to the
> HadoopDelegationTokenProviders, outside of Spark. Spark already provides an
> interface, "ServiceCredentialProvider", for users to plug in a customized
> credential provider.
>
> Thanks
> Jerry
>
> 2018-03-23 14:29 GMT+08:00 Jorge Machado :
>
> Hi Guys,
>
> I'm in the middle of writing a Spark data source connector for Apache Spark
> to connect to Accumulo tablets. Because we have Kerberos, it gets a little
> tricky, because Spark only handles the delegation tokens for HBase, Hive and
> HDFS.
>
> Would a PR for an implementation of HadoopDelegationTokenProvider for
> Accumulo be accepted?
>
>
> Jorge Machado
>
>
>
>
>
>
>
>


[Spark Core] Getting the number of stages a job is made of

2018-03-23 Thread Stefano Pettini
 Hi everybody,

this is my first message to the mailing list. In a world of DataFrames and
Structured Streaming my use cases may be considered kind of corner cases,
but still I think it's important to address such problems and go deep in
understanding how Spark RDDs work.

We have an application that synthesizes complex spark jobs, using the RDD
interface, easily reaching hundreds of RDDs and stages for each single job.
Due to the nature of the functions passed to mapPartitions & co, we need
precise control on the characteristics of the job Spark constructs when an
action is invoked on the last RDD, like what is shuffled and what is not,
the number of stages, fine tuning of RDD persistence, the partitioning and
so on.

For example we have unit tests to make sure the number of stages that
composes the job synthesized for a given use case doesn't increase to
detect unforeseen shuffles that we would consider a regression.

The problem is that it's difficult to "introspect" the job produced by Spark.
Even counting the number of stages is complex: there isn't any simple
stageId associated with each RDD. Today we're doing it indirectly, by
registering a listener and monitoring the events when the action is
invoked, but I would prefer doing it in a more direct way, like checking
the properties of an RDD.

So another approach could be analyzing, recursively, the dependencies of
the RDD where the action is invoked and counting the number of dependencies
that are a ShuffleDependency, making sure each parent RDD is considered
only once.

Does it make sense? Is it a reliable method?
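
In code, the walk I have in mind looks roughly like this (a sketch, not
battle-tested; the idea being that the number of stages is the number of distinct
shuffle dependencies plus one for the final result stage):

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

def countShuffleDeps(rdd: RDD[_]): Int = {
  val visited = mutable.Set[Int]()           // RDD ids already walked
  def visit(r: RDD[_]): Int = {
    if (!visited.add(r.id)) 0                // each parent RDD considered only once
    else r.dependencies.map { dep =>
      val here = dep match {
        case _: ShuffleDependency[_, _, _] => 1
        case _                             => 0
      }
      here + visit(dep.rdd)
    }.sum
  }
  visit(rdd)
}

// expected stages for the job triggered on `rdd` ~= countShuffleDeps(rdd) + 1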

Regards,
Stefano


Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Michael Shtelma
Hi all,

I am using Spark 2.3 with the cost-based optimizer activated and a couple
of Hive tables that were analyzed previously.

I am getting the following exception for different queries:

java.lang.NumberFormatException

at java.math.BigDecimal.(BigDecimal.java:494)

at java.math.BigDecimal.(BigDecimal.java:824)

at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)

at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)

at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation.estimate(FilterEstimation.scala:52)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:30)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)

at scala.Option.getOrElse(Option.scala:121)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)

at 
scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)

at 
scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)

at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ProjectEstimation$.estimate(ProjectEstimation.scala:27)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:37)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)

at scala.Option.getOrElse(Option.scala:121)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$2.apply(CostBasedJoinReorder.scala:64)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$2.apply(CostBasedJoinReorder.scala:64)

at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83)

at scala.collection.immutable.List.forall(List.scala:84)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$.org$apache$spark$sql$catalyst$optimizer$CostBasedJoinReorder$$reorder(CostBasedJoinReorder.scala:64)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$1.applyOrElse(CostBasedJoinReorder.scala:46)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$1.applyOrElse(CostBasedJoinReorder.scala:43)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)

at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)

at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)

at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)

at 

Calculate co-occurring terms

2018-03-23 Thread Donni Khan
Hi,

I have a collection of text documents, and I extracted the list of significant
terms from that collection.
I want to calculate a co-occurrence matrix for the extracted terms using Spark.

I actually stored the collection of text documents in a DataFrame:

StructType schema = new StructType(new StructField[] {
    new StructField("ID", DataTypes.StringType, false, Metadata.empty()),
    new StructField("text", DataTypes.StringType, false, Metadata.empty()) });

// Create a DataFrame with the new schema
DataFrame preProcessedDF = sqlContext.createDataFrame(jrdd, schema);

I can extract the list of terms from "preProcessedDF" into a List, an RDD,
or a DataFrame.
For each pair (term_i, term_j) I want to calculate the related frequency from
the original dataset "preProcessedDF".
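
The naive direction I was considering looks roughly like this (a sketch in Scala;
significantTermsDF and the whitespace tokenizer are placeholders I made up):

import org.apache.spark.sql.functions._

// one row per (document ID, term), keeping only the significant terms
val terms = preProcessedDF
  .select(col("ID"), explode(split(col("text"), "\\s+")).as("term"))
  .join(broadcast(significantTermsDF), "term")

// co-occurrence counts: self-join on the document ID, count each unordered pair
val cooccurrence = terms.as("a")
  .join(terms.as("b"), col("a.ID") === col("b.ID") && col("a.term") < col("b.term"))
  .groupBy(col("a.term").as("term_i"), col("b.term").as("term_j"))
  .count()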

Does anyone have a scalable solution?

thank you,
Donni


Re: Spark and Accumulo Delegation tokens

2018-03-23 Thread Jorge Machado
Hi Jerry, 

Where do you see that class in Spark? I only found HadoopDelegationTokenManager
and I don't see any way to add my provider into it.

private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
  val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
    new HiveDelegationTokenProvider,
    new HBaseDelegationTokenProvider)

  // Filter out providers for which spark.security.credentials.{service}.enabled is false.
  providers
    .filter { p => isServiceEnabled(p.serviceName) }
    .map { p => (p.serviceName, p) }
    .toMap
}

If you could give me a tip, that would be great.
Thanks 

Jorge Machado





> On 23 Mar 2018, at 07:38, Saisai Shao  wrote:
> 
> I think you can build your own Accumulo credential provider, similar to the
> HadoopDelegationTokenProviders, outside of Spark. Spark already provides an
> interface, "ServiceCredentialProvider", for users to plug in a customized
> credential provider.
> 
> Thanks
> Jerry
> 
> 2018-03-23 14:29 GMT+08:00 Jorge Machado :
> 
>> Hi Guys,
>> 
>> I'm in the middle of writing a Spark data source connector for Apache Spark
>> to connect to Accumulo tablets. Because we have Kerberos, it gets a little
>> tricky, because Spark only handles the delegation tokens for HBase, Hive and
>> HDFS.
>>
>> Would a PR for an implementation of HadoopDelegationTokenProvider for
>> Accumulo be accepted?
>> 
>> 
>> Jorge Machado
>> 
>> 
>> 
>> 
>> 
>> 



Re: Spark and Accumulo Delegation tokens

2018-03-23 Thread Saisai Shao
I think you can build your own Accumulo credential provider, similar to the
HadoopDelegationTokenProviders, outside of Spark. Spark already provides an
interface, "ServiceCredentialProvider", for users to plug in a customized
credential provider.

Thanks
Jerry

2018-03-23 14:29 GMT+08:00 Jorge Machado :

> Hi Guys,
>
> I'm in the middle of writing a Spark data source connector for Apache Spark
> to connect to Accumulo tablets. Because we have Kerberos, it gets a little
> tricky, because Spark only handles the delegation tokens for HBase, Hive and
> HDFS.
>
> Would a PR for an implementation of HadoopDelegationTokenProvider for
> Accumulo be accepted?
>
>
> Jorge Machado
>
>
>
>
>
>


Spark and Accumulo Delegation tokens

2018-03-23 Thread Jorge Machado
Hi Guys, 

I'm in the middle of writing a Spark data source connector for Apache Spark to
connect to Accumulo tablets. Because we have Kerberos, it gets a little tricky,
because Spark only handles the delegation tokens for HBase, Hive and HDFS.

Would a PR for an implementation of HadoopDelegationTokenProvider for
Accumulo be accepted?


Jorge Machado







Re: Structured Streaming Spark 2.3 Query

2018-03-23 Thread Bowden, Chris
Use a streaming query listener that tracks repetitive progress events for the same
batch id. If x amount of time has elapsed given repetitive progress events for the
same batch id, the source is not providing new offsets and stream execution is not
scheduling new micro batches. See also: spark.sql.streaming.pollingDelay.

Alternative methods may produce less than desirable results due to specific
characteristics of a source / sink / workflow. It may be more desirable to represent
the amount of time as a number of repetitive progress events, to be more forgiving
of implementation details (e.g., the kafka source has internal retry attempts to
determine latest offsets and sleeps in between attempts if there is a miss when
asked for new data, etc.).
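
A rough sketch of the listener idea (untested; counts repeated progress events per
query and stops the query after maxRepeats in a row with the same batch id):

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class IdleStopListener(spark: SparkSession, maxRepeats: Int) extends StreamingQueryListener {
  // last batch id seen per query id, plus how many times in a row it repeated
  private val seen = new ConcurrentHashMap[UUID, (Long, Int)]()

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = { seen.remove(event.id) }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    val repeats = Option(seen.get(p.id)) match {
      case Some((lastBatchId, n)) if lastBatchId == p.batchId => n + 1
      case _ => 0
    }
    seen.put(p.id, (p.batchId, repeats))
    // Stopping from the listener thread is the simplest form; flagging the condition
    // and stopping from the driver's main thread may be safer.
    if (repeats >= maxRepeats) spark.streams.get(p.id).stop()
  }
}

// spark.streams.addListener(new IdleStopListener(spark, maxRepeats = 12))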


-Chris


From: Aakash Basu 
Sent: Thursday, March 22, 2018 10:45:38 PM
To: user
Subject: Structured Streaming Spark 2.3 Query

Hi,

What is the way to stop a Spark Streaming job if there is no data inflow for an 
arbitrary amount of time (eg: 2 mins)?

Thanks,
Aakash.