Apache Spark - How to convert DataFrame json string to structured element and use schema_of_json
Hi: In Apache Spark we can read json using: spark.read.json("path"). There is support for converting a json string column in a dataframe into a structured element using from_json(column, schema) (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.types.DataType-scala.collection.immutable.Map-). However, is there any way to convert the row into a structured element without providing the schema? Also, there is support for getting the schema of a json string using schema_of_json (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#schema_of_json-org.apache.spark.sql.Column-). Is there a way to convert the result into a StructType? Thanks
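One workable pattern is a sketch along these lines (assuming Spark 2.4+, where schema_of_json is available, and a recent release where DataType.fromDDL accepts the struct<...> form that schema_of_json emits; the sample JSON and column names are made up): materialize the inferred schema string on the driver, parse it into a StructType, and pass that to from_json:

import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}
import org.apache.spark.sql.types.{DataType, StructType}

// Sample JSON and column names are placeholders.
val sample = """{"id":1,"name":"a"}"""

// schema_of_json returns a Column, so evaluate it once on the driver
// to get the schema as a DDL-formatted string.
val schemaDdl = spark.range(1)
  .select(schema_of_json(lit(sample)))
  .head()
  .getString(0)

// Parse the DDL string into a StructType (assumes DataType.fromDDL handles
// the struct<...> form, which holds in recent Spark versions).
val schema = DataType.fromDDL(schemaDdl).asInstanceOf[StructType]

// Apply it to every row of the json column.
val parsed = df.withColumn("parsed", from_json(col("jsonCol"), schema))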
Re: Apache Kafka / Spark Integration - Exception - The server disconnected before a response was received.
Hi Daniel: Yes, I am working with Spark Structured Streaming. The exception is emanating from the spark kafka connector, but I was wondering if someone has encountered this issue and resolved it with some configuration parameter in the kafka client/broker or OS settings. Thanks Mans

On Tuesday, April 10, 2018, 7:49:42 AM PDT, Daniel Hinojosa <dhinoj...@evolutionnext.com> wrote:

This looks more like a Spark issue than a Kafka one, judging by the stack trace. Are you using Spark structured streaming with Kafka integration by chance?

On Mon, Apr 9, 2018 at 8:47 AM, M Singh <mans2si...@yahoo.com.invalid> wrote:
> Hi Folks: Just wanted to see if anyone has any suggestions on this issue. Thanks
>
> On Monday, March 26, 2018, 11:04:02 AM PDT, M Singh <mans2si...@yahoo.com.INVALID> wrote:
>
> Hi Ted: Here is the exception trace (note - the exception is occurring in the kafka spark writer class).
> I will try to check the broker logs. Is there anything specific I should look for?
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1708)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1696)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1695)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1695)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1923)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1878)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1867)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
> at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply$mcV$sp(KafkaWriter.scala:89)
> at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89)
> at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
> at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:88)
> at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
>
> On Monday, March 26, 2018, 9:34:06 AM PDT, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Can you post the stack trace for the NetworkException (pastebin)?
> Please also check the broker logs to see if there was some clue around the time this happened.
> Thanks
>
> On Mon, Mar 26, 2018 at 9:30 AM, M Singh <mans2si...@yahoo.com.invalid> wrote:
>
> Hi:
> I am working with spark 2.2.1 and the spark kafka 0.10 client integration, with Kafka brokers running 0.11.
> I get the exception - org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received - when the application is trying to write to a topic. This exception kills the spark application.
> Based on some similar issues I saw on the web, I've added the following kafka configuration, but it has not helped:
> acks = 0
> request.timeout.ms = 45000
> receive.buffer.bytes = 1024000
> I've posted this question to the apache spark users list but have not received any response. If anyone has any suggestions/pointers, please let me know.
> Thanks
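For completeness, producer-side settings like the ones above are passed to the structured streaming Kafka sink as options prefixed with kafka., which are forwarded verbatim to the underlying Kafka producer. A minimal sketch (broker address, topic, and checkpoint path are placeholders):

val query = df
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("topic", "output-topic")                    // placeholder
  .option("kafka.acks", "0")                          // the settings tried above,
  .option("kafka.request.timeout.ms", "45000")        // forwarded to the
  .option("kafka.receive.buffer.bytes", "1024000")    // Kafka producer
  .option("checkpointLocation", "/tmp/checkpoints")   // placeholder
  .start()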
Apache Spark - Structured Streaming State Management With Watermark
Hi: I am using Apache Spark Structured Streaming (2.2.1) to implement custom sessionization for events. The processing is in two steps:
1. flatMapGroupsWithState (keyed on user id) - which stores the state of the user and emits events every minute until an expire event is received
2. The next step is an aggregation (group by count)
I am using outputMode Update. I have a few questions:
1. If I don't use a watermark at all - (a) is the state for flatMapGroupsWithState stored forever? (b) is the state for the groupBy count stored forever?
2. Is the watermark applicable to cleaning up groupBy aggregates only?
3. Can we use a watermark to manage state in flatMapGroupsWithState? If so, how?
4. Can a watermark be used for any other state clean up - are there any examples of those?
Thanks
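On question 3, one way to tie flatMapGroupsWithState state cleanup to the watermark is an event-time timeout. A minimal sketch, with made-up Event types and timeout values (assumes a SparkSession named spark is in scope):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class Event(userId: String, eventTime: java.sql.Timestamp, expired: Boolean)
case class SessionUpdate(userId: String, count: Long)

def updateSession(
    userId: String,
    events: Iterator[Event],
    state: GroupState[Long]): Iterator[SessionUpdate] = {
  if (state.hasTimedOut) {
    state.remove() // the watermark passed the timeout timestamp: drop this user's state
    Iterator.empty
  } else {
    val count = state.getOption.getOrElse(0L) + events.size
    state.update(count)
    // The state becomes eligible for cleanup once the watermark passes this point.
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs + 60 * 60 * 1000L)
    Iterator(SessionUpdate(userId, count))
  }
}

val sessions = eventsDs                      // eventsDs: Dataset[Event]
  .withWatermark("eventTime", "10 minutes")  // required for EventTimeTimeout
  .groupByKey(_.userId)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout)(updateSession)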
Apache Spark - Structured Streaming StreamExecution Stats Description
Hi: I am using spark structured streaming 2.2.1 with a flatMapGroupsWithState and a groupBy count operator. In the StreamExecution logs I see two entries for stateOperators:

"stateOperators" : [ {
  "numRowsTotal" : 1617339,
  "numRowsUpdated" : 9647
}, {
  "numRowsTotal" : 1326355,
  "numRowsUpdated" : 1398672
} ],

My questions are:
1. Is there a way to figure out which stats are for flatMapGroupsWithState and which are for the groupBy count? In my case I can guess based on my data, but I want to be definitive about it.
2. For the second entry - how can numRowsTotal (1326355) be less than numRowsUpdated (1398672)?
If there is documentation I can use to understand this debug output, please let me know. Thanks
Apache Spark Structured Streaming - How to keep executor alive.
Hi: I am working on spark structured streaming (2.2.1) with kafka and want 100 executors to stay alive. I set spark.executor.instances to 100. The process starts running with 100 executors, but after some time only a few remain, which causes a backlog of events from kafka. I thought I saw a setting to keep the executors from being killed, but I am not able to find that configuration in the spark docs. If anyone knows that setting, please let me know. Thanks
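One possible culprit - a guess, not a confirmed diagnosis - is dynamic allocation releasing idle executors. A sketch of pinning the count at session creation (the app name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("StreamingApp")
  .config("spark.executor.instances", "100")
  // With dynamic allocation on (the default on some distributions, e.g. EMR),
  // idle executors are released; disabling it keeps all 100 alive.
  .config("spark.dynamicAllocation.enabled", "false")
  // Or, if dynamic allocation must stay on, set a floor instead:
  // .config("spark.dynamicAllocation.minExecutors", "100")
  .getOrCreate()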
Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint
Hi: I am working on a realtime application using spark structured streaming (v 2.2.1). The application reads data from kafka, and if there is a failure, I would like to ignore the checkpoint. Is there any configuration to just read from the latest kafka offset after a failure and ignore any offset checkpoints? Also, I believe that the checkpoint also saves state, and the aggregations will continue from that state after recovery. Is there any way to ignore checkpointed state? Also, is there a way to selectively save only the state or only the offset checkpoint? Thanks
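For context, the startingOffsets source option only applies when a query starts with no checkpoint; once a checkpoint exists, its offsets take precedence, so "ignoring" them generally means pointing the restarted query at a fresh checkpoint directory (which also discards saved state). A sketch, with placeholder broker, topic, and paths:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "events")                      // placeholder
  // Honored only on a fresh start; checkpointed offsets win otherwise.
  .option("startingOffsets", "latest")
  .load()

val query = df.writeStream
  .format("console")
  // Using a new directory discards both saved offsets and saved state.
  .option("checkpointLocation", "/tmp/checkpoints-v2") // placeholder
  .start()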
Apache Spark Structured Streaming - Kafka Consumer cannot fetch records for offset exception
Hi: I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR, and for the last few days, after running the application for 30-60 minutes, I get the exception from the Kafka consumer included below. The structured streaming application is processing 1 minute worth of data from a kafka topic. I've tried increasing request.timeout.ms from its default to 45000 ms and receive.buffer.bytes to 1 MB, but I still get the same exception. Is there any spark/kafka configuration that can save the offset and retry it next time, rather than throwing an exception and killing the application? I've tried googling but have not found a substantial solution/recommendation. If anyone has any suggestions, a different version, etc., please let me know. Thanks

Here is the exception stack trace:

java.util.concurrent.TimeoutException: Cannot fetch record for offset in 12 milliseconds
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
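Two source-side options that may be relevant here (a sketch, not a confirmed fix; broker and topic are placeholders): kafkaConsumer.pollTimeoutMs controls how long the executor-side cached consumer waits in the fetchData path shown in the trace, and failOnDataLoss lets the query continue instead of failing when an offset range can no longer be fetched:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "events")                      // placeholder
  // How long CachedKafkaConsumer waits for a fetch before throwing the
  // TimeoutException above.
  .option("kafkaConsumer.pollTimeoutMs", "120000")
  // Log and skip, rather than fail on, offsets that have aged out of the topic.
  .option("failOnDataLoss", "false")
  .load()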
Re: Apache Spark - Structured Streaming reading from Kafka some tasks take much longer
Hi Vijay: I am using spark-shell because I am still prototyping the steps involved. Regarding executors - I have 280 executors, and the UI only shows a few straggler tasks on each trigger. The UI does not show too much time spent on GC. I suspect the delay is because of getting data from kafka. The number of stragglers is generally less than 5 out of 240, but it is sometimes higher. I will try to dig more into it and see if changing partitions etc. helps, but I was wondering if anyone else has encountered similar stragglers holding up the processing of a window trigger. Thanks

On Friday, February 23, 2018 6:07 PM, vijay.bvp wrote:

Instead of spark-shell, have you tried running it as a job? How many executors and cores? Can you share the RDD graph and event timeline from the UI, and did you find which of the tasks were taking more time - was there any GC? Please look at the UI if you haven't already; it can provide a lot of information.

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
Apache Spark - Structured Streaming reading from Kafka some tasks take much longer
Hi: I am working with spark structured streaming (2.2.1) reading data from Kafka (0.11). I need to aggregate data ingested every minute, and I am using spark-shell at the moment. The message ingestion rate is approx 500k/second. During some trigger intervals (1 minute), especially when the streaming process is started, all tasks finish in 20 seconds, but during some triggers it takes 90 seconds. I have tried reducing the number of partitions (to approx 100 from 300) to reduce the number of consumers for Kafka, but that has not helped. I also tried setting kafkaConsumer.pollTimeoutMs to 30 seconds, but then I see a lot of java.util.concurrent.TimeoutException: Cannot fetch record for offset. So I wanted to see if anyone has any thoughts/recommendations. Thanks
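Since the slow triggers cluster around startup, one hedged guess is that the first triggers pull the entire backlog in a single batch; capping the per-trigger volume spreads that out (the numbers below are illustrative; broker and topic are placeholders):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "events")                      // placeholder
  // Cap each trigger at roughly one minute of data at ~500k msgs/sec,
  // so a restart does not ingest the whole backlog at once.
  .option("maxOffsetsPerTrigger", "30000000")
  .load()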
Re: Apache Spark - Structured Streaming Query Status - field descriptions
Thanks Richard. I am hoping that the Spark team will, at some point, provide more detailed documentation.

On Sunday, February 11, 2018 2:17 AM, Richard Qiao <richardqiao2...@gmail.com> wrote:

I can't find a good source of documents, but the source code "org.apache.spark.sql.execution.streaming.ProgressReporter" is helpful in answering some of them. For example:
inputRowsPerSecond = numRecords / inputTimeSec
processedRowsPerSecond = numRecords / processingTimeSec
This explains why the two rows-per-second figures differ.

On Feb 10, 2018, at 8:42 PM, M Singh <mans2si...@yahoo.com.INVALID> wrote:

Hi: I am working with spark 2.2.0 and am looking at the query status console output. My application reads from kafka, performs flatMapGroupsWithState, and then aggregates the elements for two group counts. The output is sent to the console sink. I see the following output (with my questions inline as comments). Please let me know where I can find a detailed description of the query status fields for spark structured streaming.

StreamExecution: Streaming query made progress: {
  "id" : "8eff62a9-81a8-4142-b332-3e5ec63e06a2",
  "runId" : "21778fbb-406c-4c65-bdef-d9d2c24698ce",
  "name" : null,
  "timestamp" : "2018-02-11T01:18:00.005Z",
  "numInputRows" : 5780,
  "inputRowsPerSecond" : 96.32851690748795,
  "processedRowsPerSecond" : 583.9563548191554,  // Why is processedRowsPerSecond greater than inputRowsPerSecond? Does this include shuffling/grouping?
  "durationMs" : {
    "addBatch" : 9765,         // Is this the time taken to send output to all console output streams?
    "getBatch" : 3,            // Is this the time taken to get the batch from Kafka?
    "getOffset" : 3,           // Is this the time for getting offsets from Kafka?
    "queryPlanning" : 89,      // This value changes with different triggers, but the query is not changing - so why does it change?
    "triggerExecution" : 9898, // Is this the total time for this trigger?
    "walCommit" : 35           // Is this for checkpointing?
  },
  "stateOperators" : [ {       // What are the two state operators? I am assuming the first one is flatMapGroupsWithState.
    "numRowsTotal" : 8,
    "numRowsUpdated" : 1
  }, {
    "numRowsTotal" : 6,        // Is this the group-by state operator? If so, I have two group-bys, so why do I see only one?
    "numRowsUpdated" : 6
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[xyz]]",
    "startOffset" : { "xyz" : { "2" : 9183, "1" : 9184, "3" : 9184, "0" : 9183 } },
    "endOffset" : { "xyz" : { "2" : 10628, "1" : 10629, "3" : 10629, "0" : 10628 } },
    "numInputRows" : 5780,
    "inputRowsPerSecond" : 96.32851690748795,
    "processedRowsPerSecond" : 583.9563548191554
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@15fc109c"
  }
}
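Plugging the numbers from this very output into those formulas is a useful sanity check (assuming the input-side interval here is the 60-second trigger period):

inputRowsPerSecond = 5780 rows / 60.0 s ≈ 96.33
processedRowsPerSecond = 5780 rows / 9.898 s (the triggerExecution time) ≈ 583.96

So the processed rate exceeds the input rate simply because the batch was processed (9.9 s) faster than the interval over which it accumulated (60 s); it says nothing about shuffling or grouping.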
Re: Apache Spark - Structured Streaming - Updating UDF state dynamically at run time
Just checking if anyone has any pointers for dynamically updating query state in structured streaming. Thanks

On Thursday, February 8, 2018 2:58 PM, M Singh <mans2si...@yahoo.com.INVALID> wrote:

Hi Spark Experts: I am trying to use a stateful udf with spark structured streaming that needs to update its state periodically. Here is the scenario:
1. I have a udf with a variable with a default value (eg: 1). This value is applied to a column (eg: the variable is subtracted from the column value).
2. The variable is to be updated periodically and asynchronously (eg: by reading a file every 5 minutes), and new rows will have the new value applied to the column value.
Spark natively supports broadcast variables, but I could not find a way to update a broadcast variable dynamically or rebroadcast it so that the udf's internal state can be updated while the structured streaming application is running. I could read the variable from the file on each invocation of the udf, but that will not scale since each invocation would open/read/close the file. Please let me know if there is any documentation/example supporting this scenario. Thanks
Apache Spark - Structured Streaming Query Status - field descriptions
Hi: I am working with spark 2.2.0 and am looking at the query status console output. My application reads from kafka, performs flatMapGroupsWithState, and then aggregates the elements for two group counts. The output is sent to the console sink. I see the following output (with my questions inline as comments). Please let me know where I can find a detailed description of the query status fields for spark structured streaming.

StreamExecution: Streaming query made progress: {
  "id" : "8eff62a9-81a8-4142-b332-3e5ec63e06a2",
  "runId" : "21778fbb-406c-4c65-bdef-d9d2c24698ce",
  "name" : null,
  "timestamp" : "2018-02-11T01:18:00.005Z",
  "numInputRows" : 5780,
  "inputRowsPerSecond" : 96.32851690748795,
  "processedRowsPerSecond" : 583.9563548191554,  // Why is processedRowsPerSecond greater than inputRowsPerSecond? Does this include shuffling/grouping?
  "durationMs" : {
    "addBatch" : 9765,         // Is this the time taken to send output to all console output streams?
    "getBatch" : 3,            // Is this the time taken to get the batch from Kafka?
    "getOffset" : 3,           // Is this the time for getting offsets from Kafka?
    "queryPlanning" : 89,      // This value changes with different triggers, but the query is not changing - so why does it change?
    "triggerExecution" : 9898, // Is this the total time for this trigger?
    "walCommit" : 35           // Is this for checkpointing?
  },
  "stateOperators" : [ {       // What are the two state operators? I am assuming the first one is flatMapGroupsWithState.
    "numRowsTotal" : 8,
    "numRowsUpdated" : 1
  }, {
    "numRowsTotal" : 6,        // Is this the group-by state operator? If so, I have two group-bys, so why do I see only one?
    "numRowsUpdated" : 6
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[xyz]]",
    "startOffset" : { "xyz" : { "2" : 9183, "1" : 9184, "3" : 9184, "0" : 9183 } },
    "endOffset" : { "xyz" : { "2" : 10628, "1" : 10629, "3" : 10629, "0" : 10628 } },
    "numInputRows" : 5780,
    "inputRowsPerSecond" : 96.32851690748795,
    "processedRowsPerSecond" : 583.9563548191554
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@15fc109c"
  }
}
Apache Spark - Structured Streaming - Updating UDF state dynamically at run time
Hi Spark Experts: I am trying to use a stateful udf with spark structured streaming that needs to update its state periodically. Here is the scenario:
1. I have a udf with a variable with a default value (eg: 1). This value is applied to a column (eg: the variable is subtracted from the column value).
2. The variable is to be updated periodically and asynchronously (eg: by reading a file every 5 minutes), and new rows will have the new value applied to the column value.
Spark natively supports broadcast variables, but I could not find a way to update a broadcast variable dynamically or rebroadcast it so that the udf's internal state can be updated while the structured streaming application is running. I could read the variable from the file on each invocation of the udf, but that will not scale since each invocation would open/read/close the file. Please let me know if there is any documentation/example supporting this scenario. Thanks
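One common workaround - a sketch of the pattern, not an official Spark facility; the file path, file format, and refresh interval are made up - is a per-executor cache that re-reads the file at most once per interval, so the udf stays cheap per row:

import org.apache.spark.sql.functions.{col, udf}

// Per-JVM holder: each executor refreshes its copy at most every 5 minutes.
object RefreshableValue {
  @volatile private var cached: Double = 1.0   // the default value
  @volatile private var lastLoadMs: Long = 0L
  private val refreshIntervalMs = 5 * 60 * 1000L

  def get(path: String): Double = synchronized {
    val now = System.currentTimeMillis()
    if (now - lastLoadMs > refreshIntervalMs) {
      val src = scala.io.Source.fromFile(path)  // hypothetical single-line file
      try cached = src.getLines().next().trim.toDouble
      finally src.close()
      lastLoadMs = now
    }
    cached
  }
}

// The udf subtracts the (periodically refreshed) variable from the column.
val adjust = udf { (v: Double) => v - RefreshableValue.get("/path/to/variable.txt") }

val result = df.withColumn("adjusted", adjust(col("value")))  // column name assumed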
Re: Apache Spark - Spark Structured Streaming - Watermark usage
Hi Jacek: Thanks for your response. I am just trying to understand the fundamentals of watermarking and how it behaves in aggregation vs non-aggregation scenarios.

On Tuesday, February 6, 2018 9:04 AM, Jacek Laskowski <ja...@japila.pl> wrote:

Hi, What would you expect? The data is simply dropped, as that's the purpose of watermarking it. That's my understanding at least.

Pozdrawiam,
Jacek Laskowski
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Mon, Feb 5, 2018 at 8:11 PM, M Singh <mans2si...@yahoo.com> wrote:

Just checking if anyone has more details on how the watermark works in cases where the event time is earlier than the processing time stamp.

On Friday, February 2, 2018 8:47 AM, M Singh <mans2si...@yahoo.com> wrote:

Hi Vishnu/Jacek: Thanks for your responses. Jacek - at the moment, the current time for my use case is processing time. Vishnu - the Spark documentation (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) does indicate that it can dedup using a watermark. So I believe there are more use cases for watermarks, and that is what I am trying to find. I am hoping that TD can clarify or point me to the documentation. Thanks

On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi Mans, The watermark in Spark is used to decide when to clear the state, so if the event is delayed beyond the point where the state is cleared by Spark, it will be ignored. I recently wrote a blog post on this: http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable for aggregation only. If you have only a map function and don't want to process late records, you could do a filter based on the event-time field, but I guess you will have to compare it with the processing time, since there is no API for the user to access the watermark. -Vishnu

On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi: I am trying to filter out records which are lagging behind (based on event time) by a certain amount of time. Is the watermark api applicable to this scenario (ie, filtering lagging records) or is it only applicable with aggregation? I could not get a clear understanding from the documentation, which only refers to its usage with aggregation. Thanks Mans
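A minimal sketch of the filter workaround Vishnu describes, comparing the event-time column (name assumed) against processing time, with the allowed lag made explicit:

import org.apache.spark.sql.functions.{col, current_timestamp, unix_timestamp}

val maxLagSeconds = 10 * 60  // illustrative: drop records more than 10 minutes behind

// Keep only records whose event time is within maxLagSeconds of processing time.
val fresh = events.filter(
  unix_timestamp(current_timestamp()) - unix_timestamp(col("eventTime")) <= maxLagSeconds)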
Re: Apache Spark - Spark Structured Streaming - Watermark usage
Just checking if anyone has more details on how the watermark works in cases where the event time is earlier than the processing time stamp.

On Friday, February 2, 2018 8:47 AM, M Singh <mans2si...@yahoo.com> wrote:

Hi Vishnu/Jacek: Thanks for your responses. Jacek - at the moment, the current time for my use case is processing time. Vishnu - the Spark documentation (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) does indicate that it can dedup using a watermark. So I believe there are more use cases for watermarks, and that is what I am trying to find. I am hoping that TD can clarify or point me to the documentation. Thanks

On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi Mans, The watermark in Spark is used to decide when to clear the state, so if the event is delayed beyond the point where the state is cleared by Spark, it will be ignored. I recently wrote a blog post on this: http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable for aggregation only. If you have only a map function and don't want to process late records, you could do a filter based on the event-time field, but I guess you will have to compare it with the processing time, since there is no API for the user to access the watermark. -Vishnu

On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi: I am trying to filter out records which are lagging behind (based on event time) by a certain amount of time. Is the watermark api applicable to this scenario (ie, filtering lagging records) or is it only applicable with aggregation? I could not get a clear understanding from the documentation, which only refers to its usage with aggregation. Thanks Mans
Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame
Hi TD: Just wondering if you have any insight for me or need more info. Thanks

On Thursday, February 1, 2018 7:43 AM, M Singh <mans2si...@yahoo.com.INVALID> wrote:

Hi TD: Here is the updated code with explain and the full stack trace. Please let me know what could be the issue and what to look for in the explain output.

Updated code:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest")
      .master("local[4]")
    val spark = sparkBuilder.getOrCreate()
    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp().cast("long"))
    dataframe2.explain(true)
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}

Explain output:

== Parsed Logical Plan ==
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- AnalysisBarrier
   +- Project [id#0, visit#1]
      +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep -> , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Analyzed Logical Plan ==
id: string, visit: string, cts: bigint
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- Project [id#0, visit#1]
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep -> , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Optimized Logical Plan ==
Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep -> , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Physical Plan ==
*(1) Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation FileSource[./data/], [id#0, visit#1]

Here is the exception:

18/02/01 07:39:52 ERROR MicroBatchExecution: Query [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = b5c618cb-30c7-4eff-8f09-ea1d064878ae] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
Re: Apache Spark - Spark Structured Streaming - Watermark usage
Hi Vishnu/Jacek: Thanks for your responses. Jacek - at the moment, the current time for my use case is processing time. Vishnu - the Spark documentation (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) does indicate that it can dedup using a watermark. So I believe there are more use cases for watermarks, and that is what I am trying to find. I am hoping that TD can clarify or point me to the documentation. Thanks

On Wednesday, January 31, 2018 6:37 AM, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi Mans, The watermark in Spark is used to decide when to clear the state, so if the event is delayed beyond the point where the state is cleared by Spark, it will be ignored. I recently wrote a blog post on this: http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable for aggregation only. If you have only a map function and don't want to process late records, you could do a filter based on the event-time field, but I guess you will have to compare it with the processing time, since there is no API for the user to access the watermark. -Vishnu

On Fri, Jan 26, 2018 at 1:14 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi: I am trying to filter out records which are lagging behind (based on event time) by a certain amount of time. Is the watermark api applicable to this scenario (ie, filtering lagging records) or is it only applicable with aggregation? I could not get a clear understanding from the documentation, which only refers to its usage with aggregation. Thanks Mans
Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Invalid call to dataType on unresolved object, tree: 'cts
=== Streaming Query ===
Identifier: [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = b5c618cb-30c7-4eff-8f09-ea1d064878ae]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- Project [id#0, visit#1]
   +- StreamingExecutionRelation FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data], [id#0, visit#1]

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more

On Wednesday, January 31, 2018 3:46 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Could you give the full stack trace of the exception? Also, can you do `dataframe2.explain(true)` and show us the plan output?

On Wed, Jan 31, 2018 at 3:35 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi Folks: I have to add a column to a structured streaming dataframe, but when I do that (using select or withColumn) I get an exception. I can add a column to a non-streaming structured dataframe. I could not find any documentation on how to do this in the following doc [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]. I am using spark 2.4.0-SNAPSHOT. Please let me know what I could be missing. Thanks for your help. (I am also attaching the source code for the structured streaming and structured non-streaming classes, and the input file, with this email.)
Apache Spark - Exception on adding column to Structured Streaming DataFrame
Hi Folks: I have to add a column to a structured streaming dataframe, but when I do that (using select or withColumn) I get an exception. I can add a column to a non-streaming structured dataframe. I could not find any documentation on how to do this in the following doc [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]. I am using spark 2.4.0-SNAPSHOT. Please let me know what I could be missing. Thanks for your help. (I am also attaching the source code for the structured streaming and structured non-streaming classes, and the input file, with this email.)

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)

Here is the input file (in the ./data directory) - note that tokens are separated by '\t':

1	v1
2	v1
2	v2
3	v3
3	v1

Here is the code with the (non-streaming) dataframe, which works:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object StructuredTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .appName("StreamingTest")
      .master("local[4]")
    val spark = sparkBuilder.getOrCreate()
    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframe = spark.read.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
    dataframe2.show(false)
    spark.stop()
  }
}

Output of the above code is:

+---+-----+-----------------------+
|id |visit|cts                    |
+---+-----+-----------------------+
|1  |v1   |2018-01-31 15:07:00.758|
|2  |v1   |2018-01-31 15:07:00.758|
|2  |v2   |2018-01-31 15:07:00.758|
|3  |v3   |2018-01-31 15:07:00.758|
|3  |v1   |2018-01-31 15:07:00.758|
+---+-----+-----------------------+

Here is the code with structured streaming, which throws the exception:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest")
      .master("local[4]")
    val spark = sparkBuilder.getOrCreate()
    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp())
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}

Here is the exception:

18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)

I've also used snippets (shown below) from https://docs.databricks.com/spark/latest/structured-streaming/examples.html but still get the same exception. Here is the code:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest")
      .master("local[4]")
    val spark = sparkBuilder.getOrCreate()
    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select(
      current_timestamp().cast("timestamp").alias("timestamp"),
      expr("*"))
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}
Apache Spark - Spark Structured Streaming - Watermark usage
Hi: I am trying to filter out records which are lagging behind (based on event time) by a certain amount of time. Is the watermark api applicable to this scenario (ie, filtering lagging records) or is it only applicable with aggregation? I could not get a clear understanding from the documentation, which only refers to its usage with aggregation. Thanks Mans
Re: Apache Spark - Custom structured streaming data source
Thanks TD. When is 2.3 scheduled for release?

On Thursday, January 25, 2018 11:32 PM, Tathagata Das <t...@databricks.com> wrote:

Hello Mans, The streaming DataSource APIs are still evolving and are not public yet, hence there is no official documentation. In fact, there is a new DataSourceV2 API (in Spark 2.3) that we are migrating towards. So at this point in time it's hard to make any concrete suggestion. You can take a look at the classes DataSourceV2, DataReader, and MicroBatchDataReader in the spark source code, along with their implementations. Hope this helps. TD

On Jan 25, 2018 8:36 PM, "M Singh" <mans2si...@yahoo.com.invalid> wrote:

Hi: I am trying to create a custom structured streaming source and would like to know if there is any example or documentation on the steps involved. I've looked at some methods available in the SparkSession, but these are internal to the sql package:

private[sql] def internalCreateDataFrame(
    catalystRows: RDD[InternalRow],
    schema: StructType,
    isStreaming: Boolean = false): DataFrame = {
  // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
  // schema differs from the existing schema on any field data type.
  val logicalPlan = LogicalRDD(
    schema.toAttributes,
    catalystRows,
    isStreaming = isStreaming)(self)
  Dataset.ofRows(self, logicalPlan)
}

Please let me know where I can find the appropriate API or documentation. Thanks Mans
Apache Spark - Custom structured streaming data source
Hi: I am trying to create a custom structured streaming source and would like to know if there is any example or documentation on the steps involved. I've looked at some methods available in the SparkSession, but these are internal to the sql package:

private[sql] def internalCreateDataFrame(
    catalystRows: RDD[InternalRow],
    schema: StructType,
    isStreaming: Boolean = false): DataFrame = {
  // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
  // schema differs from the existing schema on any field data type.
  val logicalPlan = LogicalRDD(
    schema.toAttributes,
    catalystRows,
    isStreaming = isStreaming)(self)
  Dataset.ofRows(self, logicalPlan)
}

Please let me know where I can find the appropriate API or documentation. Thanks Mans
Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size
Hi Jacek: The javadoc mentions that we can only consume data from the data frame in the addBatch method. So, if I would like to save the data to a new sink, then I believe I will need to collect the data and then save it. This is the reason I am asking about how to control the size of the data in each invocation of the addBatch method. Let me know if I am interpreting the javadoc incorrectly. Here it is:

/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit

Thanks Mans

On Thursday, January 4, 2018 2:19 PM, Jacek Laskowski <ja...@japila.pl> wrote:

Hi,

> If the data is very large then a collect may result in OOM.

That's a general concern in any part of Spark, incl. Spark Structured Streaming. Why would you collect in addBatch? It's on the driver side, and as with anything on the driver, it's a single JVM (and usually not fault tolerant).

> Do you have any other suggestion/recommendation ?

What's wrong with the current solution? I don't think you should change how you do things currently. You should just avoid collect on large datasets (which you have to do anywhere in Spark).

Pozdrawiam,
Jacek Laskowski
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Thanks Tathagata for your answer. The reason I was asking about controlling the data size is that the javadoc indicates you can use foreach or collect on the dataframe. If the data is very large, then a collect may result in OOM.

From your answer it appears that the only way to control the size (in 2.2) would be to control the trigger interval. However, in my case, I have to dedup the elements in a one minute interval, which I am using as the trigger interval, and cannot reduce it. Do you have any other suggestion/recommendation?

Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3? Thanks again.

On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

1. It is all the result data in that trigger. Note that it takes a DataFrame, which is a purely logical representation of data and has no association with partitions, etc., which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then you should either control the trigger interval or use the rate limit options on sources that support it (e.g. for kafka, you can use the option "maxOffsetsPerTrigger", see the guide).

Related note: these APIs are subject to change. In fact, in the upcoming release 2.3, we are adding a DataSourceV2 API for batch/microbatch-streaming/continuous-streaming sources and sinks.

On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi: The documentation for Sink.addBatch is as follows:

/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit

A few questions about the DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation, to make sure that we don't run into an OOM exception on the executor while calling collect?

Thanks
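On the OOM concern, here is a sketch of a Sink that consumes the batch on the executors instead of collect()-ing it to the driver; it mirrors the queryExecution.toRdd route the built-in Kafka sink takes internally, and the actual write call is left hypothetical:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class PartitionedSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val schema = data.schema
    // toRdd reuses the already-planned incremental execution instead of
    // re-executing the query, and foreachPartition runs on the executors,
    // so only one partition's rows are materialized at a time.
    data.queryExecution.toRdd.foreachPartition { rows =>
      // open a per-partition connection here (hypothetical), then:
      rows.foreach { internalRow =>
        // writeToStore(internalRow, schema)  // hypothetical per-row write
        ()
      }
    }
  }
}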
Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size
Thanks Tathagata for your answer. The reason I was asking about controlling the data size is that the javadoc indicates you can use foreach or collect on the dataframe. If the data is very large, then a collect may result in OOM.

From your answer it appears that the only way to control the size (in 2.2) would be to control the trigger interval. However, in my case, I have to dedup the elements in a one minute interval, which I am using as the trigger interval, and cannot reduce it. Do you have any other suggestion/recommendation?

Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3? Thanks again.

On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

1. It is all the result data in that trigger. Note that it takes a DataFrame, which is a purely logical representation of data and has no association with partitions, etc., which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then you should either control the trigger interval or use the rate limit options on sources that support it (e.g. for kafka, you can use the option "maxOffsetsPerTrigger", see the guide).

Related note: these APIs are subject to change. In fact, in the upcoming release 2.3, we are adding a DataSourceV2 API for batch/microbatch-streaming/continuous-streaming sources and sinks.

On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi: The documentation for Sink.addBatch is as follows:

/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit

A few questions about the DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation, to make sure that we don't run into an OOM exception on the executor while calling collect?

Thanks
Apache Spark - Question about Structured Streaming Sink addBatch dataframe size
Hi: The documentation for Sink.addBatch is as follows:

/**
 * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
 * this method is called more than once with the same batchId (which will happen in the case of
 * failures), then `data` should only be added once.
 *
 * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
 * Otherwise, you may get a wrong result.
 *
 * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
 * after data is consumed by sink successfully.
 */
def addBatch(batchId: Long, data: DataFrame): Unit

A few questions about the DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation, to make sure that we don't run into an OOM exception on the executor while calling collect?

Thanks
Re: Spark on EMR suddenly stalling
Hi Jeroen: I am not sure if I missed it - but can you let us know what your input source and output sink are? In some cases, I found that saving to S3 was a problem. In that case I started saving the output to the EMR HDFS and later copied it to S3 using s3-dist-cp, which solved our issue. Mans

On Monday, January 1, 2018 7:41 AM, Rohit Karlupia wrote:

Here is the list that I will probably try to fill:
- Check GC on the offending executor when the task is running. Maybe you need even more memory.
- Go back to some previous successful run of the job, check the spark ui for the offending stage, and check max task time/max input/max shuffle in/out for the largest task. This will help you understand the degree of skew in this stage.
- Take a thread dump of the executor from the Spark UI and verify whether the task is really doing any work or is stuck in some deadlock. Some of the hive serdes are not really usable from multi-threaded/multi-use spark executors.
- Take a thread dump of the executor from the Spark UI and verify whether the task is spilling to disk. Playing with the storage and memory fractions or generally increasing the memory will help.
- Check the disk utilisation on the machine running the executor.
- Look for event-loss messages in the logs due to the event queue being full. Loss of events can send some of the spark components into really bad states.

thanks,
rohitk

On Sun, Dec 31, 2017 at 12:50 AM, Gourav Sengupta wrote:

Hi, Please try to use the SPARK UI in the way that AWS EMR recommends; it should be available from the resource manager. I never had any problem working with it. THAT HAS ALWAYS BEEN MY PRIMARY AND SOLE SOURCE OF DEBUGGING. Sadly, I cannot be of much help unless we go for a screen share session over google chat or skype. Also, I ALWAYS prefer the maximizeResourceAllocation setting in EMR to be set to true. Besides that, there is a metric in the EMR console which shows on graphs the number of containers being generated by your job.

Regards,
Gourav Sengupta

On Fri, Dec 29, 2017 at 6:23 PM, Jeroen Miller wrote:

Hello, Just a quick update, as I have not made much progress yet.

On 28 Dec 2017, at 21:09, Gourav Sengupta wrote:
> can you try to then use the EMR version 5.10 instead or EMR version 5.11 instead?

Same issue with EMR 5.11.0. Task 0 in one stage never finishes.

> can you please try selecting a subnet which is in a different availability zone?

I did not try this yet. But why should that make a difference?

> if possible just try to increase the number of task instances and see the difference?

I tried with 512 partitions -- no difference.

> also in case you are using caching,

No caching used.

> Also can you please report the number of containers that your job is creating by looking at the metrics in the EMR console?

8 containers, if I trust the directories in j-xxx/containers/application_xxx/.

> Also if you see the spark UI then you can easily see which particular step is taking the longest period of time - you just have to drill in a bit in order to see that. Generally in case shuffling is an issue then it definitely appears in the SPARK UI as I drill into the steps and see which particular one is taking the longest.

I always have issues with the Spark UI on EC2 -- it never seems to be up to date.

JM
Apache Spark - Using withWatermark for DataSets
Hi: I am working with DataSets so that I can use mapGroupsWithState for business logic and then use dropDuplicates over a set of fields. I would like to use withWatermark so that I can restrict how much state is stored.

From the API it looks like withWatermark takes a string - the timestamp column name - as an argument. Is it possible to use it with DataSets? If not, is there any alternative like withWatermark available for working with DataSets? Thanks Mans
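For what it's worth, withWatermark is declared on Dataset[T] itself (DataFrame is just Dataset[Row]) and returns a Dataset[T], so it composes with typed operations directly. A sketch with an assumed Event type:

// Event and its fields are assumptions.
case class Event(id: String, eventTime: java.sql.Timestamp, payload: String)

val deduped = events                         // events: Dataset[Event]
  .withWatermark("eventTime", "10 minutes")  // bounds the dedup state
  .dropDuplicates("id", "eventTime")         // still a Dataset[Event]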
Re: Apache Spark - Structured Streaming graceful shutdown
Thanks Eyal - it appears that these are the same patterns used for spark DStreams.

On Wednesday, December 27, 2017 1:15 AM, Eyal Zituny <eyal.zit...@equalum.io> wrote:

Hi, if you're interested in stopping your spark application externally, you will probably need a way to communicate with the spark driver (which starts and holds a reference to the spark context). This can be done by adding some code to the driver app, for example:
- you can expose a rest api that stops the query and the spark context
- if running in client mode, you can listen on stdin
- you can also listen to an external system (like kafka)

Eyal

On Tue, Dec 26, 2017 at 10:37 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Thanks Diogo. My question is how to gracefully call the stop method while the streaming application is running in a cluster.

On Monday, December 25, 2017 5:39 PM, Diogo Munaro Vieira <diogo.mun...@corp.globo.com> wrote:

Hi M Singh! Here I'm using query.stop()

On Dec 25, 2017 19:19, "M Singh" <mans2si...@yahoo.com.invalid> wrote:

Hi: Are there any patterns/recommendations for gracefully stopping a structured streaming application? Thanks
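A sketch of one externally-triggered variant of Eyal's suggestions - the driver polls for a marker file and stops the query when it appears; the path and poll interval are made up:

val query = df.writeStream.format("console").start()

val stopMarker = new java.io.File("/tmp/stop-streaming-app")  // placeholder path
while (query.isActive) {
  if (stopMarker.exists()) {
    query.stop()  // stops the streaming query from the driver
  }
  Thread.sleep(10000)
}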
Re: Apache Spark - Structured Streaming graceful shutdown
Thanks Diogo. My question is how to gracefully call the stop method while the streaming application is running in a cluster.

On Monday, December 25, 2017 5:39 PM, Diogo Munaro Vieira <diogo.mun...@corp.globo.com> wrote:

Hi M Singh! Here I'm using query.stop()

On Dec 25, 2017 19:19, "M Singh" <mans2si...@yahoo.com.invalid> wrote:

Hi: Are there any patterns/recommendations for gracefully stopping a structured streaming application? Thanks
Apache Spark - (2.2.0) - window function for DataSet
Hi: I would like to use the window function on a DataSet stream (Spark 2.2.0). The window function requires a Column as an argument and can be used with DataFrames by passing the column. Is there any analogous window function, or pointers to how the window function can be used, for DataSets? Thanks
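A sketch with an assumed Event type: the same window() column function works on a typed DataSet by naming one of its fields; note that the result of groupBy/agg is a DataFrame (i.e. Dataset[Row]) rather than a typed Dataset:

import org.apache.spark.sql.functions.{col, window}

// Event and its fields are assumptions.
case class Event(id: String, eventTime: java.sql.Timestamp)

val counts = events                                          // events: Dataset[Event]
  .groupBy(window(col("eventTime"), "1 minute"), col("id"))
  .count()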
Apache Spark - Structured Streaming from file - checkpointing
Hi: I am using spark structured streaming (v 2.2.0) to read data from files. I have configured a checkpoint location. On stopping and restarting the application, it looks like it is re-reading the previously ingested files. Is that expected behavior? Is there any way to prevent reading files that have already been ingested? If a file is partially ingested, on restart can we start reading the file from the previously checkpointed offset? Thanks
Apache Spark - Structured Streaming graceful shutdown
Hi: Are there any patterns/recommendations for gracefully stopping a structured streaming application? Thanks
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
Thanks TD. BTW - if I have an input file of ~250 GB, is there any guideline on whether to use:
* a single input file (250 GB) (in this case, is there any max upper bound), or
* a split into 1000 files of 250 MB each (the hdfs block size is 250 MB), or
* a multiple of the hdfs block size.
Mans

On Friday, July 11, 2014 4:38 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The model for the file stream is to pick up and process new files written atomically (by move) into a directory. So your file is being processed in a single batch, and then it's waiting for any new files to be written into that directory. TD

On Fri, Jul 11, 2014 at 11:46 AM, M Singh mans6si...@yahoo.com wrote: So, is it expected for the process to generate stages/tasks even after processing a file ? Also, is there a way to figure out which file is being processed and when that processing is complete ? Thanks

On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Whenever you need to do a shuffle-based operation like reduceByKey, groupByKey, join, etc., the system is essentially redistributing the data across the cluster and it needs to know how many parts it should divide the data into. That's where the default parallelism is used. TD

On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote: Hi TD: The input file is on hdfs. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since the hdfs block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism, but I don't quite understand how the value is determined for generating tasks for reduceByKey, how it is used besides reduceByKey, and what the optimal value for it should be. Thanks.

On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file?

On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses Spark Streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in Cassandra based on fields in each row. I am testing the application in two modes:
* Process each row and save the counter in Cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the Spark UI.
* If instead I use reduce by key before saving to Cassandra, the Spark UI shows continuous generation of tasks/stages even after processing of the file has been completed.
I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
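A short sketch of the knob under discussion, with placeholder paths and an arbitrary illustrative partition count: reduceByKey accepts an explicit number of partitions, and without it the shuffle falls back to spark.default.parallelism (the 2 tasks observed above).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("file-stream-counts")
val ssc = new StreamingContext(conf, Seconds(60))

// textFileStream picks up files moved atomically into the directory.
val lines = ssc.textFileStream("hdfs:///incoming")  // placeholder path
val counts = lines
  .flatMap(_.split("\\s+"))
  .map(w => (w, 1L))
  .reduceByKey(_ + _, 64)  // 64 is an arbitrary illustrative value

counts.print()
ssc.start()
ssc.awaitTermination()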
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
Hi TD: The input file is on hdfs. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since the hdfs block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism, but I don't quite understand how the value is determined for generating tasks for reduceByKey, how it is used besides reduceByKey, and what the optimal value for it should be. Thanks.

On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file?

On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses Spark Streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in Cassandra based on fields in each row. I am testing the application in two modes:
* Process each row and save the counter in Cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the Spark UI.
* If instead I use reduce by key before saving to Cassandra, the Spark UI shows continuous generation of tasks/stages even after processing of the file has been completed.
I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
So, is it expected for the process to generate stages/tasks even after processing a file ? Also, is there a way to figure out which file is being processed and when that processing is complete ? Thanks

On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Whenever you need to do a shuffle-based operation like reduceByKey, groupByKey, join, etc., the system is essentially redistributing the data across the cluster and it needs to know how many parts it should divide the data into. That's where the default parallelism is used. TD

On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote: Hi TD: The input file is on hdfs. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since the hdfs block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism, but I don't quite understand how the value is determined for generating tasks for reduceByKey, how it is used besides reduceByKey, and what the optimal value for it should be. Thanks.

On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file?

On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses Spark Streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in Cassandra based on fields in each row. I am testing the application in two modes:
* Process each row and save the counter in Cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the Spark UI.
* If instead I use reduce by key before saving to Cassandra, the Spark UI shows continuous generation of tasks/stages even after processing of the file has been completed.
I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
Spark streaming - tasks and stages continue to be generated when using reduce by key
Hi Folks: I am working on an application which uses Spark Streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in Cassandra based on fields in each row. I am testing the application in two modes:
* Process each row and save the counter in Cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the Spark UI.
* If instead I use reduce by key before saving to Cassandra, the Spark UI shows continuous generation of tasks/stages even after processing of the file has been completed.
I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
Re: Reading text file vs streaming text files
Hi Akhil: Thanks for your response. Mans

On Thursday, July 3, 2014 9:16 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Singh! For this use-case it's better to have a streaming context listening to the hdfs directory where the files are being dropped: set the streaming interval to 15 minutes and let the driver program run continuously, so that as soon as new files arrive they are picked up for processing every 15 minutes. That way you don't have to worry about the old files unless you are about to restart the driver program. Another implementation would be, after processing each batch, to simply move the processed files to another directory or so. Thanks Best Regards

On Thu, Jul 3, 2014 at 6:34 PM, M Singh mans6si...@yahoo.com wrote: Hi: I am working on a project where a few thousand text files (~20M in size) will be dropped in an hdfs directory every 15 minutes. Data from the files will be used to update counters in Cassandra (a non-idempotent operation). I was wondering what is the best way to deal with this:
* Use text streaming and process the files as they are added to the directory.
* Use non-streaming text input and launch a Spark driver every 15 minutes to process files from a specified directory (a new directory for every 15 minutes).
* Use a message queue to ingest data from the files and then read data from the queue.
Also, is there a way to find which text file is being processed and when a file has been processed, for both the streaming and non-streaming RDDs ? I believe the filename is available in WholeTextFileInputFormat, but is it available in standard or streaming text RDDs ? Thanks Mans
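A sketch of the long-running-driver suggestion above, with a placeholder drop directory and the Cassandra write elided: a single streaming context with a 15-minute batch interval watches the directory, so newly dropped files are picked up once per batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("dropped-file-ingest")
val ssc = new StreamingContext(conf, Minutes(15))

val files = ssc.textFileStream("hdfs:///drop-zone")  // placeholder path
files.foreachRDD { rdd =>
  // Update the Cassandra counters here; since the operation is
  // non-idempotent, replayed batches after a restart need care.
  rdd.foreachPartition { _ => /* write partition to Cassandra */ }
}

ssc.start()
ssc.awaitTermination()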
Re: Java sample for using cassandra-driver-spark
Hi Piotr: It would be great if we could have an API to support batch updates (counter + non-counter). Thanks Mans On Monday, July 7, 2014 11:36 AM, Piotr Kołaczkowski pkola...@datastax.com wrote: Hi, we're planning to add a basic Java API very soon, possibly this week. There's a ticket for it here: https://github.com/datastax/cassandra-driver-spark/issues/11 We're open to any ideas. Just let us know in the comments what you need the API to have. Regards, Piotr Kołaczkowski 2014-07-05 0:48 GMT+02:00 M Singh mans6si...@yahoo.com: Hi: Is there a Java sample fragment for using cassandra-driver-spark ? Thanks -- Piotr Kolaczkowski, Lead Software Engineer pkola...@datastax.com http://www.datastax.com/ 3975 Freedom Circle, Santa Clara, CA 95054, USA
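A Scala sketch of the connector's save path for comparison, since the Java API was still pending at the time; the host, keyspace, table, and data are placeholders, and the project was later renamed spark-cassandra-connector:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-save")
  .set("spark.cassandra.connection.host", "127.0.0.1")  // placeholder host
val sc = new SparkContext(conf)

// Tuple elements map in order onto the listed columns.
val counts = sc.parallelize(Seq(("spark", 10L), ("cassandra", 7L)))
counts.saveToCassandra("my_keyspace", "word_counts", SomeColumns("word", "count"))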
Re: window analysis with Spark and Spark streaming
Another alternative could be to use Spark Streaming's textFileStream with windowing capabilities. On Friday, July 4, 2014 9:52 AM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: You should think about a custom receiver, in order to solve the problem of the "already collected" data. http://spark.apache.org/docs/latest/streaming-custom-receivers.html Gianluca On 04 Jul 2014, at 15:46, alessandro finamore alessandro.finam...@polito.it wrote: Hi, I have a large dataset of text log files on which I need to implement window analysis. Say, extract per-minute data and do aggregated stats on the last X minutes. I have to implement the windowing analysis with Spark. This is the workflow I'm currently using:
- read a file and create a new RDD with per-minute info
- loop on each new minute and integrate its data with another data structure containing the last X minutes of data
- apply the analysis on the updated window of data
This works but it suffers from limited parallelism. Do you have any recommendations/suggestions for a better implementation? Also, are there any recommended data collections for managing the window (I'm simply using Arrays for managing data)? While working on this I started to investigate Spark Streaming. The problem is that I don't know if it is really possible to use it on already collected data. This post seems to indicate that it should be, but it is not clear to me how: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-windowing-Driven-by-absolutely-time-td1733.html Thanks
Re: window analysis with Spark and Spark streaming
The windowing capabilities of Spark Streaming determine which events end up in the RDD created for a given time window. If the window duration is 1s, then all the events received in a particular 1s window will be part of the RDD created for that window of the stream. On Friday, July 4, 2014 1:28 PM, alessandro finamore alessandro.finam...@polito.it wrote: Thanks for the replies. What is not completely clear to me is how time is managed. I can create a DStream from a file. But if I set the window property, that will be bound to the application time, right? If I got it right, with a receiver I can control the way DStreams are created. But how can I then apply the windowing already shipped with the framework, if this is bound to the application time? I would like to define a window of N files, but the window() function requires a duration as input...
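A minimal sketch of the time-based windowing discussed above, with a placeholder input directory: each windowed RDD covers the last 10 minutes of received data, recomputed every minute. Note the window is driven by receive time, not by file count, which is exactly the limitation raised in the thread.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("windowed-stats")
val ssc = new StreamingContext(conf, Minutes(1))

val lines = ssc.textFileStream("hdfs:///logs")  // placeholder path
val windowed = lines.window(Minutes(10), Minutes(1))  // window length, slide
windowed.count().print()

ssc.start()
ssc.awaitTermination()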
Java sample for using cassandra-driver-spark
Hi: Is there a Java sample fragment for using cassandra-driver-spark ? Thanks
Reading text file vs streaming text files
Hi: I am working on a project where a few thousand text files (~20M in size) will be dropped in an hdfs directory every 15 minutes. Data from the files will be used to update counters in Cassandra (a non-idempotent operation). I was wondering what is the best way to deal with this:
* Use text streaming and process the files as they are added to the directory.
* Use non-streaming text input and launch a Spark driver every 15 minutes to process files from a specified directory (a new directory for every 15 minutes).
* Use a message queue to ingest data from the files and then read data from the queue.
Also, is there a way to find which text file is being processed and when a file has been processed, for both the streaming and non-streaming RDDs ? I believe the filename is available in WholeTextFileInputFormat, but is it available in standard or streaming text RDDs ? Thanks Mans
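For the filename question above, a sketch of the non-streaming case with a placeholder path: wholeTextFiles (backed by WholeTextFileInputFormat) yields (path, contents) pairs, so the source file of every record is known; plain textFile and textFileStream do not expose the filename directly.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("named-files"))

val files = sc.wholeTextFiles("hdfs:///drop-zone")  // placeholder path
files.map { case (path, contents) => (path, contents.length) }
  .collect()  // fine on the driver for a few thousand small files
  .foreach { case (path, size) => println(s"$path -> $size chars") }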
spark text processing
Hi: Is there a way to find out when spark has finished processing a text file (both for streaming and non-streaming cases) ? Also, after processing, can spark copy the file to another directory ? Thanks
Configuration properties for Spark
Hi: Is there a comprehensive properties list (with permissible/default values) for spark ? Thanks Mans
Re: jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)
Hi Paul: Here are the dependencies in Spark 1.1.0-SNAPSHOT that are pulling in the org.codehaus.jackson:jackson-core-asl 1.8 and 1.9 jars.
1.9:
* com.twitter:parquet-hadoop:jar:1.4.3
* org.apache.avro:avro:jar:1.7.6
1.8:
* org.apache.spark:spark-hive_2.10:jar:1.1.0-SNAPSHOT
* org.apache.hadoop:hadoop-core:jar:1.0.4
* org.apache.hbase:hbase:jar:0.94.6
Thanks Mans

On Saturday, June 28, 2014 2:22 AM, Paul Brown p...@mult.ifario.us wrote: Hi, Mans -- Both of those versions of Jackson are pretty ancient. Do you know which of the Spark dependencies is pulling them in? It would be good for us (the Jackson, Woodstox, etc., folks) to see if we can get people to upgrade to more recent versions of Jackson. -- Paul — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Fri, Jun 27, 2014 at 12:58 PM, M Singh mans6si...@yahoo.com wrote: Hi: I am using Spark to stream data to Cassandra and it works fine in local mode. But when I execute the application in a standalone clustered env I get the exception included below (java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass). I think this is due to a jackson-core-asl dependency conflict (jackson-core-asl 1.8.8 has JsonClass but 1.9.x does not). The 1.9.x version is being pulled in by the spark-sql project. I tried adding jackson-core-asl 1.8.8 with the --jars argument while submitting the application for execution, but it did not work. So I created a custom Spark build excluding the sql project. With this custom Spark install I was able to resolve the issue, at least on a single-node cluster (separate master and worker). If there is an alternate way to resolve this conflicting-jar issue without a custom build (e.g. a configuration to use the user-defined jars in the executor classpath first), please let me know. Also, is there a comprehensive list of configuration properties available for Spark ? Thanks Mans

Exception trace:
TaskSetManager: Loss was due to java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
at org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
at org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
at org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229)
at org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386)
at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
at org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
at org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
at org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)
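A hedged build-side alternative to the custom Spark build (sbt syntax): pin jackson-core-asl to a single version across the dependency graph, or exclude the transitive copy at one edge. Whether 1.8.8 actually satisfies the 1.9.x consumers (parquet, avro) is an assumption that would need verification.

// build.sbt sketch
dependencyOverrides += "org.codehaus.jackson" % "jackson-core-asl" % "1.8.8"

// Alternatively, exclude the transitive copy pulled in via spark-sql:
libraryDependencies += ("org.apache.spark" %% "spark-sql" % "1.1.0-SNAPSHOT")
  .exclude("org.codehaus.jackson", "jackson-core-asl")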
jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)
Hi: I am using Spark to stream data to Cassandra and it works fine in local mode. But when I execute the application in a standalone clustered env I get the exception included below (java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass). I think this is due to a jackson-core-asl dependency conflict (jackson-core-asl 1.8.8 has JsonClass but 1.9.x does not). The 1.9.x version is being pulled in by the spark-sql project. I tried adding jackson-core-asl 1.8.8 with the --jars argument while submitting the application for execution, but it did not work. So I created a custom Spark build excluding the sql project. With this custom Spark install I was able to resolve the issue, at least on a single-node cluster (separate master and worker). If there is an alternate way to resolve this conflicting-jar issue without a custom build (e.g. a configuration to use the user-defined jars in the executor classpath first), please let me know. Also, is there a comprehensive list of configuration properties available for Spark ? Thanks Mans

Exception trace:
TaskSetManager: Loss was due to java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
at org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
at org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
at org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229)
at org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386)
at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
at org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
at org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
at org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)