Re: Spark streaming receivers

2020-08-09 Thread Dark Crusader
Hi Russell,
This is super helpful. Thank you so much.

Can you elaborate on the differences between Structured Streaming and
DStreams? How would the number of receivers required, etc., change?

On Sat, 8 Aug 2020, 10:28 pm Russell Spitzer wrote:

> Note, none of this applies to Direct streaming approaches, only to
> receiver-based DStreams.
>
> You can think of a receiver as a long-running task that never finishes.
> Each receiver is submitted to an executor slot somewhere; it then runs
> indefinitely and internally has a method which passes records over to a
> block management system. There is a timing that you set (the parameter
> spark.streaming.blockInterval) which decides when each block is "done";
> records arriving after that time has passed go into the next block. Once a
> block is done it can be processed in the next Spark batch. The gap between
> a block starting and a block being finished is why you can lose data in
> receiver streaming without Write Ahead Logging. Usually your block interval
> divides evenly into your batch interval, so you'll get X blocks per batch.
> Each block becomes one partition of the job being done in a streaming
> batch. Multiple receivers can be unified into a single DStream, which just
> means the blocks produced by all of those receivers are handled in the same
> streaming batch.
>
> So if you have 5 different receivers, you need at minimum 6 executor
> cores: 1 core for each receiver, and 1 core to actually do your processing
> work. In a real-world case you probably want significantly more cores on
> the processing side than just 1. Without repartitioning you will never
> have more partitions per batch than blocks per batch.
>
> A quick example
>
> I run 5 receivers with a block interval of 100 ms and a Spark batch
> interval of 1 second. I use union to group them all together, so I will
> most likely end up with one Spark job for each batch every second, running
> with 50 partitions (1000 ms / 100 ms per block per receiver * 5 receivers).
> If I have a total of 10 cores in the system, 5 of them are running
> receivers and the remaining 5 must process the 50 partitions of data
> generated by the last second of work.
>
> And again, just to reiterate, if you are doing a direct streaming approach
> or structured streaming, none of this applies.
>
> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
> relinquisheddra...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm having some trouble figuring out how receivers tie into the Spark
>> driver-executor structure.
>> Do all executors have a receiver that is blocked as soon as it
>> receives some stream data?
>> Or can multiple streams of data be taken as input into a single executor?
>>
>> I have stream data coming in every second from 5 different
>> sources. I want to aggregate data from each of them. Does this mean I need
>> 5 executors, or does it have to do with threads on the executor?
>>
>> I might be mixing in a few concepts here. Any help would be appreciated.
>> Thank you.
>>
>
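
A minimal Scala sketch (not from the thread) of the setup described above:
5 receiver-based streams unioned into one DStream, with a 100 ms block
interval and a 1 second batch interval. The hostnames and ports are
placeholders; any receiver-based source behaves the same way.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("receiver-union-sketch")
  // Each receiver cuts a new block every 100 ms.
  .set("spark.streaming.blockInterval", "100ms")

// 1 second batch interval.
val ssc = new StreamingContext(conf, Seconds(1))

// Each receiver occupies one executor core for the lifetime of the job.
val streams = (1 to 5).map(i => ssc.socketTextStream(s"source-host-$i", 9999))

// Union the receivers so all of their blocks land in the same streaming batch:
// 5 receivers * (1000 ms / 100 ms) = 50 partitions per batch.
val unioned = ssc.union(streams)
unioned.count().print()

ssc.start()
ssc.awaitTermination()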


Reply: [Spark-Kafka-Streaming] Verifying the approach for multiple queries

2020-08-09 Thread tianlangstudio
Hello, Sir!
What about processing and grouping the data first, then writing the grouped
data to Kafka topics A and B? Then read topic A or B from another Spark
application and process it further, in the spirit of ETL.
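
A minimal sketch of the first, routing stage of this suggestion, assuming each
JSON record carries a "type" field and using placeholder broker and topic names
(requires the spark-sql-kafka-0-10 connector):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{get_json_object, lit, when}

val spark = SparkSession.builder().appName("route-by-type").getOrCreate()
import spark.implicits._

// Read the raw records from the source topic.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "raw-events")
  .load()

// Add a "topic" column so the Kafka sink routes each record by its type.
val routed = input
  .selectExpr("CAST(value AS STRING) AS value")
  .withColumn("topic",
    when(get_json_object($"value", "$.type") === "invoice", lit("topic-a"))
      .otherwise(lit("topic-b")))

// Write back to Kafka; a downstream Spark application then reads topic-a or
// topic-b and does the heavier processing.
routed.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("checkpointLocation", "/tmp/route-by-type-checkpoint")
  .start()
  .awaitTermination()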

 
TianlangStudio
Some of the biggest lies: I will start tomorrow/Others are better than me/I am 
not good enough/I don't have time/This is the way I am
 


--
From: Amit Joshi
Sent: Monday, August 10, 2020, 02:37
To: user
Subject: [Spark-Kafka-Streaming] Verifying the approach for multiple queries




[Spark-Kafka-Streaming] Verifying the approach for multiple queries

2020-08-09 Thread Amit Joshi
Hi,

I have a scenario where a Kafka topic is being written with different types
of JSON records.
I have to regroup the records based on the type, then fetch the schema,
parse the records, and write them out as Parquet.
I have tried Structured Streaming, but the dynamic schema is a constraint,
so I have used DStreams, though I know the approach I have taken may not
be good.
Could anyone please let me know whether the approach will scale, and its
possible pros and cons?
I am collecting the grouped records and then again forming a DataFrame
for each grouped record.
createKeyValue -> this creates the key-value pair with schema
information.

// Assumed imports for this snippet (Spark SQL plus the spark-streaming-kafka-0-10 integration):
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Group the records by their (name, schema) key; collect() brings every
  // grouped value back to the driver.
  val result = rdd.map(createKeyValue).reduceByKey((x, y) => x ++ y).collect()
  result.foreach(x => println(x._1))
  result.map(x => {
    val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._
    // Re-create a DataFrame from the grouped values and parse each JSON string
    // with the schema carried in the key.
    val df = x._2.toDF("value")
    df.select(from_json($"value", x._1._2, Map.empty[String, String]).as("data"))
      .select($"data.*")
      //.withColumn("entity", lit("invoice"))
      .withColumn("year", year($"TimeUpdated"))
      .withColumn("month", month($"TimeUpdated"))
      .withColumn("day", dayofmonth($"TimeUpdated"))
      .write.partitionBy("name", "year", "month", "day").mode("append").parquet(path)
  })
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
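
A hedged variation on the same foreachRDD body (not from the thread, imports
as in the snippet above) that may scale better: only the distinct keys are
collected to the driver, while the record values stay distributed.
createKeyValue, path, and the partition columns are assumed to be exactly as
in the original.

stream.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Cache the keyed records because they are re-filtered once per key.
  val keyed = rdd.map(createKeyValue).cache()
  // Only the small set of distinct (name, schema) keys reaches the driver.
  val keys = keyed.keys.distinct().collect()

  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  import org.apache.spark.sql.functions._

  keys.foreach { key =>
    // Keep this key's records, flatten the grouped values, and parse them
    // with the schema carried in the key.
    val df = keyed.filter(_._1 == key).flatMap(_._2).toDF("value")
    df.select(from_json($"value", key._2, Map.empty[String, String]).as("data"))
      .select($"data.*")
      .withColumn("year", year($"TimeUpdated"))
      .withColumn("month", month($"TimeUpdated"))
      .withColumn("day", dayofmonth($"TimeUpdated"))
      .write.partitionBy("name", "year", "month", "day").mode("append").parquet(path)
  }

  keyed.unpersist()
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}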


regexp_extract regex for extracting the columns from string

2020-08-09 Thread anbutech
Hi All,

I have the following info in the data column.

<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new
packt=20 orgin=null address=null dest=fgjglgl

Here I want to create a separate column for each of the above key-value pairs
that follow the integer <1000> and are separated by spaces.
Is there any way to achieve this using the built-in regexp_extract function?
I don't want to do it with a UDF.
Apart from a UDF, is there any way to achieve it?
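
A minimal sketch of how this could look with regexp_extract, assuming the raw
line sits in a string column named "data" and the keys are known up front (the
key names below are taken from the sample line; everything else is a
placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.regexp_extract

val spark = SparkSession.builder().appName("kv-extract").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(
  "<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new packt=20 orgin=null address=null dest=fgjglgl"
).toDF("data")

// For each known key, capture whatever follows "key=" up to the next whitespace.
val keys = Seq("date", "time", "name", "id", "session", "packt", "orgin", "address", "dest")
val parsed = keys.foldLeft(df) { (acc, k) =>
  acc.withColumn(k, regexp_extract($"data", s"$k=(\\S+)", 1))
}
parsed.show(false)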


Thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark batch job chaining

2020-08-09 Thread Jun Zhu
Hi,
I am using Airflow in such scenarios.