Hi,

*Thanks to Chris and TD* for continually supporting my endeavour. I ran the code with a few tweaks here and there, and *it worked well in Spark 2.2.1*, giving me the deserialized values (I used withColumn, just before writeStream, to apply the split and cast SQL functions).
But when I submit the same code on 2.3.0, I get an error for which I couldn't find any solution online.

*Error: pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming Query ===\nIdentifier: [id = d956096e-42d2-493c-8b6c-125e3137c291, runId = cd25ec61-c6bb-436c-a93e-80814e1436ec]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread State: RUNNABLE'*

Final code (for a clearer understanding of where it may go wrong in 2.3.0) -

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()

    table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())

    query = table1_stream.select('value').withColumn('value', table1_stream.value.cast("string")) \
        .withColumn("ID", split(col("value"), ",").getItem(0)) \
        .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
        .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
        .drop('value').writeStream.format("console").start()

    time.sleep(10)
    query.awaitTermination()

# Code working in Spark 2.2.1
# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py

# Code not working in Spark 2.3.0
# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py

2) I'm getting the output below, as expected, from the above code on 2.2.1. My question: is there a way to pick up the header of the file being read and honour header=True? Or, for Structured Streaming, does the user always have to shape the columns explicitly, since the data will always arrive in the fixed Kafka structure (topic, partition, offset, key, value, timestamp, timestampType)? If so, how do I remove the header rows explicitly from the data, as seen in the table below? I know it is a stream and the data is fed in as messages, but I still wanted the experts to shed some more light on it.

+---+----------+---------+
| ID|First_Name|Last_Name|
+---+----------+---------+
| Hi|      null|     null|
| id|first_name|last_name|
|  1|  Kellyann|    Moyne|
|  2|     Morty|  Blacker|
|  3|     Tobit|Robardley|
|  4|    Wilona|    Kells|
|  5|     Reggy|Comizzoli|
| id|first_name|last_name|
|  1|  Kellyann|    Moyne|
|  2|     Morty|  Blacker|
|  3|     Tobit|Robardley|
|  4|    Wilona|    Kells|
|  5|     Reggy|Comizzoli|
| id|first_name|last_name|
|  1|  Kellyann|    Moyne|
|  2|     Morty|  Blacker|
|  3|     Tobit|Robardley|
|  4|    Wilona|    Kells|
|  5|     Reggy|Comizzoli|
| id|first_name|last_name|
+---+----------+---------+
only showing top 20 rows

Any help?

Thanks,
Aakash.

On Fri, Mar 16, 2018 at 12:54 PM, sagar grover <sagargrove...@gmail.com> wrote:
>
> With regards,
> Sagar Grover
> Phone - 7022175584
>
> On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>
>> Awesome, thanks for detailing!
>>
>> Was thinking the same; we have to split by comma for the CSV while casting inside.
>>
>> Cool! Shall try it and revert back tomorrow.
>>
>> Thanks a ton!
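On the header question in point 2 above: the Kafka source has no notion of header=True, so every time the CSV is pushed through the console producer its header line arrives as just another message. A minimal, untested sketch of one way to drop those rows after parsing (it assumes the same test1 topic and local broker as the code above, and that the header always lands with ID equal to the literal string "id", as in the output shown):

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder \
    .appName("DirectKafka_Header_Filter_Sketch") \
    .getOrCreate()

table1_stream = (spark.readStream.format("kafka")
                 .option("startingOffsets", "earliest")
                 .option("kafka.bootstrap.servers", "localhost:9092")
                 .option("subscribe", "test1")
                 .load())

parsed = (table1_stream
          .withColumn("value", col("value").cast("string"))
          .withColumn("ID", split(col("value"), ",").getItem(0))
          .withColumn("First_Name", split(col("value"), ",").getItem(1))
          .withColumn("Last_Name", split(col("value"), ",").getItem(2))
          .select("ID", "First_Name", "Last_Name"))

# The CSV header travels through Kafka as an ordinary message, so it shows up
# as a row whose ID column is literally the string "id"; drop it with a filter.
no_header = parsed.filter(col("ID") != "id")

query = no_header.writeStream.format("console").start()
query.awaitTermination()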
>>
>> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:
>>
>>> To remain generic, the KafkaSource can only offer the lowest common
>>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>>> timestampType). As such, you can't just feed it a StructType. When you are
>>> using a producer or consumer directly with Kafka, serialization and
>>> deserialization are often orthogonal and implicit transforms. However, in
>>> Spark, serialization and deserialization are explicit transforms (e.g.,
>>> you define them in your query plan).
>>>
>>> To make this more granular, if we imagine your source is registered as a
>>> temp view named "foo":
>>>
>>> SELECT
>>>   split(cast(value as string), ',')[0] as id,
>>>   split(cast(value as string), ',')[1] as name
>>> FROM foo;
>>>
>>> Assuming you were providing the following messages to Kafka:
>>>
>>> 1,aakash
>>> 2,tathagata
>>> 3,chris
>>>
>>> You could make the query plan less repetitive. I don't believe Spark
>>> offers from_csv out of the box as an expression (although CSV is well
>>> supported as a data source). You could implement an expression by reusing a
>>> lot of the supporting CSV classes, which may result in a better user
>>> experience vs. explicitly using split and array indices, etc. In this
>>> simple example, casting the binary to a string just works because there is
>>> a common understanding of strings encoded as bytes between Spark and Kafka
>>> by default.
>>>
>>> -Chris
>>> ------------------------------
>>> *From:* Aakash Basu <aakash.spark....@gmail.com>
>>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>>> *To:* Bowden, Chris
>>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hey Chris,
>>>
>>> You got it right. I'm reading a *csv* file from local, as mentioned
>>> above, with a console producer on the Kafka side.
>>>
>>> So, as it is CSV data with headers, shall I then use from_csv on the
>>> Spark side and provide a StructType to shape it up with a schema, and then
>>> cast it to string as TD suggested?
>>>
>>> I'm getting all of your points at a very high level. A little more
>>> granularity would help.
>>>
>>> *In the slide TD just shared*, PFA, I'm confused at the point where he
>>> casts the value as string. Logically, the value will contain the entire
>>> data set, so suppose I have a table with many columns: *how can I provide a
>>> single alias as he did in the groupBy? I missed that point. Another
>>> question: do I have to cast in the groupBy itself, or can I do it directly
>>> in a select query? And lastly, if these steps are followed, can I then run
>>> a SQL query on top of the columns separately?*
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:
>>>
>>> You need to tell Spark about the structure of the data; it doesn't know
>>> ahead of time whether you put avro, json, protobuf, etc. in Kafka as the
>>> message format. If the messages are in JSON, Spark provides from_json out
>>> of the box. For a very simple POC you can happily cast the value to a
>>> string, etc., if you are prototyping and pushing messages by hand with a
>>> console producer on the Kafka side.
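For completeness, a rough PySpark sketch of running Chris's SQL as-is, by registering the raw stream as the temp view "foo" he describes (untested; it assumes the same test1 topic and local broker used elsewhere in this thread, and only extracts the first two CSV fields, as in his example):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DirectKafka_TempView_Sketch") \
    .getOrCreate()

foo = (spark.readStream.format("kafka")
       .option("startingOffsets", "earliest")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "test1")
       .load())

# Register the raw stream so the SQL above can be run against it as "foo";
# the result of spark.sql() is still a streaming DataFrame.
foo.createOrReplaceTempView("foo")

parsed = spark.sql("""
    SELECT
        split(CAST(value AS STRING), ',')[0] AS id,
        split(CAST(value AS STRING), ',')[1] AS name
    FROM foo
""")

query = parsed.writeStream.format("console").start()
query.awaitTermination()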
>>>
>>> ________________________________________
>>> From: Aakash Basu <aakash.spark....@gmail.com>
>>> Sent: Thursday, March 15, 2018 7:52:28 AM
>>> To: Tathagata Das
>>> Cc: Dylan Guedes; Georg Heiler; user
>>> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hi,
>>>
>>> And if I run this below piece of code -
>>>
>>> from pyspark.sql import SparkSession
>>> import time
>>>
>>> class test:
>>>
>>>     spark = SparkSession.builder \
>>>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>>>         .getOrCreate()
>>>     # ssc = StreamingContext(spark, 20)
>>>
>>>     table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())
>>>
>>>     table2_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())
>>>
>>>     joined_Stream = table1_stream.join(table2_stream, "Id")
>>>     #
>>>     # joined_Stream.show()
>>>
>>>     # query = table1_stream.writeStream.format("console").start().awaitTermination()
>>>     # .queryName("table_A").format("memory")
>>>     # spark.sql("select * from table_A").show()
>>>     time.sleep(10)  # sleep 10 seconds
>>>     # query.stop()
>>>     # query
>>>
>>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>>>
>>> I get the below error (in Spark 2.3.0) -
>>>
>>> Traceback (most recent call last):
>>>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module>
>>>     class test:
>>>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test
>>>     joined_Stream = table1_stream.join(table2_stream, "Id")
>>>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>>>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>>>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
>>> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];'
>>>
>>> It seems, as per the documentation, that the key and value are deserialized as byte arrays.
>>>
>>> I am badly stuck at this step; there isn't much material online with steps to proceed on this, either.
>>>
>>> Any help, guys?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>>> Any help on the above?
>>>
>>> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>>> Hi,
>>>
>>> I have progressed a bit on the above-mentioned topic -
>>>
>>> 1) I am feeding a CSV file into the Kafka topic.
>>> 2) Feeding the Kafka topic into readStream, as TD's article suggests.
>>> 3) Then simply trying to do a show on the streaming dataframe, using queryName('XYZ') in the writeStream and writing a SQL query on top of it, but that doesn't show anything.
>>> 4) Once all the above problems are resolved, I want to perform a stream-stream join.
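On the AnalysisException above: `Id` cannot be resolved because the raw Kafka schema only exposes key, value, topic, partition, offset, timestamp and timestampType, so the join key has to be derived from value on both streams first. A rough, untested sketch of that (Spark 2.3+ only; the topic names and comma-separated layout are taken from this thread, the payload column names are invented, and without watermarks the buffered join state grows indefinitely; see the watermarked sketch at the end of the thread):

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder \
    .appName("DirectKafka_Parse_Before_Join_Sketch") \
    .getOrCreate()

def parsed_stream(topic, payload_col):
    # The raw Kafka schema carries no "Id" column, so derive it from value
    # before it can be used in a USING join.
    raw = (spark.readStream.format("kafka")
           .option("startingOffsets", "earliest")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", topic)
           .load())
    return (raw.select(col("value").cast("string").alias(payload_col))
               .withColumn("Id", split(col(payload_col), ",").getItem(0)))

table1_stream = parsed_stream("test1", "left_csv")
table2_stream = parsed_stream("test2", "right_csv")

# Both sides now expose an Id column, so the join below can resolve it.
joined_stream = table1_stream.join(table2_stream, "Id")

query = joined_stream.writeStream.format("console").start()
query.awaitTermination()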
>>>
>>> The CSV file I'm ingesting into Kafka has -
>>>
>>> id,first_name,last_name
>>> 1,Kellyann,Moyne
>>> 2,Morty,Blacker
>>> 3,Tobit,Robardley
>>> 4,Wilona,Kells
>>> 5,Reggy,Comizzoli
>>>
>>> My test code -
>>>
>>> from pyspark.sql import SparkSession
>>> import time
>>>
>>> class test:
>>>
>>>     spark = SparkSession.builder \
>>>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>>>         .getOrCreate()
>>>     # ssc = StreamingContext(spark, 20)
>>>
>>>     table1_stream = (spark.readStream.format("kafka").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load())
>>>
>>>     # table2_stream = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())
>>>
>>>     # joined_Stream = table1_stream.join(table2_stream, "Id")
>>>     #
>>>     # joined_Stream.show()
>>>
>>>     query = table1_stream.writeStream.format("console").queryName("table_A").start()
>>>     # .format("memory")
>>>     # spark.sql("select * from table_A").show()
>>>     # time.sleep(10)  # sleep 20 seconds
>>>     # query.stop()
>>>     query.awaitTermination()
>>>
>>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>>>
>>> The output I'm getting (whereas I simply want to show() my dataframe) -
>>>
>>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>> | key|               value|topic|partition|offset|           timestamp|timestampType|
>>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>> |null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
>>> |null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
>>> |null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
>>> |null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
>>> |null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
>>> |null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
>>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>>
>>> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>>>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>>>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>>>   "name" : "table_A",
>>>   "timestamp" : "2018-03-15T10:18:07.218Z",
>>>   "numInputRows" : 6,
>>>   "inputRowsPerSecond" : 461.53846153846155,
>>>   "processedRowsPerSecond" : 14.634146341463415,
>>>   "durationMs" : {
>>>     "addBatch" : 241,
>>>     "getBatch" : 15,
>>>     "getOffset" : 2,
>>>     "queryPlanning" : 2,
>>>     "triggerExecution" : 410,
>>>     "walCommit" : 135
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[test1]]",
>>>     "startOffset" : {
>>>       "test1" : {
>>>         "0" : 5226
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "test1" : {
>>>         "0" : 5232
>>>       }
>>>     },
>>>     "numInputRows" : 6,
>>>     "inputRowsPerSecond" : 461.53846153846155,
>>>     "processedRowsPerSecond" : 14.634146341463415
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
>>>   }
>>> }
>>>
>>> P.S. - If I add the below piece to the code, it doesn't print a DF of the actual table.
>>>
>>> spark.sql("select * from table_A").show()
>>>
>>> Any help?
>>>
>>> Thanks,
>>> Aakash.
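Two things seem to explain the output above: value is still raw bytes (hence the hex arrays), and with the console sink there is no in-memory table for spark.sql("select * from table_A") to read; besides, that line runs on the driver before the first micro-batch completes. A rough sketch of the memory-sink variant hinted at in the commented-out lines (untested; the sleep is only a crude way to let a batch land before querying):

from pyspark.sql import SparkSession
import time

spark = SparkSession.builder \
    .appName("DirectKafka_Memory_Sink_Sketch") \
    .getOrCreate()

table1_stream = (spark.readStream.format("kafka")
                 .option("startingOffsets", "earliest")
                 .option("kafka.bootstrap.servers", "localhost:9092")
                 .option("subscribe", "test1")
                 .load())

# Cast the binary value to a string so the rows are readable CSV lines
# instead of the byte arrays shown in the console output above.
readable = table1_stream.selectExpr("CAST(value AS STRING) AS value")

# With the memory sink, queryName("table_A") becomes an in-memory table that
# spark.sql() can query; with the console sink the name is only a label.
query = (readable.writeStream
         .format("memory")
         .queryName("table_A")
         .start())

time.sleep(10)  # crude wait so the first micro-batch has a chance to finish
spark.sql("select * from table_A").show()
query.stop()

A sink-side alternative is to keep the console sink and simply cast (and split) the value before writeStream, as in the 2.2.1 code at the top of this mail.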
>>>
>>> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>>> Thanks to TD, the savior!
>>>
>>> Shall look into it.
>>>
>>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>>
>>> This is a true stream-stream join, which will automatically buffer delayed data and join it appropriately with SQL join semantics. Please check it out :)
>>>
>>> TD
>>>
>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com> wrote:
>>> I misread it and thought your question was whether PySpark supports Kafka, lol. Sorry!
>>>
>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>>> Hey Dylan,
>>>
>>> Great!
>>>
>>> Could you revert back to my initial mail and also the latest one?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>> Hi,
>>>
>>> I've been using Kafka with PySpark since 2.1.
>>>
>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>>> Hi,
>>>
>>> I'm yet to.
>>>
>>> Just want to know: does Spark 2.3 with the Kafka 0.10 package allow Python yet? I read somewhere that, as of now, Scala and Java are the languages to be used.
>>>
>>> Please correct me if I am wrong.
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:
>>> Did you try Spark 2.3 with Structured Streaming? The watermarking and plain SQL there might be really interesting for you.
>>>
>>> Aakash Basu <aakash.spark....@gmail.com> wrote on Wed, 14 Mar 2018 at 14:57:
>>> Hi,
>>>
>>> Info (using):
>>> Spark Streaming Kafka 0.8 package
>>> Spark 2.2.1
>>> Kafka 1.0.1
>>>
>>> As of now, I am feeding paragraphs into the Kafka console producer, and my Spark job, which acts as a receiver, prints the flattened words, which is purely an RDD operation.
>>>
>>> My aim is to continuously read two tables (which keep being updated) from two distinct Kafka topics as two Spark DataFrames, join them on a key, and produce the output. (I am from a Spark-SQL background, pardon my Spark-SQL-ish writing.)
>>>
>>> It may happen that the first topic receives new data 15 minutes before the second topic; how do I proceed in that scenario? I should not lose any data.
>>>
>>> As of now, I want to simply pass paragraphs, read them as an RDD, convert to a DF and then join to get the common keys as the output. (Just for R&D.)
>>>
>>> I started using Spark Streaming and Kafka today itself.
>>>
>>> Please help!
>>>
>>> Thanks,
>>> Aakash.
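Coming back to the original question of one topic lagging the other by roughly 15 minutes: the stream-stream joins described in TD's blog post are meant for exactly this, provided both sides carry a watermark and the join has a time-range condition so Spark knows how long to buffer unmatched rows. A rough, untested sketch (Spark 2.3+ only; the column names, the 30-minute watermark, and the use of the Kafka ingestion timestamp as event time are all assumptions for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, expr

spark = SparkSession.builder \
    .appName("DirectKafka_Watermarked_Join_Sketch") \
    .getOrCreate()

def topic_stream(topic, id_col, ts_col):
    raw = (spark.readStream.format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", topic)
           .load())
    # Use the Kafka ingestion timestamp as the event time, derive the join
    # key from the comma-separated value, then bound lateness with a watermark.
    return (raw.select(col("value").cast("string").alias(topic + "_csv"),
                       col("timestamp").alias(ts_col))
               .withColumn(id_col, split(col(topic + "_csv"), ",").getItem(0))
               .withWatermark(ts_col, "30 minutes"))

left = topic_stream("test1", "left_Id", "left_ts")
right = topic_stream("test2", "right_Id", "right_ts")

# The watermarks plus the time-range condition tell Spark how long to buffer
# each side, so rows arriving up to ~15 minutes apart still match while older
# state can eventually be dropped.
joined = left.join(
    right,
    expr("""
        left_Id = right_Id AND
        right_ts BETWEEN left_ts - interval 15 minutes
                     AND left_ts + interval 15 minutes
    """))

query = joined.writeStream.format("console").start()
query.awaitTermination()

With an inner join and time bounds on both sides, no data in the window is lost, and Spark can discard state that can no longer match instead of buffering either stream forever.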