Yes, it did 
Thanks for the solution. I solved it locally, but I’m worried how I can do this 
when I’m using yarn because that same 15 Sec is taking on the yarn too :)

> On 27-Nov-2018, at 4:42 PM, Srikanth Sriram <sriramsrikanth1...@gmail.com> 
> wrote:
> 
> Hello Abhijeet,
> I am not sure my answer will solve your problem but just think of the master 
> configuration set for spark application.
> val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
> I see you have set it has "local" not as "local[*]".
> 
> From other blog, i got this information, sharing you in full sentence:
> "We are going to work locally, running the application straight from our IDE, 
> so we are setting the master to local[*], meaning we are creating as many 
> threads as there are cores on the machine."
> 
> Just check if this is reducing the time taken for processing, since by this 
> local[*] we are going to use all cores available, not just one core?
> 
> Regards,
> Sriram Srikanth
> 
> On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com 
> <mailto:abhijeet.ku...@sentienz.com>> wrote:
> Hi All,
> 
> I'm just practicing Spark Streaming with joining two different stream. I 
> noticed that it's taking around 15 seconds for each record. Let me share the 
> details and the code:
> 
> 
> 
> If you can see for stage id 2, it's taking 15 s. Isn't this strange.
> 
> Code:
> 
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types.TimestampType
> import org.apache.log4j.{Level, Logger}
> 
> object StreamJoin{
> 
>   val kafkaTopic1 = "demo2"
>   val kafkaTopic2 = "demo3"
>   val bootstrapServer = "localhost:9092"
> 
>   def main(args: Array[String]): Unit = {
>     val checkPointDir = "hdfs://localhost:8020/checkpo <>"
> 
>     val spark = SparkSession.builder
>       .appName("Argoid_Realtime_Pipeline")
>       .master("local")
>       .getOrCreate()
> 
>     val rootLogger = Logger.getRootLogger()
>     rootLogger.setLevel(Level.ERROR)
> 
>     import spark.implicits._
> 
>     val df1 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic1)
>       .option("failOnDataLoss", "false")
>       .load()
> 
>     val df2 = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", bootstrapServer)
>       .option("subscribe", kafkaTopic2)
>       .option("failOnDataLoss", "false")
>       .load()
> 
>     val order_details = df1
>       .withColumn("s_order_id", get_json_object($"value".cast("String"), 
> "$.order_id"))
>       .withColumn("s_customer_id", get_json_object($"value".cast("String"), 
> "$.customer_id"))
>       .withColumn("s_promotion_id", get_json_object($"value".cast("String"), 
> "$.promotion_id"))
>       .withColumn("s_store_id", get_json_object($"value".cast("String"), 
> "$.store_id"))
>       .withColumn("s_product_id", get_json_object($"value".cast("String"), 
> "$.product_id"))
>       .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), 
> "$.warehouse_id"))
>       .withColumn("unit_cost", get_json_object($"value".cast("String"), 
> "$.unit_cost"))
>       .withColumn("total_cost", get_json_object($"value".cast("String"), 
> "$.total_cost"))
>       .withColumn("units_sold", get_json_object($"value".cast("String"), 
> "$.units_sold"))
>       .withColumn("promotion_cost", get_json_object($"value".cast("String"), 
> "$.promotion_cost"))
>       .withColumn("date_of_order", get_json_object($"value".cast("String"), 
> "$.date_of_order"))
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", 
> "yyyyMMddHHmmss").cast(TimestampType))
>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id", 
> $"s_store_id", $"s_product_id",
>         $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>         $"total_cost".cast("integer") as "total_cost", 
> $"promotion_cost".cast("integer") as "promotion_cost",
>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP", 
> $"units_sold".cast("integer") as "units_sold")
> 
>     val invoice_details = df2
>       .withColumn("order_id", get_json_object($"value".cast("String"), 
> "$.order_id"))
>       .withColumn("invoice_status", get_json_object($"value".cast("String"), 
> "$.invoice_status"))
>       .where($"invoice_status" === "Success")
> 
>       .withColumn("tstamp_trans", current_timestamp())
>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", 
> "yyyyMMddHHmmss").cast(TimestampType))
> 
> 
>     val join_df = order_details
>       .join(invoice_details, order_details.col("s_order_id") === 
> invoice_details.col("order_id"))
>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id", 
> $"s_product_id",
>         $"s_warehouse_id", $"unit_cost", $"total_cost",
>         $"promotion_cost",
>         $"date_of_order",
>         $"units_sold" as "units_sold", $"order_id")
> 
>     join_df.writeStream
>       .format("console")
>       .option("truncate", false)
>       .start()
>       .awaitTermination()
> 
>   }
> }
> 
> Thanks,
> Abhijeet Kumar
> 
> 
> 
> -- 
> Regards, 
> Srikanth Sriram
> <Screenshot 2018-11-27 at 1.24.47 PM.png><Screenshot 2018-11-27 at 1.24.39 
> PM.png><Screenshot 2018-11-27 at 1.24.39 PM.png><Screenshot 2018-11-27 at 
> 1.24.39 PM.png>

Reply via email to