Thanks Chris!

On Fri, Mar 16, 2018 at 10:13 PM, Bowden, Chris <chris.bow...@microfocus.com> wrote:
> 2. You must decide. If multiple streaming queries are launched in a single / simple application, only you can dictate if a single failure should cause the application to exit. If you use spark.streams.awaitAnyTermination be aware it returns / throws if _any_ streaming query terminates. A more complex application may keep track of many streaming queries and attempt to relaunch them with lower latency for certain types of failures.
>
> 3a. I'm not very familiar with py, but I doubt you need the sleep.
>
> 3b. Kafka consumer tuning is simply a matter of passing appropriate config keys to the source's options if desired.
>
> 3c. I would argue the most obvious improvement would be a more structured and compact data format if CSV isn't required.
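A minimal sketch of the pattern Chris describes for point 2, assuming two already-parsed streaming DataFrames df1 and df2 (hypothetical names, not from the thread):

    # One blocking call for the whole application instead of awaitTermination() per query.
    a = df1.writeStream.format("console").start()
    b = df2.writeStream.format("console").start()
    c = df1.join(df2, "ID").writeStream.format("console").start()

    # Unblocks (or re-raises the failure) as soon as ANY of the started queries terminates.
    spark.streams.awaitAnyTermination()

For 3b, tuning goes through source options; keys prefixed with "kafka." are handed to the underlying Kafka consumer, for example:

    df = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "test1")
          .option("maxOffsetsPerTrigger", "10000")  # source-level cap on records per micro-batch
          .load())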
> ------------------------------
> *From:* Aakash Basu <aakash.spark....@gmail.com>
> *Sent:* Friday, March 16, 2018 9:12:39 AM
> *To:* sagar grover
> *Cc:* Bowden, Chris; Tathagata Das; Dylan Guedes; Georg Heiler; user; jagrati.go...@myntra.com
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi all,
>
> From the queries at the bottom of the last mail, query 1's doubt has been resolved. As I had guessed, I resent the same columns from the Kafka producer multiple times, hence the join gave duplicates.
>
> Retested with a fresh Kafka feed and the problem was solved.
>
> But the other queries still persist; would anyone like to reply? :)
>
> Thanks,
> Aakash.
>
> On 16-Mar-2018 3:57 PM, "Aakash Basu" <aakash.spark....@gmail.com> wrote:
>
> Hi all,
>
> The code was perfectly alright, just the package I was submitting had to be the updated one (marked green below). The join happened but the output has many duplicates (even though the *how* parameter is by default *inner*) -
>
> Spark Submit:
>
> /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> Code:
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())
>
>     table2_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())
>
>     query1 = table1_stream.select('value') \
>         .withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     query2 = table2_stream.select('value') \
>         .withColumn('value', table2_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("Department", split(col("value"), ",").getItem(1)) \
>         .withColumn("Date_joined", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     joined_Stream = query1.join(query2, "Id")
>
>     a = query1.writeStream.format("console").start()
>     b = query2.writeStream.format("console").start()
>     c = joined_Stream.writeStream.format("console").start()
>
>     time.sleep(10)
>
>     a.awaitTermination()
>     b.awaitTermination()
>     c.awaitTermination()
>
> Output -
>
> +---+----------+---------+---------------+-----------+
> | ID|First_Name|Last_Name|     Department|Date_joined|
> +---+----------+---------+---------------+-----------+
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  5|     Reggy|Comizzoli|Human Resources|  8/15/2012|
> |  5|     Reggy|Comizzoli|Human Resources|  8/15/2012|
> +---+----------+---------+---------------+-----------+
> only showing top 20 rows
>
> *Queries:*
>
> *1) Why, even after an inner join, is the output looking like an outer type?*
>
> *2) Do we need to put awaitTermination on all the streams? Or would putting it only on the input streams suffice?*
>
> *3) This code is not optimized; how do we generically optimize streaming code?*
>
> Thanks,
> Aakash.
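On query 1, the join type is still inner; an inner join simply multiplies duplicate keys, so if ID=3 arrives m times on one side and n times on the other, the output has m x n rows for that key, which matches the table above given the re-sent producer messages. A hedged sketch (my assumption, not something from the thread) that makes the join type explicit and drops duplicate input rows first:

    # query1/query2 are the parsed streams from the code above.
    # Note: streaming dropDuplicates keeps all seen keys in state when no watermark is set,
    # and chaining it with a stream-stream join may not be supported on every Spark version,
    # so verify on the target version before relying on this.
    dedup1 = query1.dropDuplicates(["ID"])
    dedup2 = query2.dropDuplicates(["ID"])

    joined_Stream = dedup1.join(dedup2, "ID", "inner")  # "inner" is already the default 'how'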
> On Fri, Mar 16, 2018 at 3:23 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Hi,
>
> *Thanks to Chris and TD* for perpetually supporting my endeavor. I ran the code with a little tweaking here and there, and *it worked well in Spark 2.2.1*, giving me the deserialized values (I used withColumn in the writeStream section to run all the SQL functions of split and cast).
>
> But when I submit the same code in 2.3.0, I get an error for which I couldn't find any solution on the internet.
>
> *Error:*
>
> pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming Query ===\nIdentifier: [id = d956096e-42d2-493c-8b6c-125e3137c291, runId = cd25ec61-c6bb-436c-a93e-80814e1436ec]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread State: RUNNABLE'
>
> *Final code (for clearer understanding of where it may go wrong in 2.3.0) -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())
>
>     query = table1_stream.select('value').withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value').writeStream.format("console").start()
>
>     time.sleep(10)
>
>     query.awaitTermination()
>
> # Code working in Spark 2.2.1
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> # Code not working in Spark 2.3.0
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> 2) I'm getting the below output as expected, from the above code in 2.2.1. My query is: is there a way to get the header of a file being read and ensure header=True? (Or is it that, for Structured Streaming, the user has to provide headers explicitly all the time, as data shall always come in this structure [for Kafka] - topic, partition, offset, key, value, timestamp, timestampType? If so, how to remove column headers explicitly from the data, as in the below table?) I know it is a stream and the data is fed in as messages, but I still wanted experts to put some more light into it.
>
> +---+----------+---------+
> | ID|First_Name|Last_Name|
> +---+----------+---------+
> | Hi|      null|     null|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> +---+----------+---------+
> only showing top 20 rows
>
> Any help?
>
> Thanks,
> Aakash.
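On question 2 above: the Kafka source has no notion of a CSV header; every message is just bytes, so header lines (re)sent by the console producer arrive as ordinary rows. A minimal sketch of filtering them out after parsing, assuming 'parsed' is the DataFrame holding the ID/First_Name/Last_Name columns before the writeStream step (a hypothetical name):

    from pyspark.sql.functions import col

    cleaned = (parsed
               .filter(col("ID") != "id")                      # drop repeated header lines
               .filter(col("ID").cast("int").isNotNull()))     # drop stray rows such as "Hi"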
> On Fri, Mar 16, 2018 at 12:54 PM, sagar grover <sagargrove...@gmail.com> wrote:
>
> With regards,
> Sagar Grover
> Phone - 7022175584
>
> On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Awesome, thanks for detailing!
>
> Was thinking the same, we have to split by comma for CSV while casting inside.
>
> Cool! Shall try it and revert back tomorrow.
>
> Thanks a ton!
>
> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:
>
> To remain generic, the KafkaSource can only offer the lowest common denominator for a schema (topic, partition, offset, key, value, timestamp, timestampType). As such, you can't just feed it a StructType. When you are using a producer or consumer directly with Kafka, serialization and deserialization is often an orthogonal and implicit transform. However, in Spark, serialization and deserialization is an explicit transform (e.g., you define it in your query plan).
>
> To make this more granular, if we imagine your source is registered as a temp view named "foo":
>
> SELECT
>   split(cast(value as string), ',')[0] as id,
>   split(cast(value as string), ',')[1] as name
> FROM foo;
>
> Assuming you were providing the following messages to Kafka:
>
> 1,aakash
> 2,tathagata
> 3,chris
>
> You could make the query plan less repetitive. I don't believe Spark offers from_csv out of the box as an expression (although CSV is well supported as a data source). You could implement an expression by reusing a lot of the supporting CSV classes, which may result in a better user experience vs. explicitly using split and array indices, etc. In this simple example, casting the binary to a string just works because there is a common understanding of strings encoded as bytes between Spark and Kafka by default.
>
> -Chris
>
> ------------------------------
> *From:* Aakash Basu <aakash.spark....@gmail.com>
> *Sent:* Thursday, March 15, 2018 10:48:45 AM
> *To:* Bowden, Chris
> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hey Chris,
>
> You got it right. I'm reading a *csv* file from local as mentioned above, with a console producer on the Kafka side.
>
> So, as it is CSV data with headers, shall I then use from_csv on the Spark side and provide a StructType to shape it up with a schema, and then cast it to string as TD suggested?
>
> I'm getting all of your points at a very high level. A little more granularity would help.
>
> *In the slide TD just shared*, PFA, I'm confused at the point where he is casting the value as string. Logically, the value shall consist of the entire data set, so, suppose I have a table with many columns: *how can I provide a single alias as he did in the groupBy? I missed it there itself. Another question is, do I have to cast in groupBy itself? Can't I do it directly in a select query? The last one: if the steps are followed, can I then run a SQL query on top of the columns separately?*
>
> Thanks,
> Aakash.
>
> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:
>
> You need to tell Spark about the structure of the data; it doesn't know ahead of time if you put avro, json, protobuf, etc. in Kafka for the message format. If the messages are in JSON, Spark provides from_json out of the box. For a very simple POC you can happily cast the value to a string, etc. if you are prototyping and pushing messages by hand with a console producer on the Kafka side.
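A minimal sketch of the from_json route Chris mentions, assuming (hypothetically) the messages were JSON such as {"id": 1, "name": "aakash"} rather than CSV; kafka_df stands for a raw Kafka readStream DataFrame:

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    schema = StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
    ])

    parsed = (kafka_df
              .select(from_json(col("value").cast("string"), schema).alias("data"))
              .select("data.*"))   # id and name become top-level columns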
> ________________________________________
> From: Aakash Basu <aakash.spark....@gmail.com>
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>     # ssc = StreamingContext(spark, 20)
>
>     table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())
>
>     table2_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())
>
>     joined_Stream = table1_stream.join(table2_stream, "Id")
>     #
>     # joined_Stream.show()
>
>     # query = table1_stream.writeStream.format("console").start().awaitTermination()
>     # .queryName("table_A").format("memory")
>     # spark.sql("select * from table_A").show()
>     time.sleep(10)  # sleep 20 seconds
>     # query.stop()
>     # query
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>
> I get the below error (in Spark 2.3.0) -
>
> Traceback (most recent call last):
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module>
>     class test:
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test
>     joined_Stream = table1_stream.join(table2_stream, "Id")
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];'
>
> It seems, as per the documentation, the key and value are deserialized as byte arrays.
>
> I am badly stuck at this step; there aren't many materials online with steps to proceed on this either.
>
> Any help, guys?
>
> Thanks,
> Aakash.
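The AnalysisException above is expected: the raw Kafka schema [key, value, topic, partition, offset, timestamp, timestampType] has no `Id` column, so the value has to be cast and split before the join. A hedged sketch of doing that first (column names are assumptions taken from the CSV sample and output tables elsewhere in the thread):

    from pyspark.sql.functions import split, col

    def parse(df, cols):
        # Cast the Kafka value to a string and split it into named CSV columns.
        v = split(col("value").cast("string"), ",")
        return df.select(*[v.getItem(i).alias(c) for i, c in enumerate(cols)])

    t1 = parse(table1_stream, ["Id", "first_name", "last_name"])
    t2 = parse(table2_stream, ["Id", "department", "date_joined"])

    joined_Stream = t1.join(t2, "Id")   # `Id` now exists on both sides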
> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using queryName('XYZ') in the writeStream and writing a SQL query on top of it, but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
> 1,Kellyann,Moyne
> 2,Morty,Blacker
> 3,Tobit,Robardley
> 4,Wilona,Kells
> 5,Reggy,Comizzoli
>
> My test code -
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>     # ssc = StreamingContext(spark, 20)
>
>     table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())
>
>     # table2_stream = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())
>
>     # joined_Stream = table1_stream.join(table2_stream, "Id")
>     #
>     # joined_Stream.show()
>
>     query = table1_stream.writeStream.format("console").queryName("table_A").start()
>     # .format("memory")
>     # spark.sql("select * from table_A").show()
>     # time.sleep(10)  # sleep 20 seconds
>     # query.stop()
>     query.awaitTermination()
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>
> The output I'm getting (whereas I simply want to show() my dataframe) -
>
> +----+--------------------+-----+---------+------+--------------------+-------------+
> | key|               value|topic|partition|offset|           timestamp|timestampType|
> +----+--------------------+-----+---------+------+--------------------+-------------+
> |null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
> |null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
> |null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
> |null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
> |null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
> |null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
> +----+--------------------+-----+---------+------+--------------------+-------------+
>
> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>   "name" : "table_A",
>   "timestamp" : "2018-03-15T10:18:07.218Z",
>   "numInputRows" : 6,
>   "inputRowsPerSecond" : 461.53846153846155,
>   "processedRowsPerSecond" : 14.634146341463415,
>   "durationMs" : {
>     "addBatch" : 241,
>     "getBatch" : 15,
>     "getOffset" : 2,
>     "queryPlanning" : 2,
>     "triggerExecution" : 410,
>     "walCommit" : 135
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test1]]",
>     "startOffset" : {
>       "test1" : {
>         "0" : 5226
>       }
>     },
>     "endOffset" : {
>       "test1" : {
>         "0" : 5232
>       }
>     },
>     "numInputRows" : 6,
>     "inputRowsPerSecond" : 461.53846153846155,
>     "processedRowsPerSecond" : 14.634146341463415
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
>   }
> }
>
> P.S. - If I add the below piece in the code, it doesn't print a DF of the actual table.
>
> spark.sql("select * from table_A").show()
>
> Any help?
>
> Thanks,
> Aakash.
>
> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Thanks to TD, the savior!
>
> Shall look into it.
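On the P.S. above: with the console sink, queryName("table_A") does not register a SQL-queryable table, and a one-off spark.sql(...).show() runs before the first micro-batch has even completed. A hedged sketch of the memory-sink pattern (only sensible for small debugging outputs) under those assumptions:

    import time

    query = (table1_stream.writeStream
             .format("memory")       # keeps each batch's rows in an in-memory table
             .queryName("table_A")   # the table name spark.sql() can select from
             .start())

    for _ in range(5):               # poll a few times; the first batch needs a moment
        time.sleep(5)
        spark.sql("select * from table_A").show()

    query.awaitTermination()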
> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>
> This is true stream-stream join which will automatically buffer delayed data and appropriately join stuff with SQL join semantics. Please check it out :)
>
> TD
>
> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com> wrote:
>
> I misread it and thought that your question was whether pyspark supports Kafka, lol. Sorry!
>
> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Hey Dylan,
>
> Great!
>
> Can you revert back to my initial and also the latest mail?
>
> Thanks,
> Aakash.
>
> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>
> Hi,
>
> I've been using Kafka with pyspark since 2.1.
>
> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
> Hi,
>
> I'm yet to.
>
> Just want to know: when does Spark 2.3 with the 0.10 Kafka Spark package allow Python? I read somewhere that, as of now, Scala and Java are the languages to be used.
>
> Please correct me if I am wrong.
>
> Thanks,
> Aakash.
>
> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:
>
> Did you try Spark 2.3 with Structured Streaming? There, watermarking and plain SQL might be really interesting for you.
>
> Aakash Basu <aakash.spark....@gmail.com> wrote on Wed, 14 March 2018 at 14:57:
>
> Hi,
>
> Info (Using):
> Spark Streaming Kafka 0.8 package
> Spark 2.2.1
> Kafka 1.0.1
>
> As of now, I am feeding paragraphs into the Kafka console producer and my Spark job, which is acting as a receiver, is printing the flattened words, which is a complete RDD operation.
>
> My motive is to read two tables continuously (being updated) as two distinct Kafka topics, read them as two Spark Dataframes and join them based on a key to produce the output. (I am from a Spark-SQL background, pardon my Spark-SQL-ish writing.)
>
> It may happen that the first topic is receiving new data 15 mins prior to the second topic; in that scenario, how to proceed? I should not lose any data.
>
> As of now, I want to simply pass paragraphs, read them as an RDD, convert to a DF and then join to get the common keys as the output. (Just for R&D.)
>
> Started using Spark Streaming and Kafka today itself.
>
> Please help!
>
> Thanks,
> Aakash.
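Regarding the original worry that one topic may run about 15 minutes ahead of the other: the Spark 2.3 stream-stream join from the blog post TD linked buffers both sides automatically, and watermarks plus a time-range join condition bound how much state is kept. A hedged sketch following that pattern (stream and column names here are hypothetical; t1 and t2 stand for parsed streams that kept the Kafka timestamp column):

    from pyspark.sql.functions import expr

    left = (t1.withColumnRenamed("Id", "l_id")
              .withColumnRenamed("timestamp", "l_time")
              .withWatermark("l_time", "30 minutes"))
    right = (t2.withColumnRenamed("Id", "r_id")
               .withColumnRenamed("timestamp", "r_time")
               .withWatermark("r_time", "30 minutes"))

    # Join rows whose keys match and whose arrival times are within 20 minutes of each other,
    # which comfortably covers a 15-minute lag between the two topics.
    joined = left.join(
        right,
        expr("l_id = r_id AND r_time BETWEEN l_time AND l_time + interval 20 minutes"))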