Apache Spark - How to convert DataFrame json string to structured element and using schema_of_json

2022-09-05 Thread M Singh
Hi:
In Apache Spark we can read JSON using the following:
spark.read.json("path")
There is support for converting a JSON string column in a DataFrame into a structured element using from_json(column, schema)
(https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.types.DataType-scala.collection.immutable.Map-).
However, is there any way to convert the row into a structured element without providing the schema ?

Also, there is support for getting schema of a json string using schema_of_json
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#schema_of_json-org.apache.spark.sql.Column-


Is there a way to convert the result into StructType ?
Thanks
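
A minimal sketch of one way to do both (assumptions: Spark 2.4+, a hypothetical JSON column named "payload", and that a single sample row is representative of the schema). schema_of_json returns a DDL-formatted string, which DataType.fromDDL can turn into a StructType for use with from_json:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, lit, schema_of_json}
import org.apache.spark.sql.types.{DataType, StructType}

object JsonSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("JsonSchemaSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical DataFrame with a JSON string column named "payload".
    val df = Seq("""{"id":1,"visit":"v1"}""").toDF("payload")

    // Infer the schema from one sample row; schema_of_json needs a literal value.
    val sample = df.select($"payload").as[String].head()
    val ddl = df.select(schema_of_json(lit(sample))).as[String].head()

    // Convert the DDL string into a StructType and parse the column with it.
    val schema = DataType.fromDDL(ddl).asInstanceOf[StructType]
    df.select(from_json($"payload", schema).as("data")).select("data.*").show(false)

    spark.stop()
  }
}

If an extra pass over the data is acceptable, spark.read.json(df.as[String]) also infers the schema without one being supplied, and the inferred StructType is then available as the resulting DataFrame's schema.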

Re: Apache Kafka / Spark Integration - Exception - The server disconnected before a response was received.

2018-04-10 Thread M Singh
 
Hi Daniel:
Yes I am working with Spark Structured Streaming. 
The exception is emanating from the Spark Kafka connector, but I was wondering if 
someone has encountered this issue and resolved it with some configuration 
parameter in the Kafka client/broker or OS settings.
Thanks
Mans
On Tuesday, April 10, 2018, 7:49:42 AM PDT, Daniel Hinojosa 
<dhinoj...@evolutionnext.com> wrote:  
 
 This looks more like a Spark issue than a Kafka one, judging by the stack trace. 
Are you using Spark Structured Streaming with Kafka integration, by chance?

On Mon, Apr 9, 2018 at 8:47 AM, M Singh <mans2si...@yahoo.com.invalid>
wrote:

> Hi Folks:
> Just wanted to see if anyone has any suggestions on this issue.
> Thanks
>
>
>    On Monday, March 26, 2018, 11:04:02 AM PDT, M Singh
> <mans2si...@yahoo.com.INVALID> wrote:
>
>  Hi Ted:
> Here is the exception trace (Note - the exception is occurring in the Kafka Spark writer class).
>
> I will try to check broker logs.  Is there anything specific I should look for?
> Driver stacktrace:
>  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1708)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1696)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1695)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1695)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
>  at scala.Option.foreach(Option.scala:257)
>  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
>  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1923)
>  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1878)
>  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1867)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
>  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply$mcV$sp(KafkaWriter.scala:89)
>  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89)
>  at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89)
>  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
>  at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:88)
>  at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
>
>
>    On Monday, March 26, 2018, 9:34:06 AM PDT, Ted Yu <yuzhih...@gmail.com>
> wrote:
>
>  Can you post the stack trace for NetworkException (pastebin) ?
>
> Please also check the broker logs to see if there was some clue around the
> time this happened.
>
> Thanks
>
> On Mon, Mar 26, 2018 at 9:30 AM, M Singh <mans2si...@yahoo.com.invalid>
> wrote:
>
> > Hi:
> > I am working with spark 2.2.1 and spark kafka 0.10 client integration
> with
> > Kafka brokers using 0.11.
> > I get the exception - org.apache.kafka.common.errors.NetworkException:
> > The server disconnected before a response was received - when the
> > application is trying to write to a topic. This exception kills the spark
> > application.
> > Based on some similar issues I saw on the web I've added the following
> > kafka configuration but it has not helped.
> > acks = 0
> > request.timeout.ms = 45000
> > receive.buffer.bytes = 1024000
> > I've posted this question to apache spark users list but have not
> received
> > any response.  If anyone has any suggestion/pointers, please let me know.
> > Thanks
> >
>
>
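
A sketch of how such producer settings can be passed through the Spark Kafka sink (options prefixed with kafka. are forwarded to the underlying producer). The broker list, topic, checkpoint path, and the specific values are placeholders, not a recommendation:

import org.apache.spark.sql.DataFrame

// Assumes `df` is an existing streaming DataFrame with key/value columns.
def writeToKafka(df: DataFrame): Unit = {
  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder brokers
    .option("topic", "output-topic")                      // placeholder topic
    .option("kafka.acks", "all")                          // producer settings, forwarded via the kafka. prefix
    .option("kafka.request.timeout.ms", "45000")
    .option("kafka.retries", "10")                        // retries let the producer ride out transient disconnects
    .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
    .start()
    .awaitTermination()
}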
  

Apache Spark - Structured Streaming State Management With Watermark

2018-03-28 Thread M Singh
Hi:
I am using Apache Spark Structured Streaming (2.2.1) to implement custom 
sessionization for events.  The processing is in two steps:
1. flatMapGroupsWithState (keyed by user id), which stores the state of the user and emits events every minute until an expire event is received.
2. An aggregation (group by count).

I am using outputMode - Update.

I have a few questions:
1. If I don't use a watermark at all:
   (a) is the state for flatMapGroupsWithState stored forever?
   (b) is the state for the groupBy count stored forever?
2. Is the watermark applicable for cleaning up groupBy aggregates only?
3. Can we use the watermark to manage state in flatMapGroupsWithState? If so, how?
4. Can the watermark be used for other state cleanup? Are there any examples of that?
Thanks
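
For question 3, a sketch of how a watermark can drive state cleanup in flatMapGroupsWithState via GroupStateTimeout.EventTimeTimeout. The Event/SessionState/SessionUpdate types, the 10-minute threshold, and the counting logic are assumptions for illustration, not the actual job described above:

import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical event and state types for illustration only.
case class Event(userId: String, eventTime: Timestamp, action: String)
case class SessionState(count: Long)
case class SessionUpdate(userId: String, count: Long)

object SessionizationSketch {
  def sessionize(events: Dataset[Event]): Dataset[SessionUpdate] = {
    import events.sparkSession.implicits._

    // The watermark on eventTime drives EventTimeTimeout below, and it is also what
    // lets a downstream event-time aggregation drop old state.
    events
      .withWatermark("eventTime", "10 minutes")
      .groupByKey(_.userId)
      .flatMapGroupsWithState[SessionState, SessionUpdate](
        OutputMode.Update(), GroupStateTimeout.EventTimeTimeout()) {
        (userId: String, batch: Iterator[Event], state: GroupState[SessionState]) =>
          if (state.hasTimedOut) {
            // The watermark passed the timeout timestamp: emit a final record and drop the state.
            val finalCount = state.getOption.map(_.count).getOrElse(0L)
            state.remove()
            Iterator(SessionUpdate(userId, finalCount))
          } else {
            val updated = SessionState(state.getOption.map(_.count).getOrElse(0L) + batch.size)
            state.update(updated)
            // Expire this group 10 minutes of event time after the current watermark.
            state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 10 * 60 * 1000L)
            Iterator(SessionUpdate(userId, updated.count))
          }
      }
  }
}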


Apache Spark - Structured Streaming StreamExecution Stats Description

2018-03-28 Thread M Singh
Hi:
I am using spark structured streaming 2.2.1 with flatMapGroupsWithState 
and a groupBy count operator.
In the StreamExecution logs I see two entries for stateOperators:
"stateOperators" : [ {
    "numRowsTotal" : 1617339,
    "numRowsUpdated" : 9647
  }, {
    "numRowsTotal" : 1326355,
    "numRowsUpdated" : 1398672
  } ],
My questions are:
1. Is there a way to figure out which stats are for flatMapGroupsWithState and which are for the groupBy count?  In my case, I can guess based on my data but want to be definitive about it.
2. For the second entry - how can numRowsTotal (1326355) be less than numRowsUpdated (1398672)?
If there is documentation I can use to understand this debug output, please let 
me know.

Thanks
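
For what it's worth, the same numbers can also be inspected programmatically through StreamingQuery.lastProgress, where the stateOperators array keeps the same operator order from trigger to trigger. A small sketch (the query handle is assumed to come from writeStream...start()):

import org.apache.spark.sql.streaming.StreamingQuery

// Print state-store metrics per operator index from the most recent progress report.
def printStateMetrics(query: StreamingQuery): Unit = {
  val progress = query.lastProgress
  if (progress != null) {
    progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      println(s"operator $i: numRowsTotal=${op.numRowsTotal}, numRowsUpdated=${op.numRowsUpdated}")
    }
  }
}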


Apache Spark Structured Streaming - How to keep executor alive.

2018-03-23 Thread M Singh
Hi:
I am working on spark structured streaming (2.2.1) with Kafka and want 100 
executors to be alive. I set spark.executor.instances to 100.  The process 
starts running with 100 executors, but after some time only a few remain, which 
causes a backlog of events from Kafka.
I thought I saw a setting to keep the executors from being killed.  However, I 
am not able to find that configuration in the Spark docs.  If anyone knows that 
setting, please let me know.
Thanks
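
If the executors are going away because dynamic allocation is releasing idle ones (an assumption about the cluster setup), a sketch of pinning a fixed pool via standard configs; the values are examples only:

import org.apache.spark.sql.SparkSession

// Keep a fixed pool of executors by disabling dynamic allocation.
val spark = SparkSession.builder()
  .appName("FixedExecutors")
  .config("spark.executor.instances", "100")
  .config("spark.dynamicAllocation.enabled", "false")
  // If dynamic allocation must stay on, a longer idle timeout also helps:
  // .config("spark.dynamicAllocation.executorIdleTimeout", "3600s")
  .getOrCreate()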


Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-03-22 Thread M Singh
Hi:
I am working on a realtime application using spark structured streaming (v 
2.2.1). The application reads data from Kafka and, if there is a failure, I 
would like to ignore the checkpoint.  Is there any configuration to just read 
from the last Kafka offset after a failure and ignore any offset checkpoints? 
Also, I believe that the checkpoint also saves state and will continue the 
aggregations after recovery.  Is there any way to ignore the checkpointed state?
Also, is there a way to selectively checkpoint only the state or only the offsets?

Thanks
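
A sketch of one way to restart from the latest Kafka offsets while ignoring the old checkpoint (broker, topic, and paths are placeholders). Note that startingOffsets is only honored when there is no checkpoint to resume from, which is why a fresh checkpoint directory is used here; this also discards any saved state:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("IgnoreCheckpointSketch").getOrCreate()

// startingOffsets applies only on a fresh start, so point the query at a new checkpoint directory.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "events")                      // placeholder topic
  .option("startingOffsets", "latest")
  .load()

val query = stream.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/run-2")  // fresh location, so old offsets and state are not reused
  .start()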


Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception

2018-03-22 Thread M Singh
Hi:
I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR, and for the last 
few days, after running the application for 30-60 minutes, I get the exception 
from the Kafka consumer included below.

The structured streaming application is processing one minute's worth of data from 
the Kafka topic. I've tried increasing request.timeout.ms from the 4-second 
default to 45000 ms and receive.buffer.bytes to 1 MB, but I still get the same 
exception.
Is there any Spark/Kafka configuration that can save the offset and retry it 
next time rather than throwing an exception and killing the application?
I've tried googling but have not found a substantial solution/recommendation.  If 
anyone has any suggestions, or a different version to try, etc., please let me know.
Thanks
Here is the exception stack trace.

java.util.concurrent.TimeoutException: Cannot fetch record for offset  in 12 milliseconds
 at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
 at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
 at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
 at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
 at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
 at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
 at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
 at 


Re: Apache Spark - Structured Streaming reading from Kafka some tasks take much longer

2018-02-24 Thread M Singh
Hi Vijay:
I am using spark-shell because I am still prototyping the steps involved.
Regarding executors - I have 280 executors and the UI only shows a few straggler 
tasks on each trigger.  The UI does not show too much time spent on GC.  
I suspect the delay is because of getting data from Kafka. The number of 
stragglers is generally less than 5 out of 240, but is sometimes higher. 

I will try to dig more into it and see if changing partitions etc helps but was 
wondering if anyone else has encountered similar stragglers holding up 
processing of a window trigger.
Thanks
 

On Friday, February 23, 2018 6:07 PM, vijay.bvp  wrote:
 

 Instead of spark-shell, have you tried running it as a job? 

How many executors and cores? Can you share the RDD graph and event timeline 
from the UI? Did you find whether the tasks taking more time had any GC? 

Please look at the UI if you have not already; it can provide a lot of information.






   

Apache Spark - Structured Streaming reading from Kafka some tasks take much longer

2018-02-23 Thread M Singh
Hi:
I am working with spark structured streaming (2.2.1) reading data from Kafka 
(0.11).  

I need to aggregate data ingested every minute and I am using spark-shell at 
the moment.  The message ingestion rate is approx 500k/second.  During 
some trigger intervals (1 minute), especially when the streaming process is 
started, all tasks finish in 20 seconds, but during some triggers it takes 90 
seconds.  

I have tried reducing the number of partitions (to approx 100 from 300) to reduce 
the number of consumers for Kafka, but that has not helped. I also tried setting 
kafkaConsumer.pollTimeoutMs to 30 seconds, but then I see a lot of 
java.util.concurrent.TimeoutException: Cannot fetch record for offset.
So I wanted to see if anyone has any thoughts/recommendations.
Thanks




Re: Apache Spark - Structured Streaming Query Status - field descriptions

2018-02-11 Thread M Singh
Thanks Richard.  I am hoping that the Spark team will, at some point, provide more 
detailed documentation.
 

On Sunday, February 11, 2018 2:17 AM, Richard Qiao 
<richardqiao2...@gmail.com> wrote:
 

 I can't find a good source of documentation, but the source code of 
“org.apache.spark.sql.execution.streaming.ProgressReporter” is helpful for 
answering some of them.
For example:
  inputRowsPerSecond = numRecords / inputTimeSec
  processedRowsPerSecond = numRecords / processingTimeSec
This explains why the two rows-per-second values differ.

On Feb 10, 2018, at 8:42 PM, M Singh <mans2si...@yahoo.com.INVALID> wrote:
Hi:
I am working with spark 2.2.0 and am looking at the query status console 
output.  

My application reads from Kafka, performs flatMapGroupsWithState, and then 
aggregates the elements for two group counts.  The output is sent to a console 
sink.  I see the following output (with my questions inline). 

Please let me know where I can find a detailed description of the query status 
fields for Spark Structured Streaming.


StreamExecution: Streaming query made progress: {
  "id" : "8eff62a9-81a8-4142-b332-3e5ec63e06a2",
  "runId" : "21778fbb-406c-4c65-bdef-d9d2c24698ce",
  "name" : null,
  "timestamp" : "2018-02-11T01:18:00.005Z",
  "numInputRows" : 5780,
  "inputRowsPerSecond" : 96.32851690748795,
  "processedRowsPerSecond" : 583.9563548191554,  // Why is processedRowsPerSecond greater than inputRowsPerSecond ? Does this include shuffling/grouping ?
  "durationMs" : {
    "addBatch" : 9765,         // Is this the time taken to send output to all console output streams ?
    "getBatch" : 3,            // Is this the time taken to get the batch from Kafka ?
    "getOffset" : 3,           // Is this the time for getting offsets from Kafka ?
    "queryPlanning" : 89,      // The value of this field changes with different triggers but the query is not changing, so why does this change ?
    "triggerExecution" : 9898, // Is this the total time for this trigger ?
    "walCommit" : 35           // Is this for checkpointing ?
  },
  "stateOperators" : [ {       // What are the two state operators ? I am assuming one is flatMapGroupsWithState (the first one).
    "numRowsTotal" : 8,
    "numRowsUpdated" : 1
  }, {
    "numRowsTotal" : 6,        // Is this the group-by state operator ? If so, I have two group-bys, so why do I see only one ?
    "numRowsUpdated" : 6
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[xyz]]",
    "startOffset" : {
      "xyz" : {
        "2" : 9183,
        "1" : 9184,
        "3" : 9184,
        "0" : 9183
      }
    },
    "endOffset" : {
      "xyz" : {
        "2" : 10628,
        "1" : 10629,
        "3" : 10629,
        "0" : 10628
      }
    },
    "numInputRows" : 5780,
    "inputRowsPerSecond" : 96.32851690748795,
    "processedRowsPerSecond" : 583.9563548191554
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@15fc109c"
  }
}






   

Re: Apache Spark - Structured Streaming - Updating UDF state dynamically at run time

2018-02-10 Thread M Singh
Just checking if anyone has any pointers for dynamically updating query state 
in structured streaming.
Thanks
 

On Thursday, February 8, 2018 2:58 PM, M Singh 
<mans2si...@yahoo.com.INVALID> wrote:
 

 Hi Spark Experts:
I am trying to use a stateful udf with spark structured streaming that needs to 
update the state periodically.
Here is the scenario:
1. I have a UDF with a variable with a default value (e.g. 1).  This value is applied to a column (e.g. the variable is subtracted from the column value).
2. The variable is to be updated periodically and asynchronously (e.g. by reading a file every 5 minutes), and new rows will have the new value applied to the column value.
Spark natively supports broadcast variables, but I could not find a way to 
update the broadcast variables dynamically, or to rebroadcast them, so that 
the UDF's internal state can be updated while the structured streaming application 
is running.
I can try to read the variable from the file on each invocation of the UDF, but 
that will not scale since each invocation would open/read/close the file.
Please let me know if there is any documentation/example to support this 
scenario.
Thanks





   

Apache Spark - Structured Streaming Query Status - field descriptions

2018-02-10 Thread M Singh
Hi:
I am working with spark 2.2.0 and am looking at the query status console 
output.  

My application reads from Kafka, performs flatMapGroupsWithState, and then 
aggregates the elements for two group counts.  The output is sent to a console 
sink.  I see the following output (with my questions inline). 

Please let me know where I can find a detailed description of the query status 
fields for Spark Structured Streaming.


StreamExecution: Streaming query made progress: {
  "id" : "8eff62a9-81a8-4142-b332-3e5ec63e06a2",
  "runId" : "21778fbb-406c-4c65-bdef-d9d2c24698ce",
  "name" : null,
  "timestamp" : "2018-02-11T01:18:00.005Z",
  "numInputRows" : 5780,
  "inputRowsPerSecond" : 96.32851690748795,
  "processedRowsPerSecond" : 583.9563548191554,  // Why is processedRowsPerSecond greater than inputRowsPerSecond ? Does this include shuffling/grouping ?
  "durationMs" : {
    "addBatch" : 9765,         // Is this the time taken to send output to all console output streams ?
    "getBatch" : 3,            // Is this the time taken to get the batch from Kafka ?
    "getOffset" : 3,           // Is this the time for getting offsets from Kafka ?
    "queryPlanning" : 89,      // The value of this field changes with different triggers but the query is not changing, so why does this change ?
    "triggerExecution" : 9898, // Is this the total time for this trigger ?
    "walCommit" : 35           // Is this for checkpointing ?
  },
  "stateOperators" : [ {       // What are the two state operators ? I am assuming one is flatMapGroupsWithState (the first one).
    "numRowsTotal" : 8,
    "numRowsUpdated" : 1
  }, {
    "numRowsTotal" : 6,        // Is this the group-by state operator ? If so, I have two group-bys, so why do I see only one ?
    "numRowsUpdated" : 6
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[xyz]]",
    "startOffset" : {
      "xyz" : {
        "2" : 9183,
        "1" : 9184,
        "3" : 9184,
        "0" : 9183
      }
    },
    "endOffset" : {
      "xyz" : {
        "2" : 10628,
        "1" : 10629,
        "3" : 10629,
        "0" : 10628
      }
    },
    "numInputRows" : 5780,
    "inputRowsPerSecond" : 96.32851690748795,
    "processedRowsPerSecond" : 583.9563548191554
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@15fc109c"
  }
}




Apache Spark - Structured Streaming - Updating UDF state dynamically at run time

2018-02-08 Thread M Singh
Hi Spark Experts:
I am trying to use a stateful udf with spark structured streaming that needs to 
update the state periodically.
Here is the scenario:
1. I have a UDF with a variable with a default value (e.g. 1).  This value is applied to a column (e.g. the variable is subtracted from the column value).
2. The variable is to be updated periodically and asynchronously (e.g. by reading a file every 5 minutes), and new rows will have the new value applied to the column value.
Spark natively supports broadcast variables, but I could not find a way to 
update the broadcast variables dynamically, or to rebroadcast them, so that 
the UDF's internal state can be updated while the structured streaming application 
is running.
I can try to read the variable from the file on each invocation of the UDF, but 
that will not scale since each invocation would open/read/close the file.
Please let me know if there is any documentation/example to support this 
scenario.
Thanks
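
A sketch of one workaround (not an official Spark mechanism): cache the value in a per-executor singleton and refresh it at most once per interval, so the file is not opened on every row. The path, interval, and subtraction logic are assumptions, and each executor refreshes independently rather than in lockstep:

import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.sql.functions.udf

// Per-executor cache of (lastLoadedMillis, value); refreshed lazily from a file.
object RefreshableOffset {
  private val refreshIntervalMs = 5 * 60 * 1000L
  private val cached = new AtomicReference[(Long, Double)]((0L, 1.0))

  def value(path: String): Double = {
    val (loadedAt, v) = cached.get()
    val now = System.currentTimeMillis()
    if (now - loadedAt < refreshIntervalMs) v
    else {
      val src = scala.io.Source.fromFile(path)
      val fresh = try src.getLines().next().trim.toDouble finally src.close()
      cached.set((now, fresh))
      fresh
    }
  }
}

// The UDF reads the cached value, so there is at most one file read per executor per interval.
val subtractOffset = udf { (columnValue: Double) =>
  columnValue - RefreshableOffset.value("/tmp/offset.txt")  // placeholder path
}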





Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-02-06 Thread M Singh
Hi Jacek:
Thanks for your response.
I am just trying to understand the fundamentals of watermarking and how it 
behaves in aggregation vs non-aggregation scenarios.



 

On Tuesday, February 6, 2018 9:04 AM, Jacek Laskowski <ja...@japila.pl> 
wrote:
 

 Hi,
What would you expect? The data is simply dropped as that's the purpose of 
watermarking it. That's my understanding at least.
Regards,
Jacek Laskowski
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Mon, Feb 5, 2018 at 8:11 PM, M Singh <mans2si...@yahoo.com> wrote:

Just checking if anyone has more details on how watermark works in cases where 
event time is earlier than processing time stamp. 

On Friday, February 2, 2018 8:47 AM, M Singh <mans2si...@yahoo.com> wrote:
 

 Hi Vishnu/Jacek:
Thanks for your responses.
Jacek - At the moment, the current time for my use case is processing time.

Vishnu - The Spark documentation (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) 
does indicate that it can dedup using a watermark.  So I believe there are more 
use cases for watermark, and that is what I am trying to find.
I am hoping that TD can clarify or point me to the documentation.
Thanks
 

On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath 
<vishnu.viswanat...@gmail.com> wrote:
 

 Hi Mans,
A watermark in Spark is used to decide when to clear the state, so if the event 
is delayed beyond the point when the state is cleared by Spark, it will be ignored. 
I recently wrote a blog post on this: 
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable to aggregation only. If you have only a 
map function and don't want to process late records, you could do a filter based on 
the event-time field, but I guess you will have to compare it with the processing 
time, since there is no API for the user to access the watermark. 
-Vishnu
On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi:
I am trying to filter out records which are lagging behind (based on event 
time) by a certain amount of time.  
Is the watermark API applicable to this scenario (i.e., filtering lagging 
records), or is it only applicable with aggregation?  I could not get a clear 
understanding from the documentation, which only refers to its usage with 
aggregation.

Thanks
Mans



   

   



   

Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-02-05 Thread M Singh
Just checking if anyone has more details on how watermark works in cases where 
event time is earlier than processing time stamp. 

On Friday, February 2, 2018 8:47 AM, M Singh <mans2si...@yahoo.com> wrote:
 

 Hi Vishnu/Jacek:
Thanks for your responses.
Jacek - At the moment, the current time for my use case is processing time.

Vishnu - Spark documentation 
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
 does indicate that it can dedup using watermark.  So I believe there are more 
use cases for watermark and that is what I am trying to find.
I am hoping that TD can clarify or point me to the documentation.
Thanks
 

On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath 
<vishnu.viswanat...@gmail.com> wrote:
 

 Hi Mans,
A watermark in Spark is used to decide when to clear the state, so if the event 
is delayed beyond the point when the state is cleared by Spark, it will be ignored. 
I recently wrote a blog post on this: 
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable to aggregation only. If you have only a 
map function and don't want to process late records, you could do a filter based on 
the event-time field, but I guess you will have to compare it with the processing 
time, since there is no API for the user to access the watermark. 
-Vishnu
On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi:
I am trying to filter out records which are lagging behind (based on event 
time) by a certain amount of time.  
Is the watermark API applicable to this scenario (i.e., filtering lagging 
records), or is it only applicable with aggregation?  I could not get a clear 
understanding from the documentation, which only refers to its usage with 
aggregation.

Thanks
Mans



   

   

Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-02-05 Thread M Singh
Hi TD:
Just wondering if you have any insight for me or need more info.
Thanks 

On Thursday, February 1, 2018 7:43 AM, M Singh 
<mans2si...@yahoo.com.INVALID> wrote:
 

 Hi TD:
Here is the updated code with the explain output and full stack trace.
Please let me know what could be the issue and what to look for in the explain 
output.
Updated code:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))

    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp().cast("long"))
    dataframe2.explain(true)

    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}
Explain output:
== Parsed Logical Plan ==
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- AnalysisBarrier
      +- Project [id#0, visit#1]
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Analyzed Logical Plan ==
id: string, visit: string, cts: bigint
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- Project [id#0, visit#1]
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Optimized Logical Plan ==
Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Physical Plan ==
*(1) Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation FileSource[./data/], [id#0, visit#1]
Here is the exception:
18/02/01 07:39:52 ERROR MicroBatchExecution: Query [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = b5c618cb-30c7-4eff-8f09-ea1d064878ae] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
 at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
 at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
 at org.apache.spark.sql.execution.st

Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-02-02 Thread M Singh
Hi Vishnu/Jacek:
Thanks for your responses.
Jacek - At the moment, the current time for my use case is processing time.

Vishnu - Spark documentation 
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
 does indicate that it can dedup using watermark.  So I believe there are more 
use cases for watermark and that is what I am trying to find.
I am hoping that TD can clarify or point me to the documentation.
Thanks
 

On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath 
<vishnu.viswanat...@gmail.com> wrote:
 

 Hi Mans,
A watermark in Spark is used to decide when to clear the state, so if the event 
is delayed beyond the point when the state is cleared by Spark, it will be ignored. 
I recently wrote a blog post on this: 
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable to aggregation only. If you have only a 
map function and don't want to process late records, you could do a filter based on 
the event-time field, but I guess you will have to compare it with the processing 
time, since there is no API for the user to access the watermark. 
-Vishnu
On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi:
I am trying to filter out records which are lagging behind (based on event 
time) by a certain amount of time.  
Is the watermark API applicable to this scenario (i.e., filtering lagging 
records), or is it only applicable with aggregation?  I could not get a clear 
understanding from the documentation, which only refers to its usage with 
aggregation.

Thanks
Mans



   

Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-02-01 Thread M Singh
n$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
 at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Invalid call to dataType on unresolved object, tree: 'cts
=== Streaming Query ===
Identifier: [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = b5c618cb-30c7-4eff-8f09-ea1d064878ae]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- Project [id#0, visit#1]
   +- StreamingExecutionRelation FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data], [id#0, visit#1]

 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
 at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
 at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
 at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
 at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
 ... 1 more 

On Wednesday, January 31, 2018 3:46 PM, Tathagata Das 
<tathagata.das1...@gmail.com> wrote:
 

 Could you give the full stack trace of the exception?
Also, can you do `dataframe2.explain(true)` and show us the plan output?



On Wed, Jan 31, 2018 at 3:35 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi Folks:
I have to add a column to a structured streaming dataframe but when I do that 
(using select or withColumn) I get an exception.  I can add a column in a 
structured non-streaming dataframe. I could not find any 
documentation on how to do this in the following doc  
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
I am using spark 2.4.0-SNAPSHOT
Please let me know what I could be missing.

Thanks for your help.
(I am also attaching the source code for the structured streaming, structured 
non-str

Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread M Singh
Hi Folks:
I have to add a column to a structured streaming dataframe but when I do that 
(using select or withColumn) I get an exception.  I can add a column in a 
structured non-streaming dataframe. I could not find any 
documentation on how to do this in the following doc  
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
I am using spark 2.4.0-SNAPSHOT
Please let me know what I could be missing.

Thanks for your help.
(I am also attaching the source code for the structured streaming, structured 
non-streaming classes and input file with this email)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
 at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
 at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
Here is the input file (in the ./data directory) - note tokens are separated by '\t':
1	v1
2	v1
2	v2
3	v3
3	v1
Here is the code with dataframe (non-streaming) which works:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object StructuredTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))

    var dataframe = spark.read.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
    dataframe2.show(false)
    spark.stop()
  }
}
Output of the above code is:

+---+-----+-----------------------+
|id |visit|cts                    |
+---+-----+-----------------------+
|1  |v1   |2018-01-31 15:07:00.758|
|2  |v1   |2018-01-31 15:07:00.758|
|2  |v2   |2018-01-31 15:07:00.758|
|3  |v3   |2018-01-31 15:07:00.758|
|3  |v1   |2018-01-31 15:07:00.758|
+---+-----+-----------------------+

Here is the code with structured streaming which throws the exception:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))

    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp())
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}
Here is the exception:

18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
 at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
 at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
I've also used snippets (shown below) from 
https://docs.databricks.com/spark/latest/structured-streaming/examples.html 
but still get the same exception.
Here is the code:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))

    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select(
      current_timestamp().cast("timestamp").alias("timestamp"),
      expr("*"))
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}

Apache Spark - Spark Structured Streaming - Watermark usage

2018-01-26 Thread M Singh
Hi:
I am trying to filter out records which are lagging behind (based on event 
time) by a certain amount of time.  
Is the watermark API applicable to this scenario (i.e., filtering lagging 
records), or is it only applicable with aggregation?  I could not get a clear 
understanding from the documentation, which only refers to its usage with 
aggregation.

Thanks
Mans
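
A sketch of the filter-based approach suggested in the replies elsewhere in this thread (comparing event time with processing time, since the watermark itself is not exposed to user code); the column name and threshold are assumptions:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, current_timestamp, unix_timestamp}

// Keep only rows whose event time is within maxLagSeconds of the current processing time.
def dropLaggingRecords(df: DataFrame, maxLagSeconds: Long = 600): DataFrame =
  df.filter(
    unix_timestamp(current_timestamp()) - unix_timestamp(col("eventTime")) <= maxLagSeconds
  )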

Re: Apache Spark - Custom structured streaming data source

2018-01-26 Thread M Singh
Thanks TD.  When is 2.3 scheduled for release?

On Thursday, January 25, 2018 11:32 PM, Tathagata Das <t...@databricks.com> 
wrote:
 

 Hello Mans,
The streaming DataSource APIs are still evolving and are not public yet. Hence 
there is no official documentation. In fact, there is a new DataSourceV2 API 
(in Spark 2.3) that we are migrating towards. So at this point of time, it's 
hard to make any concrete suggestion. You can take a look at the classes 
DataSourceV2, DataReader, MicroBatchDataReader in the spark source code, along 
with their implementations.
Hope this helps. 
TD

On Jan 25, 2018 8:36 PM, "M Singh" <mans2si...@yahoo.com.invalid> wrote:

Hi:
I am trying to create a custom structured streaming source and would like to 
know if there is any example or documentation on the steps involved.
I've looked at some methods available in SparkSession, but these are 
internal to the sql package:

  private[sql] def internalCreateDataFrame(
      catalystRows: RDD[InternalRow],
      schema: StructType,
      isStreaming: Boolean = false): DataFrame = {
    // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
    // schema differs from the existing schema on any field data type.
    val logicalPlan = LogicalRDD(
      schema.toAttributes,
      catalystRows,
      isStreaming = isStreaming)(self)
    Dataset.ofRows(self, logicalPlan)
  }
Please let me know where I can find the appropriate API or documentation.
Thanks
Mans



   

Apache Spark - Custom structured streaming data source

2018-01-25 Thread M Singh
Hi:
I am trying to create a custom structured streaming source and would like to 
know if there is any example or documentation on the steps involved.
I've looked at some methods available in SparkSession, but these are 
internal to the sql package:

  private[sql] def internalCreateDataFrame(
      catalystRows: RDD[InternalRow],
      schema: StructType,
      isStreaming: Boolean = false): DataFrame = {
    // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
    // schema differs from the existing schema on any field data type.
    val logicalPlan = LogicalRDD(
      schema.toAttributes,
      catalystRows,
      isStreaming = isStreaming)(self)
    Dataset.ofRows(self, logicalPlan)
  }
Please let me know where I can find the appropriate API or documentation.
Thanks
Mans

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-05 Thread M Singh
Hi Jacek:

The javadoc mentions that we can only consume data from the data frame in the 
addBatch method.  So, if I would like to save the data to a new sink then I 
believe that I will need to collect the data and then save it.  This is the 
reason I am asking about how to control the size of the data in each invocation 
of the addBatch method.  Let me know if I am interpreting the javadoc 
incorrectly.  Here it is:
/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit


Thanks
Mans  

On Thursday, January 4, 2018 2:19 PM, Jacek Laskowski <ja...@japila.pl> 
wrote:
 

 Hi,
> If the data is very large then a collect may result in OOM.
That's a general case even in any part of Spark, incl. Spark Structured 
Streaming. Why would you collect in addBatch? It's on the driver side and as 
anything on the driver, it's a single JVM (and usually not fault tolerant)
> Do you have any other suggestion/recommendation ?
What's wrong with the current solution? I don't think you should change how you 
do things currently. You should just avoid collect on large datasets (which you 
have to do anywhere in Spark).
Regards,
Jacek Laskowski
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Thanks Tathagata for your answer.
The reason I was asking about controlling the data size is that the javadoc 
indicates you can use foreach or collect on the dataframe.  If the data is very 
large then a collect may result in OOM.
From your answer it appears that the only way to control the size (in 2.2) 
would be to control the trigger interval. However, in my case, I have to dedup 
the elements in a one-minute interval, which I am using as the trigger interval 
and cannot reduce.  Do you have any other suggestion/recommendation?
Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3 ?
Thanks again. 

On Wednesday, January 3, 2018 2:27 PM, Tathagata Das 
<tathagata.das1...@gmail.com> wrote:
 

 1. It is all the result data in that trigger. Note that it takes a DataFrame 
which is a purely logical representation of data and has no association with 
partitions, etc. which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then 
you should either control the trigger interval or use the rate limit options on 
sources that support it (e.g. for kafka, you can use the option 
"maxOffsetsPerTrigger", see the guide).
Related note, these APIs are subject to change. In fact, in the upcoming release 
2.3, we are adding a DataSource V2 API for 
batch/microbatch-streaming/continuous-streaming sources and sinks.
On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi:
The documentation for Sink.addBatch is as follows:
/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or is it all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure that we don't run into an OOM exception on the executor while calling collect?
Thanks



   



   

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-04 Thread M Singh
Thanks Tathagata for your answer.
The reason I was asking about controlling the data size is that the javadoc 
indicates you can use foreach or collect on the dataframe.  If the data is very 
large then a collect may result in OOM.
From your answer it appears that the only way to control the size (in 2.2) 
would be to control the trigger interval. However, in my case, I have to dedup 
the elements in a one-minute interval, which I am using as the trigger interval 
and cannot reduce.  Do you have any other suggestion/recommendation?
Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3 ?
Thanks again. 

On Wednesday, January 3, 2018 2:27 PM, Tathagata Das 
<tathagata.das1...@gmail.com> wrote:
 

 1. It is all the result data in that trigger. Note that it takes a DataFrame 
which is a purely logical representation of data and has no association with 
partitions, etc. which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then 
you should either control the trigger interval or use the rate limit options on 
sources that support it (e.g. for kafka, you can use the option 
"maxOffsetsPerTrigger", see the guide).
Related note, these APIs are subject to change. In fact in the upcoming release 
2.3, we are adding a DataSource V2 API for 
batch/microbatch-streaming/continuous-streaming sources and sinks.
On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi:
The documentation for Sink.addBatch is as follows:
/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or is it all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure that we don't run into an OOM exception on the executor while calling collect?
Thanks
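
A sketch of the rate-limit option mentioned above for the Kafka source (assumes an existing SparkSession named spark, e.g. in spark-shell; broker, topic, and the cap are placeholders):

// Cap how many Kafka offsets each trigger processes, so every addBatch DataFrame stays bounded.
val limited = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "events")                      // placeholder topic
  .option("maxOffsetsPerTrigger", "100000")           // at most ~100k records per micro-batch
  .load()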



   

Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-03 Thread M Singh
Hi:
The documentation for Sink.addBatch is as follows:
/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or is it all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure that we don't run into an OOM exception on the executor while calling collect?
Thanks

Re: Spark on EMR suddenly stalling

2018-01-01 Thread M Singh
Hi Jeroen:
I am not sure if I missed it - but can you let us know what is your input 
source and output sink ?  
In some cases, I found that saving to S3 was a problem. In that case I started 
saving the output to the EMR HDFS and later copied it to S3 using s3-dist-cp, 
which solved our issue.

Mans 

On Monday, January 1, 2018 7:41 AM, Rohit Karlupia  
wrote:
 

 Here is the list that I will probably try to fill:   
   - Check GC on the offending executor when the task is running. Maybe you 
need even more memory.  
   - Go back to some previous successful run of the job and check the spark ui 
for the offending stage and check max task time/max input/max shuffle in/out 
for the largest task. Will help you understand the degree of skew in this 
stage. 
   - Take a thread dump of the executor from the Spark UI and verify if the 
task is really doing any work or it stuck in some deadlock. Some of the hive 
serde are not really usable from multi-threaded/multi-use spark executors. 
   - Take a thread dump of the executor from the Spark UI and verify if the 
task is spilling to disk. Playing with storage and memory fraction or generally 
increasing the memory will help. 
   - Check the disk utilisation on the machine running the executor. 
   - Look for event loss messages in the logs due to event queue full. Loss of 
events can send some of the spark components into really bad states.  

thanks,
rohitk


On Sun, Dec 31, 2017 at 12:50 AM, Gourav Sengupta  
wrote:

Hi,
Please try to use the Spark UI in the way that AWS EMR recommends; it should 
be available from the resource manager. I never ever had any problem working 
with it. THAT HAS ALWAYS BEEN MY PRIMARY AND SOLE SOURCE OF DEBUGGING.
Sadly, I cannot be of much help unless we go for a screen share session over 
google chat or skype. 
Also, I ALWAYS prefer the maximize Resource Allocation setting in EMR to be set 
to true. 
Besides that, there is a metric in the EMR console which shows, on graphs, the 
number of containers generated by your job.


Regards,Gourav Sengupta
On Fri, Dec 29, 2017 at 6:23 PM, Jeroen Miller  wrote:

Hello,

Just a quick update as I did not made much progress yet.

On 28 Dec 2017, at 21:09, Gourav Sengupta  wrote:
> can you try to then use the EMR version 5.10 instead or EMR version 5.11 
> instead?

Same issue with EMR 5.11.0. Task 0 in one stage never finishes.

> can you please try selecting a subnet which is in a different availability 
> zone?

I did not try this yet. But why should that make a difference?

> if possible just try to increase the number of task instances and see the 
> difference?

I tried with 512 partitions -- no difference.

> also in case you are using caching,

No caching used.

> Also can you please report the number of containers that your job is creating 
> by looking at the metrics in the EMR console?

8 containers if I trust the directories in j-xxx/containers/application_xxx/.

> Also if you see the spark UI then you can easily see which particular step is 
> taking the longest period of time - you just have to drill in a bit in order 
> to see that. Generally in case shuffling is an issue then it definitely 
> appears in the SPARK UI as I drill into the steps and see which particular 
> one is taking the longest.

I always have issues with the Spark UI on EC2 -- it never seems to be up to 
date.

JM







   

Apache Spark - Using withWatermark for DataSets

2017-12-30 Thread M Singh
Hi:
I am working with Datasets so that I can use mapGroupsWithState for business 
logic and then use dropDuplicates over a set of fields.  I would like to use 
withWatermark so that I can restrict how much state is stored.  
From the API it looks like withWatermark takes a string (the timestamp column 
name) as an argument.  Is it possible to use it with Datasets?  If not, is there 
any alternative to withWatermark available for working with Datasets?
Thanks
Mans
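
For reference, in Spark 2.2 withWatermark is defined on Dataset[T] and returns a
Dataset[T], so it can be chained directly on a typed Dataset before dropDuplicates
or mapGroupsWithState. A minimal sketch, assuming a hypothetical Event case class
and using the built-in rate test source:

  import java.sql.Timestamp
  import org.apache.spark.sql.SparkSession

  case class Event(id: String, eventTime: Timestamp)

  val spark = SparkSession.builder.appName("watermark-sketch").getOrCreate()
  import spark.implicits._

  // The rate source produces (timestamp, value) rows; rename them to match Event.
  val events = spark.readStream
    .format("rate")
    .load()
    .select($"timestamp".as("eventTime"), $"value".cast("string").as("id"))
    .as[Event]

  // withWatermark keeps the Dataset typed, and the watermark bounds the state
  // kept by the streaming deduplication (and by mapGroupsWithState downstream).
  val deduped = events
    .withWatermark("eventTime", "10 minutes")
    .dropDuplicates("id", "eventTime")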

Re: Apache Spark - Structured Streaming graceful shutdown

2017-12-30 Thread M Singh
Thanks Eyal - it appears that these are the same patterns used for spark 
DStreams. 

On Wednesday, December 27, 2017 1:15 AM, Eyal Zituny 
<eyal.zit...@equalum.io> wrote:
 

 Hi,
If you're interested in stopping your Spark application externally, you will
probably need a way to communicate with the Spark driver (which starts and
holds a ref to the Spark context). This can be done by adding some code to the
driver app, for example (see the sketch after this list):
   - you can expose a REST API that stops the query and the Spark context

   - if running in client mode you can listen to stdin

   - you can also listen to an external system (like Kafka)
Eyal
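
A minimal sketch of the external-signal idea above, polling for a marker file
instead of Kafka (the path and poll interval are arbitrary placeholders):

  import java.nio.file.{Files, Paths}
  import org.apache.spark.sql.streaming.StreamingQuery

  // Wait until an external "stop" marker appears, then stop the query gracefully.
  def awaitExternalStop(query: StreamingQuery,
                        markerPath: String = "/tmp/stop-streaming-app",
                        pollMs: Long = 10000L): Unit = {
    while (query.isActive && !Files.exists(Paths.get(markerPath))) {
      query.awaitTermination(pollMs)   // returns after pollMs if the query is still running
    }
    if (query.isActive) {
      query.stop()                     // after this, the SparkSession can be stopped as well
    }
  }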
On Tue, Dec 26, 2017 at 10:37 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Thanks Diogo.  My question is how to gracefully call the stop method while the 
streaming application is running in a cluster.

 

On Monday, December 25, 2017 5:39 PM, Diogo Munaro Vieira 
<diogo.mun...@corp.globo.com> wrote:
 

 Hi M Singh! Here I'm using query.stop()
Em 25 de dez de 2017 19:19, "M Singh" <mans2si...@yahoo.com.invalid> escreveu:

Hi:
Are there any patterns/recommendations for gracefully stopping a structured 
streaming application ?
Thanks





   



   

Re: Apache Spark - Structured Streaming graceful shutdown

2017-12-26 Thread M Singh
Thanks Diogo.  My question is how to gracefully call the stop method while the 
streaming application is running in a cluster.

 

On Monday, December 25, 2017 5:39 PM, Diogo Munaro Vieira 
<diogo.mun...@corp.globo.com> wrote:
 

 Hi M Singh! Here I'm using query.stop()
Em 25 de dez de 2017 19:19, "M Singh" <mans2si...@yahoo.com.invalid> escreveu:

Hi:
Are there any patterns/recommendations for gracefully stopping a structured 
streaming application ?
Thanks





   

Apache Spark - (2.2.0) - window function for DataSet

2017-12-25 Thread M Singh
Hi:
I would like to use the window function on a DataSet stream (Spark 2.2.0). The 
window function requires a Column as argument and can be used with DataFrames by 
passing the column. Is there an analogous window function for DataSets, or 
pointers to how the window function can be used with them ?
Thanks
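
For reference, the window() function from org.apache.spark.sql.functions can also
be applied to a typed Dataset, since its fields are addressable as columns; the
grouped result, however, comes back untyped (as a DataFrame). A minimal sketch
with a hypothetical Click case class (the same call works on a streaming Dataset):

  import java.sql.Timestamp
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{count, window}

  case class Click(userId: String, eventTime: Timestamp)

  val spark = SparkSession.builder.appName("window-sketch").getOrCreate()
  import spark.implicits._

  val clicks = Seq(
    Click("u1", Timestamp.valueOf("2017-12-25 10:00:00")),
    Click("u1", Timestamp.valueOf("2017-12-25 10:03:00"))
  ).toDS()

  // window() takes the event-time column plus window/slide durations as strings.
  val counts = clicks
    .groupBy(window($"eventTime", "10 minutes", "5 minutes"), $"userId")
    .agg(count("*").as("clicks"))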

Apache Spark - Structured Streaming from file - checkpointing

2017-12-25 Thread M Singh
Hi:
I am using spark structured streaming (v 2.2.0) to read data from files. I have 
configured a checkpoint location. On stopping and restarting the application, it 
looks like it is reading the previously ingested files.  Is that expected 
behavior ?
Is there any way to prevent reading files that have already been ingested ? If a 
file is partially ingested, can we, on restart, resume reading the file from the 
previously checkpointed offset ?
Thanks
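
For reference, a minimal sketch of a file-source query with a checkpoint location
(all paths are placeholders); with the checkpoint configured on the sink side, the
file source logs the files it has already processed and should not pick them up
again after a restart:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder.appName("file-stream-sketch").getOrCreate()

  val input = spark.readStream
    .format("text")
    .load("hdfs:///data/incoming")

  val query = input.writeStream
    .format("parquet")
    .option("path", "hdfs:///data/out")
    .option("checkpointLocation", "hdfs:///data/checkpoints/file-stream")  // processed-file log lives here
    .start()

  query.awaitTermination()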

Apache Spark - Structured Streaming graceful shutdown

2017-12-25 Thread M Singh
Hi:
Are there any patterns/recommendations for gracefully stopping a structured 
streaming application ?
Thanks




Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-12 Thread M Singh
Thanks TD.

BTW - if I have an input file of ~250 GB, is there any guideline on whether to use:

* a single input file (250 GB) (in this case, is there any upper bound on size),
or

* a split into 1000 files of 250 MB each (the hdfs block size is 250 MB), or

* a multiple of the hdfs block size ?
Mans





On Friday, July 11, 2014 4:38 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


The model for the file stream is to pick up and process new files written 
atomically (by move) into a directory. So your file is being processed in a 
single batch, and then it's waiting for any new files to be written into that 
directory. 

TD
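
A minimal sketch of that pattern (placeholder paths and batch interval): write
each file elsewhere first and rename/move it into the watched directory only once
it is complete, so it is picked up exactly once by the next batch.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("file-stream-sketch")
  val ssc = new StreamingContext(conf, Seconds(60))   // placeholder batch interval

  // Only files moved (renamed) into this directory after the stream starts are picked up.
  val lines = ssc.textFileStream("hdfs:///data/incoming")
  lines.foreachRDD { rdd =>
    println(s"records in this batch: ${rdd.count()}")  // stand-in for the real processing
  }

  ssc.start()
  ssc.awaitTermination()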



On Fri, Jul 11, 2014 at 11:46 AM, M Singh mans6si...@yahoo.com wrote:

So, is it expected for the process to generate stages/tasks even after 
processing a file ?


Also, is there a way to figure out the file that is getting processed and when 
that process is complete ?


Thanks




On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


Whenever you need to do a shuffle-based operation like reduceByKey, 
groupByKey, join, etc., the system is essentially redistributing the data 
across the cluster and it needs to know how many parts it should divide the 
data into. That's where the default parallelism is used. 


TD
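
As an illustration, both knobs mentioned here can be set explicitly; a minimal
sketch with placeholder values (the SparkContext._ import is only needed on
Spark 1.x for the pair-RDD functions):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._   // pair-RDD functions on Spark 1.x

  val conf = new SparkConf()
    .setAppName("parallelism-sketch")
    .set("spark.default.parallelism", "64")     // placeholder: fallback partition count for shuffles

  val sc = new SparkContext(conf)
  val counts = sc.textFile("hdfs:///data/input")
    .flatMap(_.split("\\s+"))
    .map(word => (word, 1L))
    .reduceByKey((a: Long, b: Long) => a + b, 128)  // an explicit partition count overrides the default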



On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote:

Hi TD:


The input file is on hdfs.  


The file is approx 2.7 GB and when the process starts, there are 11 tasks 
(since hdfs block size is 256M) for processing and 2 tasks for reduce by key. 
 After the file has been processed, I see new stages with 2 tasks that 
continue to be generated. I understand this value (2) is the default value 
for spark.default.parallelism but don't quite understand how is the value 
determined for generating tasks for reduceByKey, how is it used besides 
reduceByKey and what should be the optimal value for this. 


Thanks.



On Thursday, July 10, 2014 7:24 PM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
 


How are you supplying the text file? 



On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

Hi Folks:



I am working on an application which uses spark streaming (version 1.1.0 
snapshot on a standalone cluster) to process text file and save counters in 
cassandra based on fields in each row.  I am testing the application in two 
modes:  

 * Process each row and save the counter in cassandra.  In this scenario 
 after the text file has been consumed, there is no task/stages seen in the 
 spark UI.

 * If instead I use reduce by key before saving to cassandra, the spark 
 UI shows continuous generation of tasks/stages even after processing the 
 file has been completed. 

I believe this is because the reduce by key requires merging of data from 
different partitions.  But I was wondering if anyone has any 
insights/pointers for understanding this difference in behavior and how to 
avoid generating tasks/stages when there is no data (new file) available.


Thanks

Mans







Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread M Singh
Hi TD:

The input file is on hdfs.  

The file is approx 2.7 GB, and when the process starts there are 11 tasks 
(since the hdfs block size is 256M) for processing and 2 tasks for reduce by key.  
After the file has been processed, I see new stages with 2 tasks that continue 
to be generated. I understand this value (2) is the default value for 
spark.default.parallelism, but I don't quite understand how the value is 
determined for generating tasks for reduceByKey, how it is used besides 
reduceByKey, and what the optimal value for this should be. 

Thanks.


On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


How are you supplying the text file? 



On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

Hi Folks:



I am working on an application which uses spark streaming (version 1.1.0 
snapshot on a standalone cluster) to process text file and save counters in 
cassandra based on fields in each row.  I am testing the application in two 
modes:  

   * Process each row and save the counter in cassandra.  In this scenario 
 after the text file has been consumed, there is no task/stages seen in the 
 spark UI.

   * If instead I use reduce by key before saving to cassandra, the spark 
 UI shows continuous generation of tasks/stages even after processing the file 
 has been completed. 

I believe this is because the reduce by key requires merging of data from 
different partitions.  But I was wondering if anyone has any insights/pointers 
for understanding this difference in behavior and how to avoid generating 
tasks/stages when there is no data (new file) available.


Thanks

Mans

Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread M Singh
So, is it expected for the process to generate stages/tasks even after 
processing a file ?

Also, is there a way to figure out the file that is getting processed and when 
that process is complete ?

Thanks



On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


Whenever you need to do a shuffle-based operation like reduceByKey, groupByKey, 
join, etc., the system is essentially redistributing the data across the 
cluster and it needs to know how many parts it should divide the data into. 
That's where the default parallelism is used. 

TD



On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote:

Hi TD:


The input file is on hdfs.  


The file is approx 2.7 GB and when the process starts, there are 11 tasks 
(since hdfs block size is 256M) for processing and 2 tasks for reduce by key.  
After the file has been processed, I see new stages with 2 tasks that continue 
to be generated. I understand this value (2) is the default value for 
spark.default.parallelism but don't quite understand how is the value 
determined for generating tasks for reduceByKey, how is it used besides 
reduceByKey and what should be the optimal value for this. 


Thanks.



On Thursday, July 10, 2014 7:24 PM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
 


How are you supplying the text file? 



On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

Hi Folks:



I am working on an application which uses spark streaming (version 1.1.0 
snapshot on a standalone cluster) to process text file and save counters in 
cassandra based on fields in each row.  I am testing the application in two 
modes:  

  * Process each row and save the counter in cassandra.  In this scenario 
 after the text file has been consumed, there is no task/stages seen in the 
 spark UI.

  * If instead I use reduce by key before saving to cassandra, the spark 
 UI shows continuous generation of tasks/stages even after processing the 
 file has been completed. 

I believe this is because the reduce by key requires merging of data from 
different partitions.  But I was wondering if anyone has any 
insights/pointers for understanding this difference in behavior and how to 
avoid generating tasks/stages when there is no data (new file) available.


Thanks

Mans




Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-09 Thread M Singh
Hi Folks:


I am working on an application which uses spark streaming (version 1.1.0 
snapshot on a standalone cluster) to process text file and save counters in 
cassandra based on fields in each row.  I am testing the application in two 
modes:  

* Process each row and save the counter in cassandra.  In this scenario 
after the text file has been consumed, there is no task/stages seen in the 
spark UI.

* If instead I use reduce by key before saving to cassandra, the spark 
UI shows continuous generation of tasks/stages even after processing the file 
has been completed.

I believe this is because the reduce by key requires merging of data from 
different partitions.  But I was wondering if anyone has any insights/pointers 
for understanding this difference in behavior and how to avoid generating 
tasks/stages when there is no data (new file) available.

Thanks

Mans

Re: Reading text file vs streaming text files

2014-07-08 Thread M Singh
Hi Akhil:

Thanks for your response.

Mans



On Thursday, July 3, 2014 9:16 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 


Hi Singh!

For this use-case it's better to have a StreamingContext listening to the 
directory in hdfs where the files are being dropped. You can set the 
streaming interval to 15 minutes and let the driver program run continuously, 
so as soon as new files arrive they are picked up for processing in the next 
15-minute batch. This way you don't have to worry about the old files unless 
you are about to restart the driver program. Another implementation would be 
to simply move the processed files to another directory after each batch has 
been processed (a rough sketch of both ideas follows below).


Thanks
Best Regards
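
A rough sketch of both ideas above, with placeholder paths; the move-after-processing
step is a simplification that assumes no new files land while a batch is still running:

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Minutes, StreamingContext}

  val conf = new SparkConf().setAppName("dir-watch-sketch")
  val ssc = new StreamingContext(conf, Minutes(15))   // one batch per 15-minute drop

  val lines = ssc.textFileStream("hdfs:///data/incoming")

  lines.foreachRDD { (rdd, batchTime) =>
    println(s"records in this batch: ${rdd.count()}")  // stand-in for the Cassandra counter updates

    // Illustrative cleanup: archive whatever is currently in the incoming directory
    // once the batch has been processed.
    val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
    val archive = new Path(s"hdfs:///data/processed/${batchTime.milliseconds}")
    fs.mkdirs(archive)
    fs.listStatus(new Path("hdfs:///data/incoming")).foreach { status =>
      fs.rename(status.getPath, new Path(archive, status.getPath.getName))
    }
  }

  ssc.start()
  ssc.awaitTermination()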


On Thu, Jul 3, 2014 at 6:34 PM, M Singh mans6si...@yahoo.com wrote:

Hi:


I am working on a project where a few thousand text files (~20M in size) will 
be dropped in an hdfs directory every 15 minutes.  Data from the files will be 
used to update counters in cassandra (a non-idempotent operation).  I was 
wondering what is the best way to deal with this:
   * Use text streaming and process the files as they are added to the 
 directory
   * Use non-streaming text input and launch a spark driver every 15 
 minutes to process files from a specified directory (new directory for every 
 15 minutes).
   * Use message queue to ingest data from the files and then read data 
 from the queue.
Also, is there a way to find which text file is being processed and when a 
file has been processed for both the streaming and non-streaming RDDs.  I 
believe filename is available in the WholeTextFileInputFormat but is it 
available in standard or streaming text RDDs.


Thanks


Mans 

Re: Java sample for using cassandra-driver-spark

2014-07-08 Thread M Singh
Hi Piotr:

It would be great if we could have an API to support batch updates (counter + 
non-counter).

Thanks

Mans


On Monday, July 7, 2014 11:36 AM, Piotr Kołaczkowski pkola...@datastax.com 
wrote:
 


Hi, we're planning to add a basic Java-API very soon, possibly this week.
There's a ticket for it here: 
https://github.com/datastax/cassandra-driver-spark/issues/11

We're open to any ideas. Just let us know what you need the API to have in the 
comments.

Regards,
Piotr Kołaczkowski



2014-07-05 0:48 GMT+02:00 M Singh mans6si...@yahoo.com:

Hi:


Is there a Java sample fragment for using cassandra-driver-spark ?


Thanks


-- 

Piotr Kolaczkowski, Lead Software Engineer

pkola...@datastax.com


http://www.datastax.com/
3975 Freedom Circle
Santa Clara, CA 95054, USA

Re: window analysis with Spark and Spark streaming

2014-07-04 Thread M Singh
Another alternative could be to use Spark Streaming's textFileStream with its 
windowing capabilities.



On Friday, July 4, 2014 9:52 AM, Gianluca Privitera 
gianluca.privite...@studio.unibo.it wrote:
 


You should think about a custom receiver, in order to solve the problem of the 
“already collected” data.  
http://spark.apache.org/docs/latest/streaming-custom-receivers.html

Gianluca



On 04 Jul 2014, at 15:46, alessandro finamore alessandro.finam...@polito.it 
wrote:

Hi,

I have a large dataset of text log files on which I need to implement
window analysis.
Say, extract per-minute data and do aggregated stats on the last X minutes.

I've to implement the windowing analysis with spark.
This is the workflow I'm currently using
- read a file and I create a new RDD with per-minute info
- loop on each new minute and integrate its data with another data structure
containing the last X minutes of data
- apply the analysis on the updated window of data 

This works but it suffers from limited parallelism.
Do you have any recommendations/suggestions about a better implementation?
Also, are there any recommended data collections for managing the window?
(I'm simply using Arrays for managing data.)

While working in this I started to investigate spark streaming.
The problem is that I don't know if is really possible to use it on already
collected data.
This post seems to indicate that it should, but it is not clear to me how
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-windowing-Driven-by-absolutely-time-td1733.html

Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/window-analysis-with-Spark-and-Spark-streaming-tp8806.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: window analysis with Spark and Spark streaming

2014-07-04 Thread M Singh
The windowing capabilities of Spark Streaming determine which events go into the 
RDD created for each time window.  If the duration is 1s, then all the events 
received in a particular 1s window will be part of the RDD created for that 
window of that stream.
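
A minimal sketch of the time-based windowing being discussed (paths, key
extraction and durations are placeholders; the window is defined over processing
time, not over a count of files):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._   // pair-DStream functions on Spark 1.x

  val conf = new SparkConf().setAppName("windowed-counts-sketch")
  val ssc = new StreamingContext(conf, Seconds(60))       // 1-minute batches

  val lines = ssc.textFileStream("hdfs:///data/incoming")

  // Aggregate over the last 10 minutes of batches, sliding forward every minute.
  val windowedCounts = lines
    .map(line => (line.split(",")(0), 1L))                // placeholder key extraction
    .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(10), Minutes(1))

  windowedCounts.print()

  ssc.start()
  ssc.awaitTermination()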



On Friday, July 4, 2014 1:28 PM, alessandro finamore 
alessandro.finam...@polito.it wrote:
 


Thanks for the replies

What is not completely clear to me is how time is managed.
I can create a DStream from a file.
But if I set the window property, that will be bound to the application
time, right?

If I got it right, with a receiver I can control the way DStreams are
created.
But how can I then apply the windowing already shipped with the framework, if
this is bound to the application time?
I would like to define a window of N files, but the window() function
requires a duration as input...




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/window-analysis-with-Spark-and-Spark-streaming-tp8806p8824.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.

Java sample for using cassandra-driver-spark

2014-07-04 Thread M Singh
Hi:

Is there a Java sample fragment for using cassandra-driver-spark ?

Thanks


Reading text file vs streaming text files

2014-07-03 Thread M Singh
Hi:

I am working on a project where a few thousand text files (~20M in size) will 
be dropped in an hdfs directory every 15 minutes.  Data from the files will be used 
to update counters in cassandra (a non-idempotent operation).  I was wondering 
what is the best way to deal with this:
* Use text streaming and process the files as they are added to the 
directory
* Use non-streaming text input and launch a spark driver every 15 
minutes to process files from a specified directory (new directory for every 15 
minutes).
* Use message queue to ingest data from the files and then read data 
from the queue.
Also, is there a way to find which text file is being processed and when a 
file has been processed for both the streaming and non-streaming RDDs.  I 
believe filename is available in the WholeTextFileInputFormat but is it 
available in standard or streaming text RDDs.

Thanks

Mans 

spark text processing

2014-07-03 Thread M Singh
Hi:

Is there a way to find out when spark has finished processing a text file (both 
for streaming and non-streaming cases) ?

Also, after processing, can spark copy the file to another directory ?


Thanks


Configuration properties for Spark

2014-06-30 Thread M Singh
Hi:

Is there a comprehensive properties list (with permissible/default values) for 
spark ?

Thanks

Mans

Re: jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)

2014-06-28 Thread M Singh
Hi Paul:

Here are the dependencies in spark 1.1.0-snapshot that are pulling in the 
org.codehaus.jackson:jackson-core-asl 1.8 and 1.9 jars.


1.9
com.twitter:parquet-hadoop:jar:1.4.3
org.apache.avro:avro:jar:1.7.6


1.8
org.apache.spark:spark-hive_2.10:jar:1.1.0-SNAPSHOT
org.apache.hadoop:hadoop-core:jar:1.0.4
org.apache.hbase:hbase:jar:0.94.6

Thanks

Mans
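
One possible way to attack this from the application's build side, rather than a
custom Spark build, is to pin a single jackson-core-asl version in the
application's own sbt definition; a rough sketch using the coordinates listed
above (whether it wins over the copy baked into the Spark assembly still depends
on the executor classpath ordering):

  // build.sbt fragment (sketch only)
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql" % "1.1.0-SNAPSHOT" % "provided",
    "org.apache.spark" %% "spark-streaming" % "1.1.0-SNAPSHOT" % "provided"
  )

  // Either exclude the conflicting artifact from the dependency that pulls it in ...
  libraryDependencies += ("com.twitter" % "parquet-hadoop" % "1.4.3")
    .exclude("org.codehaus.jackson", "jackson-core-asl")

  // ... or simply force one version for the whole build.
  dependencyOverrides += "org.codehaus.jackson" % "jackson-core-asl" % "1.8.8"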



On Saturday, June 28, 2014 2:22 AM, Paul Brown p...@mult.ifario.us wrote:
 




Hi, Mans --

Both of those versions of Jackson are pretty ancient.  Do you know which of the 
Spark dependencies is pulling them in?  It would be good for us (the Jackson, 
Woodstox, etc., folks) to see if we can get people to upgrade to more recent 
versions of Jackson.

-- Paul


—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Fri, Jun 27, 2014 at 12:58 PM, M Singh mans6si...@yahoo.com wrote:

Hi:


I am using spark to stream data to cassandra and it works fine in local mode. 
But when I execute the application in a standalone clustered env I got 
exception included below (java.lang.NoClassDefFoundError: 
org/codehaus/jackson/annotate/JsonClass).


I think this is due to the jackson-core-asl dependency conflict 
(jackson-core-asl 1.8.8 has the JsonClass but 1.9.x does not).  The 1.9.x 
version is being pulled in by spark-sql project.  I tried adding 
jackson-core-asl 1.8.8 with --jars argument while submitting the application 
for execution but it did not work.  So I created a custom spark build 
excluding sql project.  With this custom spark install I was able to resolve 
the issue at least on a single node cluster (separate master and worker).  

If there is an alternate way to resolve this conflicting jar issue without a 
custom build (eg: configuration to use the user defined jars in the executor 
class path first), please let me know.

Also, is there a comprehensive list of configuration properties available for 
spark ?



Thanks


Mans


Exception trace


 TaskSetManager: Loss was due to java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
at 
org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
at 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
at
 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
at 
org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
at
 org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)

 

jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)

2014-06-27 Thread M Singh
Hi:

I am using spark to stream data to cassandra and it works fine in local mode. 
But when I execute the application in a standalone clustered env I got 
exception included below (java.lang.NoClassDefFoundError: 
org/codehaus/jackson/annotate/JsonClass).

I think this is due to the jackson-core-asl dependency conflict 
(jackson-core-asl 1.8.8 has the JsonClass but 1.9.x does not).  The 1.9.x 
version is being pulled in by spark-sql project.  I tried adding 
jackson-core-asl 1.8.8 with --jars argument while submitting the application 
for execution but it did not work.  So I created a custom spark build excluding 
sql project.  With this custom spark install I was able to resolve the issue at 
least on a single node cluster (separate master and worker).  

If there is an alternate way to resolve this conflicting jar issue without a 
custom build (e.g., a configuration to use the user-defined jars in the executor 
class path first), please let me know.

Also, is there a comprehensive list of configuration properties available for 
spark ?


Thanks

Mans


Exception trace


 TaskSetManager: Loss was due to java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
at 
org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
at 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
at
 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
at 
org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
at
 org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)