My joindf is taking 14 seconds on the first run, and even with the withColumn calls commented out it is still taking that much time.
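(As a rough way to triage where each micro-batch spends its time, here is a minimal sketch, not from the original thread, that polls the streaming query's progress metrics; it assumes the write_df StreamingQuery handle from the code quoted further down. A sketch of the from_json suggestion follows the quoted thread at the end.)

// Minimal sketch (assumes the `write_df` StreamingQuery from the quoted code).
// Each StreamingQueryProgress reports per-trigger timings in durationMs
// (e.g. addBatch, getBatch, triggerExecution), which shows where a slow batch
// is spending its time.
import scala.collection.JavaConverters._

write_df.recentProgress.foreach { p =>
  println(s"batch=${p.batchId} rows=${p.numInputRows} " +
    s"durations=${p.durationMs.asScala.mkString(", ")}")
}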
On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim <kabh...@gmail.com> wrote:

> You may need to put some effort into triaging how much time is spent on
> each part. Without that information you can only get general tips and
> tricks. Please check the SQL tab and look at the DAG graph as well as the
> details (logical plan, physical plan) to see whether you're happy with
> those plans.
>
> General tip from a quick look at the query: avoid calling withColumn
> repeatedly and try to put the expressions into one select statement. If
> I'm not mistaken, repeated withColumn is known to be a bit costly since
> each call produces a new Dataset. Defining a schema and using "from_json"
> will eliminate all of the withColumn calls and the extra calls to
> "get_json_object".
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Tue, Nov 27, 2018 at 2:44 PM Siva Samraj <samraj.mi...@gmail.com> wrote:
>
>> Hello All,
>>
>> I am using Spark 2.3 and I am trying to write a Spark Streaming join.
>> It is a basic join, yet it is taking a long time to join the stream
>> data. I am not sure whether there is any configuration we need to set
>> on Spark.
>>
>> Code:
>> *************************
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import org.apache.spark.sql.types.TimestampType
>>
>> object OrderSalesJoin {
>>   def main(args: Array[String]): Unit = {
>>
>>     setEnvironmentVariables(args(0))
>>
>>     val order_topic = args(1)
>>     val invoice_topic = args(2)
>>     val dest_topic_name = args(3)
>>
>>     val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()
>>
>>     val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name
>>
>>     import spark.implicits._
>>
>>     val order_df = spark
>>       .readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>       .option("subscribe", order_topic)
>>       .option("startingOffsets", "latest")
>>       .option("failOnDataLoss", "false")
>>       .option("kafka.replica.fetch.max.bytes", "15728640")
>>       .load()
>>
>>     val invoice_df = spark
>>       .readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>       .option("subscribe", invoice_topic)
>>       .option("startingOffsets", "latest")
>>       .option("failOnDataLoss", "false")
>>       .option("kafka.replica.fetch.max.bytes", "15728640")
>>       .load()
>>
>>     val order_details = order_df
>>       .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
>>       .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
>>       .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
>>       .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
>>       .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
>>       .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
>>       .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
>>       .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
>>       .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
>>       .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
>>       .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
>>       .withColumn("tstamp_trans", current_timestamp())
>>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>>       .select($"s_customer_id", $"s_order_id", $"s_promotion_id",
>>         $"s_store_id", $"s_product_id",
>>         $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
>>         $"total_cost".cast("integer") as "total_cost",
>>         $"promotion_cost".cast("integer") as "promotion_cost",
>>         $"date_of_order", $"tstamp_trans", $"TIMESTAMP",
>>         $"units_sold".cast("integer") as "units_sold")
>>
>>     val invoice_details = invoice_df
>>       .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
>>       .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
>>       .where($"invoice_status" === "Success")
>>       .withColumn("tstamp_trans", current_timestamp())
>>       .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
>>
>>     val order_wm = order_details.withWatermark("tstamp_trans", args(4))
>>     val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))
>>
>>     val join_df = order_wm
>>       .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
>>       .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
>>         $"s_warehouse_id", $"unit_cost", $"total_cost",
>>         $"promotion_cost",
>>         $"date_of_order",
>>         $"units_sold" as "units_sold", $"order_id")
>>
>>     val final_ids = join_df
>>       .withColumn("value", to_json(struct($"s_customer_id",
>>         $"s_promotion_id", $"s_store_id", $"s_product_id",
>>         $"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost",
>>         $"total_cost".cast("Int") as "total_cost",
>>         $"promotion_cost".cast("Int") as "promotion_cost",
>>         $"date_of_order",
>>         $"units_sold".cast("Int") as "units_sold", $"order_id")))
>>       .dropDuplicates("order_id")
>>       .select("value")
>>
>>     val write_df = final_ids
>>       .writeStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>       .option("topic", dest_topic_name)
>>       .option("checkpointLocation", checkpoint_path)
>>       .trigger(Trigger.ProcessingTime("1 second"))
>>       .start()
>>
>>     write_df.awaitTermination()
>>   }
>> }
>> ****************************
>>
>> Every run is taking more than a minute, and the waiting time keeps
>> increasing as the data grows.
>>
>> Please let me know if there is anything we need to configure to make it
>> faster. I have tried increasing the parallelism.
>>
>> Executors: I tried from 1 to 4, each with 3 GB of memory. The data
>> volume is very small; even for a single record it is taking time.