Structured Streaming Spark 2.3 Query

2018-03-22 Thread Aakash Basu
Hi,

What is the way to stop a Spark Streaming job if there is no data inflow
for an arbitrary amount of time (e.g., 2 minutes)?

Thanks,
Aakash.
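
One possible approach (a sketch, not a built-in Spark feature), assuming Structured Streaming as in the subject line: poll the query's progress from the driver and stop it once no input rows have been seen for the chosen idle period. The helper name and the 10-second poll interval below are illustrative.

import org.apache.spark.sql.streaming.StreamingQuery

// Stop `query` after `idleMillis` with no input rows (sketch only).
def stopWhenIdle(query: StreamingQuery, idleMillis: Long = 2 * 60 * 1000L): Unit = {
  var lastDataSeen = System.currentTimeMillis()
  while (query.isActive) {
    val progress = query.lastProgress              // null until the first trigger completes
    if (progress != null && progress.numInputRows > 0) {
      lastDataSeen = System.currentTimeMillis()
    }
    if (System.currentTimeMillis() - lastDataSeen > idleMillis) {
      query.stop()                                 // gracefully stop the streaming query
    }
    Thread.sleep(10000)                            // poll every 10 seconds
  }
}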


Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-22 Thread Fawze Abujaber
Hi Shmuel,

Did you compile the code against the right branch for Spark 1.6?

I tested it and it looks like it's working; I'm now running broader tests on that
branch. Please use the branch for Spark 1.6.

On Fri, Mar 23, 2018 at 12:43 AM, Shmuel Blitz 
wrote:

> Hi Rohit,
>
> Thanks for sharing this great tool.
> I tried running a spark job with the tool, but it failed with an
> IncompatibleClassChangeError exception.
>
> I have opened an issue on GitHub (https://github.com/qubole/sparklens/issues/1).
>
> Shmuel
>
> On Thu, Mar 22, 2018 at 5:05 PM, Shmuel Blitz  > wrote:
>
>> Thanks.
>>
>> We will give this a try and report back.
>>
>> Shmuel
>>
>> On Thu, Mar 22, 2018 at 4:22 PM, Rohit Karlupia 
>> wrote:
>>
>>> Thanks everyone!
>>> Please share how it works and how it doesn't. Both help.
>>>
>>> Fawaze, just made few changes to make this work with spark 1.6. Can you
>>> please try building from branch *spark_1.6*
>>>
>>> thanks,
>>> rohitk
>>>
>>>
>>>
>>> On Thu, Mar 22, 2018 at 10:18 AM, Fawze Abujaber 
>>> wrote:
>>>
 It's super amazing  i see it was tested on spark 2.0.0 and above,
 what about Spark 1.6 which is still part of Cloudera's main versions?

 We have a vast Spark applications with version 1.6.0

 On Thu, Mar 22, 2018 at 6:38 AM, Holden Karau 
 wrote:

> Super exciting! I look forward to digging through it this weekend.
>
> On Wed, Mar 21, 2018 at 9:33 PM ☼ R Nair (रविशंकर नायर) <
> ravishankar.n...@gmail.com> wrote:
>
>> Excellent. You filled a missing link.
>>
>> Best,
>> Passion
>>
>> On Wed, Mar 21, 2018 at 11:36 PM, Rohit Karlupia 
>> wrote:
>>
>>> Hi,
>>>
>>> Happy to announce the availability of Sparklens as open source
>>> project. It helps in understanding the  scalability limits of spark
>>> applications and can be a useful guide on the path towards tuning
>>> applications for lower runtime or cost.
>>>
>>> Please clone from here: https://github.com/qubole/sparklens
>>> Old blogpost: https://www.qubole.com/blog/introducing-quboles-sp
>>> ark-tuning-tool/
>>>
>>> thanks,
>>> rohitk
>>>
>>> PS: Thanks for the patience. It took couple of months to get back on
>>> this.
>>>
>>>
>>>
>>>
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>


>>>
>>
>>
>> --
>> Shmuel Blitz
>> Big Data Developer
>> Email: shmuel.bl...@similarweb.com
>> www.similarweb.com
>> 
>> 
>> 
>>
>
>
>
> --
> Shmuel Blitz
> Big Data Developer
> Email: shmuel.bl...@similarweb.com
> www.similarweb.com
> 
> 
> 
>


Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-03-22 Thread M Singh
Hi:
I am working on a realtime application using Spark Structured Streaming (v
2.2.1). The application reads data from Kafka, and if there is a failure, I
would like to ignore the checkpoint. Is there any configuration to just read
from the last Kafka offset after a failure and ignore any offset checkpoints?
Also, I believe the checkpoint also saves state and will continue the
aggregations after recovery. Is there any way to ignore the checkpointed state?
Also, is there a way to selectively save only the state or only the offset checkpoint?

Thanks
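
There is no supported switch to selectively discard parts of an existing checkpoint. A common workaround is sketched below, with broker, topic, and paths as placeholders: point the restarted query at a new checkpoint location, in which case startingOffsets (which is only consulted when no checkpoint exists yet) controls where reading begins.

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")    // placeholder brokers
  .option("subscribe", "my_topic")                      // placeholder topic
  .option("startingOffsets", "latest")                  // only used when no checkpoint exists yet
  .load()

df.writeStream
  .format("parquet")
  .option("path", "/data/out")                          // placeholder sink path
  .option("checkpointLocation", "/checkpoints/run-2")   // NEW location => old offsets/state ignored
  .start()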


Re: Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread Tathagata Das
Structured Streaming AUTOMATICALLY saves the offsets in a checkpoint
directory that you provide. And when you start the query again with the
same directory it will just pick up where it left off.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
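
A minimal sketch of that pattern (streamingDF, the console sink, and the path are placeholders): keep checkpointLocation the same across restarts and the query resumes from its saved offsets and state.

streamingDF.writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///checkpoints/my-query")  // same directory on every restart
  .start()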

On Thu, Mar 22, 2018 at 8:06 PM, M Singh 
wrote:

> Hi:
>
> I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR and for the
> last few days, after running the application for 30-60 minutes get
> exception from Kafka Consumer included below.
>
> The structured streaming application is processing 1 minute worth of data
> from kafka topic. So I've tried increasing request.timeout.ms from 4
> seconds default to 45000 seconds and receive.buffer.bytes to 1mb but still
> get the same exception.
>
> Is there any spark/kafka configuration that can save the offset and retry
> it next time rather than throwing an exception and killing the application.
>
> I've tried googling but have not found substantial
> solution/recommendation.  If anyone has any suggestions or a different
> version etc, please let me know.
>
> Thanks
>
> Here is the exception stack trace.
>
> java.util.concurrent.TimeoutException: Cannot fetch record for offset
>  in 12 milliseconds
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$
> apache$spark$sql$kafka010$CachedKafkaConsumer$$
> fetchData(CachedKafkaConsumer.scala:219)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(
> CachedKafkaConsumer.scala:117)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(
> CachedKafkaConsumer.scala:106)
> at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(
> UninterruptibleThread.scala:85)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.
> runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(
> CachedKafkaConsumer.scala:106)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.
> getNext(KafkaSourceRDD.scala:157)
> at
>


Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread M Singh
Hi:
I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR, and for the last
few days, after running the application for 30-60 minutes, I get the exception
from the Kafka consumer included below.

The structured streaming application is processing 1 minute's worth of data from
a Kafka topic. I've tried increasing request.timeout.ms from the 4 seconds
default to 45000 seconds, and receive.buffer.bytes to 1 MB, but I still get the
same exception.

Is there any Spark/Kafka configuration that can save the offset and retry it
next time, rather than throwing an exception and killing the application?
(See the sketch of the relevant source options after the stack trace below.)

I've tried googling but have not found a substantial solution/recommendation. If
anyone has any suggestions, a different version, etc., please let me know.
Thanks
Here is the exception stack trace.

java.util.concurrent.TimeoutException: Cannot fetch record for offset  in 12 milliseconds
  at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
  at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
  at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
  at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
  at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
  at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
  at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
  at
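
A sketch of the options that are usually relevant here (broker and topic names are placeholders): Kafka consumer properties are passed with the "kafka." prefix, the executor-side poll timeout behind the "Cannot fetch record for offset ... in N milliseconds" error can be raised with kafkaConsumer.pollTimeoutMs, and failOnDataLoss=false keeps the query alive on data loss. Raising timeouts may only mask the underlying slowness.

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "my_topic")                     // placeholder
  .option("kafka.request.timeout.ms", "120000")        // passed through to the Kafka consumer
  .option("kafka.receive.buffer.bytes", "1048576")     // passed through to the Kafka consumer
  .option("kafkaConsumer.pollTimeoutMs", "300000")     // executor-side poll timeout (ms)
  .option("failOnDataLoss", "false")                   // don't fail the query on data loss
  .load()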


Re: [Structured Streaming] Application Updates in Production

2018-03-22 Thread Tathagata Das
Yes indeed, we don't directly support schema migration of state as of now.
However, depending on which stateful operator you are using, you can work
around it. For example, if you are using mapGroupsWithState /
flatMapGroupsWithState, you can explicitly convert your state to
Avro-encoded bytes and save those bytes as the state. You are then responsible
for encoding and decoding the state in Avro in a way that lets you migrate the
schema yourself (much like Kafka + Avro + a schema registry).
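
A rough sketch of that workaround; Event, MyState, encodeAvro and decodeAvro are hypothetical names standing in for your own types and Avro (de)serializers, not Spark APIs.

import org.apache.spark.sql.streaming.GroupState
import spark.implicits._

case class Event(key: String, value: Long)        // hypothetical input type; assumes events: Dataset[Event]

def updateState(key: String,
                values: Iterator[Event],
                state: GroupState[Array[Byte]]): (String, Long) = {
  // decodeAvro / encodeAvro are your own Avro (de)serializers (e.g. backed by a
  // schema registry); MyState is your own state class. None of these are Spark APIs.
  val current: MyState = if (state.exists) decodeAvro(state.get) else MyState.empty
  val updated = current.add(values)
  state.update(encodeAvro(updated))               // the stored state is just raw bytes
  (key, updated.count)
}

events
  .groupByKey(_.key)
  .mapGroupsWithState(updateState _)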

On Wed, Mar 21, 2018 at 5:45 PM, Priyank Shrivastava 
wrote:

> TD,
>
> But what if the state schema does change?  My understanding is that if in
> the new code I change the state schema the application will not be able to
> use the old checkpoints.  Is that not correct?
>
> Running the applications in parallel is to ensure there is no downtime in
> production, i.e., because the new app will not pick up from the old
> checkpoints, one would need to keep the old app and the new app running
> until the new app catches up on data processing with the old app.
>
>
> - Original message -
> From: Tathagata Das 
> To: Priyank Shrivastava 
> Cc: user 
> Subject: Re: [Structured Streaming] Application Updates in Production
> Date: Wed, Mar 21, 2018 5:28 PM
>
> Why do you want to start the new code in parallel to the old one? Why not
> stop the old one, and then start the new one? Structured Streaming ensures
> that all checkpoint information (offsets and state) are future-compatible
> (as long as state schema is unchanged), hence new code should be able to
> pick exactly where the old code left off.
>
> TD
>
> On Wed, Mar 21, 2018 at 11:56 AM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
> I am using Structured Streaming with Spark 2.2.  We are using Kafka as our
> source and are using checkpoints for failure recovery and e2e exactly once
> guarantees.  I would like to get some more information on how to handle
> updates to the application when there is a change in stateful operations
> and/or output schema.
>
> As some of the sources suggest I can start the updated application
> parallelly with the old application until it catches up with the old
> application in terms of data, and then kill the old one.  But then the new
> application will have to re-read/re-process all the data in kafka which
> could take a long time.
>
> I want to AVOID this re-processing of the data in the newly deployed
> updated application.
>
> One way I can think of is for the application to keep writing the offsets
> into something in addition to the checkpoint directory, for example in
> zookeeper/hdfs.  And then, on an update of the application, I command Kafka
> readstream() to start reading from the offsets stored in this new location
> (zookeeper/hdfs) - since the updated application can't read from the
> checkpoint directory which is now deemed incompatible.
>
> So a couple of questions:
> 1.  Is the above-stated solution a valid solution?
> 2.  If yes, How can I automate the detection of whether the application is
> being restarted because of a failure/maintenance or because of code changes
> to stateful operations and/or output schema?
>
> Any guidance, example or information source is appreciated.
>
> Thanks,
> Priyank
>
>
>
>
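
On point 1 in the quoted message (mirroring offsets outside the checkpoint), one possible sketch is a StreamingQueryListener that records each source's end offsets on every progress event; saveToExternalStore() is a hypothetical helper for your own ZooKeeper/HDFS write.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // endOffset is a JSON string such as {"my_topic":{"0":1234,"1":5678}}
    event.progress.sources.foreach { src =>
      saveToExternalStore(event.progress.id.toString, src.description, src.endOffset)
    }
  }
})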


java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Case class

2018-03-22 Thread Yong Zhang
I am trying to research a custom Aggregator implementation, and following the 
example in the Spark sample code here:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala


But I cannot use it in the agg function: I get an error like
java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
to my case class. If I don't use the group by, it works the same way as in the
sample code. What do I need to change to make it work with group by?


This is on Spark 2.2, as shown below. Following the Spark example, I can run

rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)

without any issue, but with

rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)

I get the cast exception. I want to apply my custom Aggregator
implementation per group. How do I fix this? (See the sketch after the stack trace below.)

Thanks


scala> spark.version
res31: String = 2.2.1

case class FlagChangeLog(date: String, old_flag: Boolean, new_flag: Boolean)
case class DeriveRecord (domain: String, date: String, flag: Boolean, isDelta: 
Boolean, flag_changelog: scala.collection.mutable.ListBuffer[FlagChangeLog])

val rawDS = Seq(
  DeriveRecord("abc.com", "2017-01-09", true, false, ListBuffer.empty),
  DeriveRecord("123.com", "2015-01-01", false, false, ListBuffer.empty),
  DeriveRecord("abc.com", "2018-01-09", false, true, ListBuffer.empty),
  DeriveRecord("123.com", "2017-01-09", true, true, ListBuffer.empty),
  DeriveRecord("xyz.com", "2018-03-09", false, true, ListBuffer.empty)
).toDS

scala> rawDS.show(false)
+-------+----------+-----+-------+--------------+
|domain |date      |flag |isDelta|flag_changelog|
+-------+----------+-----+-------+--------------+
|abc.com|2017-01-09|true |false  |[]            |
|123.com|2015-01-01|false|false  |[]            |
|abc.com|2018-01-09|false|true   |[]            |
|123.com|2017-01-09|true |true   |[]            |
|xyz.com|2018-03-09|false|true   |[]            |
+-------+----------+-----+-------+--------------+

object ChangeLogAggregator extends Aggregator[DeriveRecord, DeriveRecord, DeriveRecord] {
  def zero: DeriveRecord = ??? // omitted

  def reduce(buffer: DeriveRecord, curr: DeriveRecord): DeriveRecord = {
    ??? // omitted
  }

  def merge(b1: DeriveRecord, b2: DeriveRecord): DeriveRecord = {
    ??? // omitted
  }

  def finish(output: DeriveRecord): DeriveRecord = {
    ??? // omitted
  }

  def bufferEncoder: Encoder[DeriveRecord] = Encoders.product
  def outputEncoder: Encoder[DeriveRecord] = Encoders.product
}

scala> rawDS.select(ChangeLogAggregator.toColumn.name("change_log")).show(false)
+-------+----------+-----+-------+---------------------------------------------------+
|domain |date      |flag |isDelta|flag_changelog                                     |
+-------+----------+-----+-------+---------------------------------------------------+
|abc.com|2018-01-09|false|true   |[[2015-01-01,true,false], [2018-01-09,false,false]]|
+-------+----------+-----+-------+---------------------------------------------------+

scala> rawDS.groupBy($"domain").agg(ChangeLogAggregator.toColumn.name("change_log")).show(false)
18/03/22 22:04:44 ERROR Executor: Exception in task 1.0 in stage 36.0 (TID 48)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line15.$read$$iw$$iw$DeriveRecord
  at $line110.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ChangeLogAggregator$.reduce(:31)
  at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.update(TypedAggregateExpression.scala:239)
  at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:524)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:162)
  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
  at
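
A sketch of one workaround (not verified against this exact session): keep the aggregation typed end-to-end by grouping with groupByKey instead of the untyped groupBy, so the Aggregator receives DeriveRecord values rather than generic Rows.

// Same Aggregator, but grouped with groupByKey so the input element type is preserved.
rawDS
  .groupByKey(_.domain)
  .agg(ChangeLogAggregator.toColumn.name("change_log"))
  .show(false)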

Transaction Example for Spark Streaming in Spark 2.2

2018-03-22 Thread KhajaAsmath Mohammed
Hi Cody,

I am following your approach to implement exactly-once semantics and to store
the offsets in a database. The question I have is how to use Hive instead of a
traditional datastore: the write to Hive will succeed even if there is an issue
saving the offsets into the DB. Could you please correct me if I am wrong, or
let me know if you have any other suggestions? (See the sketch after the code
below for one possible direction.)

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    DB.localTx { implicit session =>
      // Write data to Hive after creating DataFrames from the DStream RDD

      // Store offsets to DB
      offsetRanges.foreach { osr =>
        val offsetRows = sql"""
          update txn_offsets set offset = ${osr.untilOffset}
          where topic = ${osr.topic} and part = ${osr.partition} and offset = ${osr.fromOffset}
        """.update.apply()
        if (offsetRows != 1) {
          throw new Exception(s"""
            Got $offsetRows rows affected instead of 1 when attempting to update offsets for
            ${osr.topic} ${osr.partition} ${osr.fromOffset} -> ${osr.untilOffset}
            Was a partition repeated after a worker failure?
          """)
        }
      }
    }
  }
}

Thanks,
Asmath
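
A sketch of one possible direction, with hypothetical helpers (alreadyCommitted, writeToHive, commitOffsets): since a Hive write cannot join the JDBC transaction, check the stored offsets first and skip batches that were already written, so the Hive write is effectively idempotent.

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // alreadyCommitted() would compare each range with what is already in txn_offsets;
    // it is your own helper, not part of Spark or Kafka.
    val pending = offsetRanges.filterNot(alreadyCommitted)

    if (pending.nonEmpty) {
      writeToHive(rdd)          // your own (idempotent) Hive write for this batch
      commitOffsets(pending)    // update txn_offsets in a single DB transaction
    }
  }
}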


Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-22 Thread Shmuel Blitz
Hi Rohit,

Thanks for sharing this great tool.
I tried running a spark job with the tool, but it failed with an
IncompatibleClassChangeError exception.

I have opened an issue on GitHub (https://github.com/qubole/sparklens/issues/1).

Shmuel

On Thu, Mar 22, 2018 at 5:05 PM, Shmuel Blitz 
wrote:

> Thanks.
>
> We will give this a try and report back.
>
> Shmuel
>
> On Thu, Mar 22, 2018 at 4:22 PM, Rohit Karlupia  wrote:
>
>> Thanks everyone!
>> Please share how it works and how it doesn't. Both help.
>>
>> Fawaze, just made few changes to make this work with spark 1.6. Can you
>> please try building from branch *spark_1.6*
>>
>> thanks,
>> rohitk
>>
>>
>>
>> On Thu, Mar 22, 2018 at 10:18 AM, Fawze Abujaber 
>> wrote:
>>
>>> It's super amazing  i see it was tested on spark 2.0.0 and above,
>>> what about Spark 1.6 which is still part of Cloudera's main versions?
>>>
>>> We have a vast Spark applications with version 1.6.0
>>>
>>> On Thu, Mar 22, 2018 at 6:38 AM, Holden Karau 
>>> wrote:
>>>
 Super exciting! I look forward to digging through it this weekend.

 On Wed, Mar 21, 2018 at 9:33 PM ☼ R Nair (रविशंकर नायर) <
 ravishankar.n...@gmail.com> wrote:

> Excellent. You filled a missing link.
>
> Best,
> Passion
>
> On Wed, Mar 21, 2018 at 11:36 PM, Rohit Karlupia 
> wrote:
>
>> Hi,
>>
>> Happy to announce the availability of Sparklens as open source
>> project. It helps in understanding the  scalability limits of spark
>> applications and can be a useful guide on the path towards tuning
>> applications for lower runtime or cost.
>>
>> Please clone from here: https://github.com/qubole/sparklens
>> Old blogpost: https://www.qubole.com/blog/introducing-quboles-sp
>> ark-tuning-tool/
>>
>> thanks,
>> rohitk
>>
>> PS: Thanks for the patience. It took couple of months to get back on
>> this.
>>
>>
>>
>>
>>
> --
 Twitter: https://twitter.com/holdenkarau

>>>
>>>
>>
>
>
> --
> Shmuel Blitz
> Big Data Developer
> Email: shmuel.bl...@similarweb.com
> www.similarweb.com
> 
> 
> 
>



-- 
Shmuel Blitz
Big Data Developer
Email: shmuel.bl...@similarweb.com
www.similarweb.com

 


Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-22 Thread Shmuel Blitz
Thanks.

We will give this a try and report back.

Shmuel

On Thu, Mar 22, 2018 at 4:22 PM, Rohit Karlupia  wrote:

> Thanks everyone!
> Please share how it works and how it doesn't. Both help.
>
> Fawaze, just made few changes to make this work with spark 1.6. Can you
> please try building from branch *spark_1.6*
>
> thanks,
> rohitk
>
>
>
> On Thu, Mar 22, 2018 at 10:18 AM, Fawze Abujaber 
> wrote:
>
>> It's super amazing  i see it was tested on spark 2.0.0 and above,
>> what about Spark 1.6 which is still part of Cloudera's main versions?
>>
>> We have a vast Spark applications with version 1.6.0
>>
>> On Thu, Mar 22, 2018 at 6:38 AM, Holden Karau 
>> wrote:
>>
>>> Super exciting! I look forward to digging through it this weekend.
>>>
>>> On Wed, Mar 21, 2018 at 9:33 PM ☼ R Nair (रविशंकर नायर) <
>>> ravishankar.n...@gmail.com> wrote:
>>>
 Excellent. You filled a missing link.

 Best,
 Passion

 On Wed, Mar 21, 2018 at 11:36 PM, Rohit Karlupia 
 wrote:

> Hi,
>
> Happy to announce the availability of Sparklens as open source
> project. It helps in understanding the  scalability limits of spark
> applications and can be a useful guide on the path towards tuning
> applications for lower runtime or cost.
>
> Please clone from here: https://github.com/qubole/sparklens
> Old blogpost: https://www.qubole.com/blog/introducing-quboles-sp
> ark-tuning-tool/
>
> thanks,
> rohitk
>
> PS: Thanks for the patience. It took couple of months to get back on
> this.
>
>
>
>
>
 --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>


-- 
Shmuel Blitz
Big Data Developer
Email: shmuel.bl...@similarweb.com
www.similarweb.com

 


Re: Is there a mutable dataframe spark structured streaming 2.3.0?

2018-03-22 Thread kant kodali
Thanks all!

On Thu, Mar 22, 2018 at 2:08 AM, Jorge Machado  wrote:

> DataFrames are not mutable.
>
> Jorge Machado
>
>
> On 22 Mar 2018, at 10:07, Aakash Basu  wrote:
>
> Hey,
>
> I faced the same issue a couple of days back, kindly go through the mail
> chain with "*Multiple Kafka Spark Streaming Dataframe Join query*" as
> subject, TD and Chris has cleared my doubts, it would help you too.
>
> Thanks,
> Aakash.
>
> On Thu, Mar 22, 2018 at 7:50 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> Is there a mutable dataframe spark structured streaming 2.3.0? I am
>> currently reading from Kafka and if I cannot parse the messages that I get
>> from Kafka I want to write them to say some "dead_queue" topic.
>>
>> I wonder what is the best way to do this?
>>
>> Thanks!
>>
>
>
>


Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-22 Thread Rohit Karlupia
Thanks everyone!
Please share how it works and how it doesn't. Both help.

Fawze, I just made a few changes to make this work with Spark 1.6. Can you
please try building from the *spark_1.6* branch?

thanks,
rohitk



On Thu, Mar 22, 2018 at 10:18 AM, Fawze Abujaber  wrote:

> It's super amazing  i see it was tested on spark 2.0.0 and above, what
> about Spark 1.6 which is still part of Cloudera's main versions?
>
> We have a vast Spark applications with version 1.6.0
>
> On Thu, Mar 22, 2018 at 6:38 AM, Holden Karau 
> wrote:
>
>> Super exciting! I look forward to digging through it this weekend.
>>
>> On Wed, Mar 21, 2018 at 9:33 PM ☼ R Nair (रविशंकर नायर) <
>> ravishankar.n...@gmail.com> wrote:
>>
>>> Excellent. You filled a missing link.
>>>
>>> Best,
>>> Passion
>>>
>>> On Wed, Mar 21, 2018 at 11:36 PM, Rohit Karlupia 
>>> wrote:
>>>
 Hi,

 Happy to announce the availability of Sparklens as open source project.
 It helps in understanding the  scalability limits of spark applications and
 can be a useful guide on the path towards tuning applications for lower
 runtime or cost.

 Please clone from here: https://github.com/qubole/sparklens
 Old blogpost: https://www.qubole.com/blog/introducing-quboles-sp
 ark-tuning-tool/

 thanks,
 rohitk

 PS: Thanks for the patience. It took couple of months to get back on
 this.





>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


Re: Need config params while doing rdd.foreach or map

2018-03-22 Thread ayan guha
The Spark context runs in the driver, whereas the function inside foreach runs
on the executors. You can pass the parameter into the function (for example, by
capturing it in a local variable on the driver) so it is available on the
executors.
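
A minimal sketch of that suggestion, reusing the names from the quoted code: read the value once on the driver and let the closure capture it (or use a broadcast variable), instead of touching the SparkContext on the executors.

// Read the value once on the driver; the local val is serialized with the closure.
val inputFileName = sparkContext.getConf.get("INPUT_FILE_NAME")

df.foreach { row =>
  // inputFileName is available here on the executor; sparkContext is not.
  println(s"$inputFileName -> $row")
}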

On Thu, 22 Mar 2018 at 8:18 pm, Kamalanathan Venkatesan <
kamalanatha...@in.ey.com> wrote:

> Hello All,
>
>
>
> I have custom parameter say for example file name added to the conf of
> spark context example  SparkConf.set(INPUT_FILE_NAME, fileName).
>
> I need this value inside foreach performed on an  RDD, but the when access
> spark context inside foreach, I receive spark context is null exception!
>
>
>
> Code sample:
>
>
>
> val conf = new SparkConf().setMaster(appConfig.envOrElseConfig("app.sparkconf.master"))
>   .setAppName(appConfig.envOrElseConfig("app.appName"))
>   .set("INPUT_FILE_NAME", fileName)
>
> var sparkContext = new SparkContext(conf)
>
> sparkContext.addJar(sparkContextParams.jarPath)
>
> var sqlContext = new SQLContext(sparkContext)
>
> var df = sqlContext.read.format("com.databricks.spark.csv")
>   .option("header", "true")
>   .load()
>
> df.foreach( f => {
>   f.split(",")
>   println(sparkContext.getConf.get("INPUT_FILE_NAME"))
> });
>
> The above sparkContext.getConf.get("INPUT_FILE_NAME") throws null
> pointer exception!
>
>
>
> Thanks,
>
> Kamal.
>
>
-- 
Best Regards,
Ayan Guha


Need config params while doing rdd.foreach or map

2018-03-22 Thread Kamalanathan Venkatesan
Hello All,

I have a custom parameter, say for example a file name, added to the conf of the
Spark context, e.g. SparkConf.set(INPUT_FILE_NAME, fileName).
I need this value inside a foreach performed on an RDD, but when I access the
Spark context inside foreach, I get a null pointer exception (the Spark context is null)!

Code sample:

val conf = new 
SparkConf().setMaster(appConfig.envOrElseConfig("app.sparkconf.master"))
  .setAppName(appConfig.envOrElseConfig("app.appName"))
  .set("INPUT_FILE_NAME", fileName)

var sparkContext = new SparkContext(conf)

sparkContext.addJar(sparkContextParams.jarPath)

var sqlContext = new SQLContext(sparkContext)

var df = sqlContext.read.format("com.databricks.spark.csv")
 .option("header", "true")
  .load()

df.foreach( f=> {
   f.split(",")
   println(sparkContext.getConf.get("INPUT_FILE_NAME"))
});

The above sparkContext.getConf.get("INPUT_FILE_NAME") throws null pointer 
exception!

Thanks,
Kamal.



Re: Is there a mutable dataframe spark structured streaming 2.3.0?

2018-03-22 Thread Jorge Machado
DataFrames are not mutable. 

Jorge Machado


> On 22 Mar 2018, at 10:07, Aakash Basu  wrote:
> 
> Hey,
> 
> I faced the same issue a couple of days back, kindly go through the mail 
> chain with "Multiple Kafka Spark Streaming Dataframe Join query" as subject, 
> TD and Chris has cleared my doubts, it would help you too.
> 
> Thanks,
> Aakash.
> 
> On Thu, Mar 22, 2018 at 7:50 AM, kant kodali  > wrote:
> Hi All,
> 
> Is there a mutable dataframe spark structured streaming 2.3.0? I am currently 
> reading from Kafka and if I cannot parse the messages that I get from Kafka I 
> want to write them to say some "dead_queue" topic.  
> 
> I wonder what is the best way to do this?
> 
> Thanks!
> 
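
For the dead-letter part of the quoted question, a sketch that works with immutable DataFrames (broker, topic, schema and paths below are placeholders): derive two DataFrames from the Kafka source, the rows that parse and the rows that don't, and write the latter back to a "dead_queue" topic.

import org.apache.spark.sql.functions._
import spark.implicits._

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
  .option("subscribe", "input_topic")                  // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS json")

// mySchema is your expected message schema; from_json yields null when parsing fails.
val parsed = raw.select(from_json($"json", mySchema).as("data"), $"json")

val good = parsed.filter($"data".isNotNull).select("data.*")
val bad  = parsed.filter($"data".isNull).select($"json".as("value"))

bad.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "dead_queue")
  .option("checkpointLocation", "/checkpoints/dead_queue")  // placeholder
  .start()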



Re: Is there a mutable dataframe spark structured streaming 2.3.0?

2018-03-22 Thread Aakash Basu
Hey,

I faced the same issue a couple of days back. Kindly go through the mail
chain with "*Multiple Kafka Spark Streaming Dataframe Join query*" as the
subject; TD and Chris have cleared my doubts there, and it would help you too.

Thanks,
Aakash.

On Thu, Mar 22, 2018 at 7:50 AM, kant kodali  wrote:

> Hi All,
>
> Is there a mutable dataframe spark structured streaming 2.3.0? I am
> currently reading from Kafka and if I cannot parse the messages that I get
> from Kafka I want to write them to say some "dead_queue" topic.
>
> I wonder what is the best way to do this?
>
> Thanks!
>


Re: Spark Druid Ingestion

2018-03-22 Thread nayan sharma
Hey Jorge,

Thanks for responding.

Can you elaborate on the user permission part? HDFS or local?

As of now, the HDFS path
hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
already has complete access for the yarn user, and my job is also running as the
same user.


Thanks,
Nayan


> On Mar 22, 2018, at 12:54 PM, Jorge Machado  wrote:
> 
> Seems to me permissions problems  ! Can you check your user / folder 
> permissions ? 
> 
> Jorge Machado
> 
> 
> 
> 
> 
>> On 22 Mar 2018, at 08:21, nayan sharma > > wrote:
>> 
>> Hi All,
>> As druid uses Hadoop MapReduce to ingest batch data but I am trying spark 
>> for ingesting data into druid taking reference from 
>> https://github.com/metamx/druid-spark-batch 
>> 
>> But we are stuck at the following error.
>> Application Log:—>
>> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 
>> MB memory including 384 MB overhead
>> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Setting up container launch context 
>> for our AM
>> 2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Setting up the launch environment for 
>> our AM container
>> 2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Preparing resources for our AM 
>> container
>> 2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor 
>> spark.yarn.archive is set, falling back to uploading libraries under 
>> SPARK_HOME.
>> 2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Uploading resource 
>> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip
>>  -> 
>> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
>>  
>> 
>> 2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Uploading resource 
>> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip
>>  -> 
>> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip
>>  
>> 
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing view acls to: yarn
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing modify acls to: yarn
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing view acls groups to: 
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing modify acls groups to: 
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - SecurityManager: authentication disabled; 
>> ui acls disabled; users  with view permissions: Set(yarn); groups with view 
>> permissions: Set(); users  with modify permissions: Set(yarn); groups with 
>> modify permissions: Set()
>> 2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Submitting application 
>> application_1521457397747_0013 to ResourceManager
>> 2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] 
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
>> application application_1521457397747_0013
>> 2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] 
>> org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting 
>> Yarn extension services with app application_1521457397747_0013 and 
>> attemptId None
>> 2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Application report for 
>> application_1521457397747_0013 (state: FAILED)
>> 2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - 
>>   client token: N/A
>>   diagnostics: Application application_1521457397747_0013 failed 2 times 
>> due to AM Container for appattempt_1521457397747_0013_02 exited with  
>> exitCode: -1000
>> For more detailed output, check the application tracking page: 
>> http://n-pa-hdn220.xxx.:8088/cluster/app/application_1521457397747_0013 
>> 
>>  Then click on links to logs of each attempt.
>> Diagnostics: No such file or directory
>> ENOENT: No such file or directory
>>  at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmodImpl(Native Method)
>>  at 

Spark Druid Ingestion

2018-03-22 Thread nayan sharma
Hi All,

Druid uses Hadoop MapReduce to ingest batch data, but I am trying Spark for ingesting data into Druid, taking reference from https://github.com/metamx/druid-spark-batch

But we are stuck at the following error.

Application Log:
2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 MB memory including 384 MB overhead
2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up container launch context for our AM
2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up the launch environment for our AM container
2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Preparing resources for our AM container
2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls to: yarn
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls to: yarn
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls groups to: 
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls groups to: 
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn); groups with view permissions: Set(); users  with modify permissions: Set(yarn); groups with modify permissions: Set()
2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Submitting application application_1521457397747_0013 to ResourceManager
2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1521457397747_0013
2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting Yarn extension services with app application_1521457397747_0013 and attemptId None
2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Application report for application_1521457397747_0013 (state: FAILED)
2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - 
	 client token: N/A
	 diagnostics: Application application_1521457397747_0013 failed 2 times due to AM Container for appattempt_1521457397747_0013_02 exited with  exitCode: -1000
For more detailed output, check the application tracking page: http://n-pa-hdn220.xxx.:8088/cluster/app/application_1521457397747_0013 Then click on links to logs of each attempt.
Diagnostics: No such file or directory
ENOENT: No such file or directory
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmodImpl(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmod(NativeIO.java:230)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:756)
	at org.apache.hadoop.fs.DelegateToFileSystem.setPermission(DelegateToFileSystem.java:211)
	at org.apache.hadoop.fs.FilterFs.setPermission(FilterFs.java:252)
	at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:1003)
	at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:999)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.setPermission(FileContext.java:1006)
	at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:421)
	at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:419)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.hadoop.yarn.util.FSDownload.changePermissions(FSDownload.java:419)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:365)
	at 

Re: Spark Druid Ingestion

2018-03-22 Thread Jorge Machado
Seems to me like a permissions problem! Can you check your user / folder
permissions?

Jorge Machado





> On 22 Mar 2018, at 08:21, nayan sharma  wrote:
> 
> Hi All,
> As druid uses Hadoop MapReduce to ingest batch data but I am trying spark for 
> ingesting data into druid taking reference from 
> https://github.com/metamx/druid-spark-batch 
> 
> But we are stuck at the following error.
> Application Log:—>
> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 MB 
> memory including 384 MB overhead
> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Setting up container launch context for 
> our AM
> 2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Setting up the launch environment for 
> our AM container
> 2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Preparing resources for our AM container
> 2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor 
> spark.yarn.archive is set, falling back to uploading libraries under 
> SPARK_HOME.
> 2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Uploading resource 
> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip
>  -> 
> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
>  
> 
> 2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Uploading resource 
> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip
>  -> 
> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip
>  
> 
> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
> org.apache.spark.SecurityManager - Changing view acls to: yarn
> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
> org.apache.spark.SecurityManager - Changing modify acls to: yarn
> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
> org.apache.spark.SecurityManager - Changing view acls groups to: 
> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
> org.apache.spark.SecurityManager - Changing modify acls groups to: 
> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
> org.apache.spark.SecurityManager - SecurityManager: authentication disabled; 
> ui acls disabled; users  with view permissions: Set(yarn); groups with view 
> permissions: Set(); users  with modify permissions: Set(yarn); groups with 
> modify permissions: Set()
> 2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Submitting application 
> application_1521457397747_0013 to ResourceManager
> 2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1521457397747_0013
> 2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] 
> org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting Yarn 
> extension services with app application_1521457397747_0013 and attemptId None
> 2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - Application report for 
> application_1521457397747_0013 (state: FAILED)
> 2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] 
> org.apache.spark.deploy.yarn.Client - 
>client token: N/A
>diagnostics: Application application_1521457397747_0013 failed 2 times 
> due to AM Container for appattempt_1521457397747_0013_02 exited with  
> exitCode: -1000
> For more detailed output, check the application tracking page: 
> http://n-pa-hdn220.xxx.:8088/cluster/app/application_1521457397747_0013 
>  
> Then click on links to logs of each attempt.
> Diagnostics: No such file or directory
> ENOENT: No such file or directory
>   at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmodImpl(Native Method)
>   at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmod(NativeIO.java:230)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:756)
>   at 
> org.apache.hadoop.fs.DelegateToFileSystem.setPermission(DelegateToFileSystem.java:211)
>   at org.apache.hadoop.fs.FilterFs.setPermission(FilterFs.java:252)
>   at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:1003)
>   at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:999)
>   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>   at