Fw: how to reset streaming state regularly

2019-02-26 Thread shicheng31...@gmail.com

Hi all:
In Spark Streaming, I want to count some metrics by day, but the
"mapWithState" method has no API for this. Of course, I can achieve it by
adding time information to each record. However, I would still prefer to use
a Spark API for it. So, is there any direct or indirect API for this in
Spark? Or is there any better solution?
  Thanks!
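
For illustration, one indirect approach (a sketch only, not a confirmed Spark
API for daily resets) is to key the state by day and let idle per-day state
expire via a timeout. It assumes a DStream of (metricName, value) pairs named
events:

import java.time.LocalDate
import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Sketch: prefix each key with the current day, so yesterday's keys stop
// receiving updates and are eventually dropped by the state timeout.
val perDayKeyed = events.map { case (metric, value) =>
  (s"${LocalDate.now()}:$metric", value)
}

val spec = StateSpec.function(
  (key: String, value: Option[Long], state: State[Long]) => {
    val total = state.getOption.getOrElse(0L) + value.getOrElse(0L)
    if (!state.isTimingOut()) state.update(total)  // update is not allowed while timing out
    (key, total)
  }
).timeout(Minutes(60 * 26))  // expire a day's counters after ~26 hours of inactivity

val dailyCounts = perDayKeyed.mapWithState(spec)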




shicheng31...@gmail.com


to_avro and from_avro not working with struct type in spark 2.4

2019-02-26 Thread Hien Luu
Hi,

I ran into a pretty weird issue with to_avro and from_avro, where they were
not able to parse the data in a struct correctly. Please see the simple and
self-contained example below. I am using Spark 2.4. I am not sure if I
missed something.

This is how I start the spark-shell on my Mac:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0

import org.apache.spark.sql.types._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions._


spark.version

val df = Seq((1, "John Doe",  30), (2, "Mary Jane", 25)).toDF("id", "name",
"age")

val dfStruct = df.withColumn("value", struct("name","age"))

dfStruct.show
dfStruct.printSchema

val dfKV = dfStruct.select(to_avro('id).as("key"),
to_avro('value).as("value"))

val expectedSchema = StructType(Seq(StructField("name", StringType,
false),StructField("age", IntegerType, false)))

val avroTypeStruct = SchemaConverters.toAvroType(expectedSchema).toString

val avroTypeStr = s"""
  |{
  |  "type": "int",
  |  "name": "key"
  |}
""".stripMargin


dfKV.select(from_avro('key, avroTypeStr)).show

// output
+-------------------+
|from_avro(key, int)|
+-------------------+
|                  1|
|                  2|
+-------------------+

dfKV.select(from_avro('value, avroTypeStruct)).show

// output
+------------------------+
|from_avro(value, struct)|
+------------------------+
|                   [, 9]|
|                   [, 9]|
+------------------------+
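
One thing I would double-check (an assumption, not verified): expectedSchema
marks both fields as non-nullable, while the struct column created by
struct("name", "age") has a nullable name field, so the reader schema passed
to from_avro may not match the schema to_avro actually used when writing. A
sketch of deriving the reader schema from the DataFrame's own column type
instead:

// Sketch (untested): build the reader schema from the actual "value" column
// type, so nullability matches what to_avro used when serializing.
val writerStructType = dfStruct.schema("value").dataType
val avroTypeFromDf = SchemaConverters.toAvroType(writerStructType).toString
dfKV.select(from_avro('value, avroTypeFromDf)).show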

Please help and thanks in advance.







Re: Spark Streaming - Problem to manage offset Kafka and starts from the beginning.

2019-02-26 Thread Akshay Bhardwaj
Hi Guillermo,

What was the interval between restarts of the Spark job? As a feature in
Kafka, a broker deletes the offsets of a consumer group after 24 hours of
inactivity.
In such a case, the newly started Spark Streaming job will read offsets
from the beginning for the same groupId.
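
If it is useful, a sketch of committing offsets explicitly with the
kafka-0-10 direct stream, so the consumer group's offsets are refreshed in
Kafka after every batch and the fallback behaviour is explicit. It assumes an
existing StreamingContext named ssc; broker addresses, group id and topic are
placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker-1:9092",             // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",                  // placeholder
  "auto.offset.reset" -> "latest",                    // fallback when no committed offset exists
  "enable.auto.commit" -> (false: java.lang.Boolean)  // commit manually below
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}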

Akshay Bhardwaj
+91-97111-33849


On Thu, Feb 21, 2019 at 9:08 PM Gabor Somogyi wrote:

> From the info you've provided, there is not much to say.
> Maybe you could collect sample app, logs etc, open a jira and we can take
> a deeper look at it...
>
> BR,
> G
>
>
> On Thu, Feb 21, 2019 at 4:14 PM Guillermo Ortiz wrote:
>
>> I'm working with Spark Streaming 2.0.2 and Kafka 1.0.0, using Direct Stream
>> as the connector. I consume data from Kafka and auto-commit the offsets.
>> I can see in the logs that Spark commits the last offsets processed.
>> Sometimes, when I restart Spark with the same groupId, it starts from the
>> beginning.
>>
>> Why could this happen? It only happens rarely.
>>
>


Re: Spark 2.3 | Structured Streaming | Metric for numInputRows

2019-02-26 Thread Akshay Bhardwaj
If it helps, below is the query progress report that I am able to fetch from
the same streaming query:

{
  "id" : "f2cb24d4-622e-4355-b315-8e440f01a90c",
  "runId" : "6f3834ff-10a9-4f57-ae71-8a434ee519ce",
  "name" : "query_name_1",
  "timestamp" : "2019-02-27T06:06:58.500Z",
  "batchId" : 3725,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 275,
    "getBatch" : 3,
    "getOffset" : 8,
    "queryPlanning" : 79,
    "triggerExecution" : 409,
    "walCommit" : 43
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[SubscribePattern[kafka_events_topic]]",
    "startOffset" : {
      "kafka_events_topic" : {
        "2" : 32822078,
        "1" : 114248484,
        "0" : 114242134
      }
    },
    "endOffset" : {
      "kafka_events_topic" : {
        "2" : 32822496,
        "1" : 114248896,
        "0" : 114242552
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "ForeachSink"
  }
}
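
For reference, this report is what StreamingQuery.lastProgress returns; a
minimal sketch of pulling it programmatically, assuming query is the handle
returned by writeStream...start():

// Sketch: lastProgress holds the most recent report, recentProgress a rolling window.
val progress = query.lastProgress
println(progress.json)          // the JSON report shown above
println(progress.numInputRows)  // the metric in question
query.recentProgress.foreach(p => println(p.numInputRows))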



Akshay Bhardwaj
+91-97111-33849


On Wed, Feb 27, 2019 at 11:36 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi Experts,
>
> I have a structured streaming query running on spark 2.3 over yarn
> cluster, with below features:
>
>- Reading JSON messages from Kafka topic with:
>   - maxOffsetsPerTrigger as 5000
>   - trigger interval of my writeStream task is 500ms.
>   - streaming dataset is defined as events with  fields: id, name,
>   refid, createdTime
>- A cached dataset of CSV file read from HDFS, such that, the CSV file
>contains a list of prohibited events refid
>- I have defined an intermediate dataset with the following query,
>which filters out prohibited events from the streaming data
>   - select * from events where event.refid NOT IN (select refid from
>   CSVData)
>
>
> The query progress from StreamingQuery object, it shows metrics as
> numInputRows, inputRowsPerSecond and processedRowsPerSecond as 0, although
> my query is executing with an execution time of ~400ms. And I can see that
> the query does take records from kafka and writes the processed data to the
> output database.
>
> If I remove the event filtering tasks, then all the metrics are displayed
> properly.
>
> Can anyone please point out why this behaviour is observed and how to
> gather metrics like numInputRows, etc while also filtering events fetched
> from CSV file?
> I am also open to suggestions if there is a better way of filtering out
> the prohibited events in structured streaming.
>
> Thanks in advance.
>
> Akshay Bhardwaj
> +91-97111-33849
>


Spark 2.3 | Structured Streaming | Metric for numInputRows

2019-02-26 Thread Akshay Bhardwaj
Hi Experts,

I have a structured streaming query running on Spark 2.3 on a YARN cluster,
with the following features:

   - Reading JSON messages from a Kafka topic, with:
      - maxOffsetsPerTrigger set to 5000
      - a trigger interval of 500ms for my writeStream task
      - the streaming dataset defined as events with fields: id, name,
        refid, createdTime
   - A cached dataset read from a CSV file on HDFS, where the CSV file
     contains the list of prohibited event refids
   - An intermediate dataset defined with the following query, which
     filters the prohibited events out of the streaming data:
      - select * from events where event.refid NOT IN (select refid from
        CSVData)


The query progress from the StreamingQuery object shows the metrics
numInputRows, inputRowsPerSecond and processedRowsPerSecond as 0, although my
query executes with an execution time of ~400ms, and I can see that the query
does take records from Kafka and write the processed data to the output
database.

If I remove the event filtering tasks, then all the metrics are displayed
properly.

Can anyone please point out why this behaviour occurs, and how to gather
metrics like numInputRows while also filtering against the events fetched
from the CSV file?
I am also open to suggestions if there is a better way of filtering out the
prohibited events in structured streaming.
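
For what it's worth, one alternative sketch (it assumes the prohibited list
is small enough to collect to the driver, that the streaming dataset is named
events and has a refid column, and the CSV path is hypothetical; I have not
verified that it restores the metrics on Spark 2.3) is to turn the NOT IN
subquery into a plain filter, avoiding the subquery that seems to interfere
with the metrics:

import org.apache.spark.sql.functions.col

// Sketch: read the prohibited refids once, collect them to the driver,
// and filter the streaming dataset with an ordinary predicate.
val prohibitedRefids = spark.read
  .option("header", "true")
  .csv("hdfs:///path/to/prohibited_events.csv")  // hypothetical path and layout
  .select("refid")
  .collect()
  .map(_.getString(0))
  .toSet

val allowedEvents = events.filter(!col("refid").isin(prohibitedRefids.toSeq: _*))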

Thanks in advance.

Akshay Bhardwaj
+91-97111-33849


Re: Occasional broadcast timeout when dynamic allocation is on

2019-02-26 Thread Abdeali Kothari
I've been facing this issue for the past few months too.
I always thought it was an infrastructure issue, but we were never able to
figure out what the infra issue was.

If others are facing this issue too, then maybe it's a valid bug.

Does anyone have any ideas on how we can debug this?

On Fri, Feb 22, 2019, 16:10 Artem P wrote:

> Hi!
>
> We have dynamic allocation enabled for our regular jobs, and sometimes they
> fail with java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]. It seems the Spark driver starts the broadcast just before the
> job has received any executors from YARN, and if it takes more than 5
> minutes to acquire them, the broadcast fails with a TimeoutException. Is
> there any way to force Spark to start the broadcast only after all executors
> are in place (at least minExecutors count)?
>
> Relevant logs (it can be seen that the exception happened 5 minutes after the
> broadcast start message):
> ```
>
> 2019-02-22 06:11:48,047 [broadcast-exchange-0] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 361.70265 ms
> 2019-02-22 06:11:48,237 [broadcast-exchange-0] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_0 stored as 
> values in memory (estimated size 485.3 KB, free 365.8 MB)
> 2019-02-22 06:11:48,297 [main] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 611.624073 ms
> 2019-02-22 06:11:48,522 [broadcast-exchange-0] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_0_piece0 
> stored as bytes in memory (estimated size 63.5 KB, free 365.8 MB)
> 2019-02-22 06:11:48,524 [dispatcher-event-loop-4] INFO  
> org.apache.spark.storage.BlockManagerInfo  - Added broadcast_0_piece0 in 
> memory on ip-10-4-3-123.eu-central-1.compute.internal:42935 (size: 63.5 KB, 
> free: 366.2 MB)
> 2019-02-22 06:11:48,531 [broadcast-exchange-0] INFO  
> org.apache.spark.SparkContext  - Created broadcast 0 from run at 
> ThreadPoolExecutor.java:1149
> 2019-02-22 06:11:48,545 [broadcast-exchange-0] INFO  
> org.apache.spark.sql.execution.FileSourceScanExec  - Planning scan with bin 
> packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 
> bytes.
> 2019-02-22 06:11:48,859 [broadcast-exchange-0] INFO  
> org.apache.spark.SparkContext  - Starting job: run at 
> ThreadPoolExecutor.java:1149
> 2019-02-22 06:11:48,885 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Got job 0 (run at 
> ThreadPoolExecutor.java:1149) with 1 output partitions
> 2019-02-22 06:11:48,886 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Final stage: ResultStage 0 (run at 
> ThreadPoolExecutor.java:1149)
> 2019-02-22 06:11:48,893 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Parents of final stage: List()
> 2019-02-22 06:11:48,895 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Missing parents: List()
> 2019-02-22 06:11:48,940 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Submitting ResultStage 0 
> (MapPartitionsRDD[2] at run at ThreadPoolExecutor.java:1149), which has no 
> missing parents
> 2019-02-22 06:11:49,004 [dag-scheduler-event-loop] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_1 stored as 
> values in memory (estimated size 11.7 KB, free 365.8 MB)
> 2019-02-22 06:11:49,024 [main] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 362.596124 ms
> 2019-02-22 06:11:49,054 [dag-scheduler-event-loop] INFO  
> org.apache.spark.storage.memory.MemoryStore  - Block broadcast_1_piece0 
> stored as bytes in memory (estimated size 5.3 KB, free 365.7 MB)
> 2019-02-22 06:11:49,055 [dispatcher-event-loop-1] INFO  
> org.apache.spark.storage.BlockManagerInfo  - Added broadcast_1_piece0 in 
> memory on ip-10-4-3-123.eu-central-1.compute.internal:42935 (size: 5.3 KB, 
> free: 366.2 MB)
> 2019-02-22 06:11:49,066 [dag-scheduler-event-loop] INFO  
> org.apache.spark.SparkContext  - Created broadcast 1 from broadcast at 
> DAGScheduler.scala:1079
> 2019-02-22 06:11:49,101 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.DAGScheduler  - Submitting 1 missing tasks from 
> ResultStage 0 (MapPartitionsRDD[2] at run at ThreadPoolExecutor.java:1149) 
> (first 15 tasks are for partitions Vector(0))
> 2019-02-22 06:11:49,103 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.cluster.YarnScheduler  - Adding task set 0.0 with 
> 1 tasks
> 2019-02-22 06:11:49,116 [main] INFO  
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator  - Code 
> generated in 56.99095 ms
> 2019-02-22 06:11:49,188 [dag-scheduler-event-loop] INFO  
> org.apache.spark.scheduler.FairSchedulableBuilder  - Added task set 
> TaskSet_0.0 tasks to pool default
> 2019-02-22 06:12:04,190 [Timer-0] WARN  
> org.apache.spark.scheduler.cluster.YarnScheduler  - Initial 
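
Not from this thread, but two groups of settings might be worth trying (an
assumption to verify, not a confirmed fix):
spark.scheduler.minRegisteredResourcesRatio together with
spark.scheduler.maxRegisteredResourcesWaitingTime makes the scheduler wait
for a fraction of the requested executors before submitting tasks, and
spark.sql.broadcastTimeout raises the 300-second limit itself. A minimal
sketch:

import org.apache.spark.sql.SparkSession

// Sketch: real Spark configuration keys, but whether they fully avoid the race
// between dynamic allocation ramp-up and the broadcast is an assumption to test.
val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.minExecutors", "10")                 // hypothetical floor
  .config("spark.scheduler.minRegisteredResourcesRatio", "0.8")         // wait for 80% of executors
  .config("spark.scheduler.maxRegisteredResourcesWaitingTime", "300s")  // but no longer than 5 min
  .config("spark.sql.broadcastTimeout", "600")                          // raise the 300 s default
  .getOrCreate()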

Spark SQL join optimizations

2019-02-26 Thread Akhilanand
Hello,

I recently noticed that Spark doesn't optimize joins when we apply a limit on
top of them.

Say when we have

payment.join(customer, Seq("customerId"), "left").limit(1).explain(true)


Spark doesn't optimize it.

>  == Physical Plan ==
> CollectLimit 1
> +- *(5) Project [customerId#29, paymentId#28, amount#30, name#41]
>+- SortMergeJoin [customerId#29], [customerId#40], LeftOuter
>   :- *(2) Sort [customerId#29 ASC NULLS FIRST], false, 0
>   :  +- Exchange hashpartitioning(customerId#29, 200)
>   : +- *(1) Project [_1#24 AS paymentId#28, _2#25 AS
> customerId#29, _3#26 AS amount#30]
>   :+- *(1) SerializeFromObject [assertnotnull(input[0,
> scala.Tuple3, true])._1 AS _1#24, assertnotnull(input[0, scala.Tuple3,
> true])._2 AS _2#25, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#26]
>   :   +- Scan[obj#23]
>   +- *(4) Sort [customerId#40 ASC NULLS FIRST], false, 0
>  +- Exchange hashpartitioning(customerId#40, 200)
> +- *(3) Project [_1#37 AS customerId#40, _2#38 AS name#41]
>+- *(3) SerializeFromObject [assertnotnull(input[0,
> scala.Tuple2, true])._1 AS _1#37, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#38]
>   +- Scan[obj#36]


Am I missing something here? Is there a way to avoid unnecessary joining of
data?
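
For what it's worth, a manual workaround sketch (not equivalent in general,
since limiting before the join picks an arbitrary payment row rather than an
arbitrary joined row) is to push the limit below the join yourself:

// Sketch: limit the left side first so only one row is shuffled and joined,
// instead of sort-merge-joining both full datasets and then taking one row.
val onePayment = payment.limit(1)
onePayment.join(customer, Seq("customerId"), "left").explain(true)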

Regards,
Akhil