Yes, it did Thanks for the solution. I solved it locally, but I’m worried how I can do this when I’m using yarn because that same 15 Sec is taking on the yarn too :)
> On 27-Nov-2018, at 4:42 PM, Srikanth Sriram <sriramsrikanth1...@gmail.com> > wrote: > > Hello Abhijeet, > I am not sure my answer will solve your problem but just think of the master > configuration set for spark application. > val spark = SparkSession.builder > .appName("Argoid_Realtime_Pipeline") > .master("local") > .getOrCreate() > I see you have set it has "local" not as "local[*]". > > From other blog, i got this information, sharing you in full sentence: > "We are going to work locally, running the application straight from our IDE, > so we are setting the master to local[*], meaning we are creating as many > threads as there are cores on the machine." > > Just check if this is reducing the time taken for processing, since by this > local[*] we are going to use all cores available, not just one core? > > Regards, > Sriram Srikanth > > On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com > <mailto:abhijeet.ku...@sentienz.com>> wrote: > Hi All, > > I'm just practicing Spark Streaming with joining two different stream. I > noticed that it's taking around 15 seconds for each record. Let me share the > details and the code: > > > > If you can see for stage id 2, it's taking 15 s. Isn't this strange. > > Code: > > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.streaming.Trigger > import org.apache.spark.sql.types.TimestampType > import org.apache.log4j.{Level, Logger} > > object StreamJoin{ > > val kafkaTopic1 = "demo2" > val kafkaTopic2 = "demo3" > val bootstrapServer = "localhost:9092" > > def main(args: Array[String]): Unit = { > val checkPointDir = "hdfs://localhost:8020/checkpo <>" > > val spark = SparkSession.builder > .appName("Argoid_Realtime_Pipeline") > .master("local") > .getOrCreate() > > val rootLogger = Logger.getRootLogger() > rootLogger.setLevel(Level.ERROR) > > import spark.implicits._ > > val df1 = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", bootstrapServer) > .option("subscribe", kafkaTopic1) > .option("failOnDataLoss", "false") > .load() > > val df2 = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", bootstrapServer) > .option("subscribe", kafkaTopic2) > .option("failOnDataLoss", "false") > .load() > > val order_details = df1 > .withColumn("s_order_id", get_json_object($"value".cast("String"), > "$.order_id")) > .withColumn("s_customer_id", get_json_object($"value".cast("String"), > "$.customer_id")) > .withColumn("s_promotion_id", get_json_object($"value".cast("String"), > "$.promotion_id")) > .withColumn("s_store_id", get_json_object($"value".cast("String"), > "$.store_id")) > .withColumn("s_product_id", get_json_object($"value".cast("String"), > "$.product_id")) > .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), > "$.warehouse_id")) > .withColumn("unit_cost", get_json_object($"value".cast("String"), > "$.unit_cost")) > .withColumn("total_cost", get_json_object($"value".cast("String"), > "$.total_cost")) > .withColumn("units_sold", get_json_object($"value".cast("String"), > "$.units_sold")) > .withColumn("promotion_cost", get_json_object($"value".cast("String"), > "$.promotion_cost")) > .withColumn("date_of_order", get_json_object($"value".cast("String"), > "$.date_of_order")) > .withColumn("tstamp_trans", current_timestamp()) > .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", > "yyyyMMddHHmmss").cast(TimestampType)) > .select($"s_customer_id", $"s_order_id", $"s_promotion_id", > $"s_store_id", $"s_product_id", > $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost", > $"total_cost".cast("integer") as "total_cost", > $"promotion_cost".cast("integer") as "promotion_cost", > $"date_of_order", $"tstamp_trans", $"TIMESTAMP", > $"units_sold".cast("integer") as "units_sold") > > val invoice_details = df2 > .withColumn("order_id", get_json_object($"value".cast("String"), > "$.order_id")) > .withColumn("invoice_status", get_json_object($"value".cast("String"), > "$.invoice_status")) > .where($"invoice_status" === "Success") > > .withColumn("tstamp_trans", current_timestamp()) > .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", > "yyyyMMddHHmmss").cast(TimestampType)) > > > val join_df = order_details > .join(invoice_details, order_details.col("s_order_id") === > invoice_details.col("order_id")) > .select($"s_customer_id", $"s_promotion_id", $"s_store_id", > $"s_product_id", > $"s_warehouse_id", $"unit_cost", $"total_cost", > $"promotion_cost", > $"date_of_order", > $"units_sold" as "units_sold", $"order_id") > > join_df.writeStream > .format("console") > .option("truncate", false) > .start() > .awaitTermination() > > } > } > > Thanks, > Abhijeet Kumar > > > > -- > Regards, > Srikanth Sriram > <Screenshot 2018-11-27 at 1.24.47 PM.png><Screenshot 2018-11-27 at 1.24.39 > PM.png><Screenshot 2018-11-27 at 1.24.39 PM.png><Screenshot 2018-11-27 at > 1.24.39 PM.png>