Hi,

*Thanks to Chris and TD* for consistently supporting my endeavor. I ran the
code with a few tweaks here and there, and *it worked well in Spark
2.2.1*, giving me the deserialized values (I used withColumn in the
writeStream query to apply the split and cast SQL functions).

But when I submit the same code in 2.3.0, I get an error for which I
couldn't find any solution on the internet.

*Error: pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming
Query ===\nIdentifier: [id = d956096e-42d2-493c-8b6c-125e3137c291, runId =
cd25ec61-c6bb-436c-a93e-80814e1436ec]\nCurrent Committed Offsets:
{}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread
State: RUNNABLE'*

*Final code (for clearer understanding of where it may go wrong in 2.3.0)* -

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()

    table1_stream = (spark.readStream.format("kafka")
        .option("startingOffsets", "earliest")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test1").load())

    query = table1_stream.select('value') \
        .withColumn('value', table1_stream.value.cast("string")) \
        .withColumn("ID", split(col("value"), ",").getItem(0)) \
        .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
        .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
        .drop('value').writeStream.format("console").start()

    time.sleep(10)

    query.awaitTermination()

# Code working in Spark 2.2.1
# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
#   /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py

# Code not working in Spark 2.3.0
# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
#   /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py

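
One thing that may be worth checking, stated as an assumption and not as a
confirmed diagnosis of the error above: both submit commands pass the Kafka
source package at version 2.1.0, while the artifact version is normally
expected to match the Spark version it runs on. The sketch below uses two
hypothetical helper names (kafka_package and submit_command, neither is part
of Spark) to build a spark-submit command whose package version follows the
Spark version.

```python
# Hypothetical helpers (not part of Spark): they only assemble strings so the
# Kafka source package version always follows the Spark version in use.
def kafka_package(spark_version, scala_version="2.11"):
    """Return the Maven coordinate of spark-sql-kafka-0-10 for a Spark build."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_version, spark_version
    )


def submit_command(spark_home, spark_version, script):
    """Assemble a spark-submit command line as a list of arguments."""
    return [
        spark_home + "/bin/spark-submit",
        "--packages", kafka_package(spark_version),
        script,
    ]


if __name__ == "__main__":
    cmd = submit_command(
        "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7",
        "2.3.0",
        "Stream_Stream_Join.py",
    )
    # Print the command instead of running it, so it can be inspected first.
    print(" ".join(cmd))
```

Whether this resolves the StreamingQueryException is untested here; it only
removes one difference between the 2.2.1 and 2.3.0 runs.
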
2) I'm getting the output below, as expected, from the above code in 2.2.1.
My question is: is there a way to pick up the header of the file being read,
as with header=True? Or, for Structured Streaming, does the user always
have to provide the column names explicitly, since Kafka data always arrives
in this structure: topic, partition, offset, key, value, timestamp,
timestampType? If so, how do I explicitly remove the header rows from the
data, as in the table below? I know it is a stream and the data is fed in
as messages, but I would still like the experts to shed some more light on
this.

+---+----------+---------+
| ID|First_Name|Last_Name|
+---+----------+---------+
| Hi|      null|     null|
| id|first_name|last_name|
|  1|  Kellyann|    Moyne|
|  2|     Morty|  Blacker|
|  3|     Tobit|Robardley|
|  4|    Wilona|    Kells|
|  5|     Reggy|Comizzoli|
| id|first_name|last_name|
|  1|  Kellyann|    Moyne|
|  2|     Morty|  Blacker|
|  3|     Tobit|Robardley|
|  4|    Wilona|    Kells|
|  5|     Reggy|Comizzoli|
| id|first_name|last_name|
|  1|  Kellyann|    Moyne|
|  2|     Morty|  Blacker|
|  3|     Tobit|Robardley|
|  4|    Wilona|    Kells|
|  5|     Reggy|Comizzoli|
| id|first_name|last_name|
+---+----------+---------+
only showing top 20 rows
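
Since every line of the file reaches Kafka as an ordinary message, the header
line shows up as data, as in the table above. A minimal sketch of one way to
drop it, assuming the header text is known up front: parse_records is a
hypothetical plain-Python helper that mirrors the split logic, and
drop_header_rows is a hypothetical wrapper around DataFrame.filter that
assumes the ID, First_Name and Last_Name columns from the code above (it
needs pyspark and is not run here).

```python
def parse_records(lines, header=("id", "first_name", "last_name")):
    """Split comma-separated messages and drop header and malformed rows."""
    rows = []
    for line in lines:
        fields = [f.strip() for f in line.split(",")]
        if len(fields) != len(header):
            continue  # wrong field count, e.g. the stray "Hi" message
        if tuple(f.lower() for f in fields) == header:
            continue  # a repeated header line
        rows.append(tuple(fields))
    return rows


def drop_header_rows(df):
    """Same idea on a DataFrame with ID, First_Name, Last_Name columns."""
    from pyspark.sql.functions import col  # imported lazily; needs pyspark

    # Rows with too few fields have a null Last_Name; header rows have ID "id".
    return df.filter(col("Last_Name").isNotNull() & (col("ID") != "id"))


if __name__ == "__main__":
    messages = ["Hi", "id,first_name,last_name", "1,Kellyann,Moyne"]
    print(parse_records(messages))
```

In the streaming query, the filter would go before writeStream, for example
right after the drop('value') step.
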


Any help?

Thanks,
Aakash.

On Fri, Mar 16, 2018 at 12:54 PM, sagar grover <sagargrove...@gmail.com>
wrote:

>
> With regards,
> Sagar Grover
> Phone - 7022175584
>
> On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark....@gmail.com>
> wrote:
>
>> Awesome, thanks for detailing!
>>
>> I was thinking the same: we have to split by comma for CSV while casting
>> inside.
>>
>> Cool! I'll try it and get back to you tomorrow.
>>
>> Thanks a ton!
>>
>> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>> wrote:
>>
>>> To remain generic, the KafkaSource can only offer the lowest common
>>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>>> timestampType). As such, you can't just feed it a StructType. When you are
>>> using a producer or consumer directly with Kafka, serialization and
>>> deserialization is often an orthogonal and implicit transform. However, in
>>> Spark, serialization and deserialization is an explicit transform (e.g.,
>>> you define it in your query plan).
>>>
>>>
>>> To make this more granular, if we imagine your source is registered as a
>>> temp view named "foo":
>>>
>>> SELECT
>>>   split(cast(value as string), ',')[0] as id,
>>>   split(cast(value as string), ',')[1] as name
>>> FROM foo;
>>>
>>>
>>> Assuming you were providing the following messages to Kafka:
>>>
>>> 1,aakash
>>>
>>> 2,tathagata
>>>
>>> 3,chris
>>>
>>>
>>> You could make the query plan less repetitive. I don't believe Spark
>>> offers from_csv out of the box as an expression (although CSV is well
>>> supported as a data source). You could implement an expression by reusing a
>>> lot of the supporting CSV classes which may result in a better user
>>> experience vs. explicitly using split and array indices, etc. In this
>>> simple example, casting the binary to a string just works because there is
>>> a common understanding of strings encoded as bytes between Spark and Kafka
>>> by default.
>>>
>>>
>>> -Chris
>>> ------------------------------
>>> *From:* Aakash Basu <aakash.spark....@gmail.com>
>>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>>> *To:* Bowden, Chris
>>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>>
>>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hey Chris,
>>>
>>> You got it right. I'm reading a *csv* file from the local disk, as
>>> mentioned above, with a console producer on the Kafka side.
>>>
>>> So, as it is CSV data with headers, should I use from_csv on the Spark
>>> side, provide a StructType to shape it with a schema, and then cast it
>>> to string as TD suggested?
>>>
>>> I'm getting all of your points at a very high level. A little more
>>> granularity would help.
>>>
>>> *In the slide TD just shared*, PFA, I'm confused at the point where he
>>> is casting the value as string. Logically, the value holds the entire
>>> record, so if I have a table with many columns, *how can I provide a
>>> single alias as he did in the groupBy? I missed it there. Another
>>> question: do I have to cast in the groupBy itself, or can I do it
>>> directly in a select query? Lastly, if these steps are followed, can I
>>> then run a SQL query on the individual columns?*
>>>
>>> Thanks,
>>> Aakash.
>>>
>>>
>>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>> wrote:
>>>
>>> You need to tell Spark about the structure of the data; it doesn't know
>>> ahead of time whether you put Avro, JSON, protobuf, etc. in Kafka as the
>>> message format. If the messages are in JSON, Spark provides from_json out
>>> of the box. For a very simple POC, you can happily cast the value to a
>>> string, etc., if you are prototyping and pushing messages by hand with a
>>> console producer on the Kafka side.
>>>
>>> ________________________________________
>>> From: Aakash Basu <aakash.spark....@gmail.com>
>>> Sent: Thursday, March 15, 2018 7:52:28 AM
>>> To: Tathagata Das
>>> Cc: Dylan Guedes; Georg Heiler; user
>>> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hi,
>>>
>>> And if I run this below piece of code -
>>>
>>>
>>> from pyspark.sql import SparkSession
>>> import time
>>>
>>> class test:
>>>
>>>
>>>     spark = SparkSession.builder \
>>>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>>>         .getOrCreate()
>>>     # ssc = StreamingContext(spark, 20)
>>>
>>>     table1_stream = (spark.readStream.format("kafka")
>>>         .option("startingOffsets", "earliest")
>>>         .option("kafka.bootstrap.servers", "localhost:9092")
>>>         .option("subscribe", "test1").load())
>>>
>>>     table2_stream = (spark.readStream.format("kafka")
>>>         .option("startingOffsets", "earliest")
>>>         .option("kafka.bootstrap.servers", "localhost:9092")
>>>         .option("subscribe", "test2").load())
>>>
>>>     joined_Stream = table1_stream.join(table2_stream, "Id")
>>>     #
>>>     # joined_Stream.show()
>>>
>>>     # query =
>>>     table1_stream.writeStream.format("console").start().awaitTermination()
>>> # .queryName("table_A").format("memory")
>>>     # spark.sql("select * from table_A").show()
>>>     time.sleep(10)  # sleep 10 seconds
>>>     # query.stop()
>>>     # query
>>>
>>>
>>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
>>> #   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
>>> #   Stream_Stream_Join.py
>>>
>>>
>>>
>>>
>>> I get the below error (in Spark 2.3.0) -
>>>
>>> Traceback (most recent call last):
>>>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module>
>>>     class test:
>>>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test
>>>     joined_Stream = table1_stream.join(table2_stream, "Id")
>>>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>>>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>>>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
>>> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];'
>>>
>>> It seems that, as per the documentation, the key and value are
>>> deserialized as byte arrays.
>>>
>>> I am badly stuck at this step, and there isn't much material online with
>>> steps on how to proceed, either.
>>>
>>> Any help, guys?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>>
>>> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark....@gmail.com
>>> <mailto:aakash.spark....@gmail.com>> wrote:
>>> Any help on the above?
>>>
>>> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com
>>> <mailto:aakash.spark....@gmail.com>> wrote:
>>> Hi,
>>>
>>> I progressed a bit in the above mentioned topic -
>>>
>>> 1) I am feeding a CSV file into the Kafka topic.
>>> 2) Feeding the Kafka topic as readStream as TD's article suggests.
>>> 3) Then, simply trying to do a show on the streaming dataframe, using
>>> queryName('XYZ') in the writeStream and writing a sql query on top of it,
>>> but that doesn't show anything.
>>> 4) Once all the above problems are resolved, I want to perform a
>>> stream-stream join.
>>>
>>> The CSV file I'm ingesting into Kafka has -
>>>
>>> id,first_name,last_name
>>> 1,Kellyann,Moyne
>>> 2,Morty,Blacker
>>> 3,Tobit,Robardley
>>> 4,Wilona,Kells
>>> 5,Reggy,Comizzoli
>>>
>>>
>>> My test code -
>>>
>>>
>>> from pyspark.sql import SparkSession
>>> import time
>>>
>>> class test:
>>>
>>>
>>>     spark = SparkSession.builder \
>>>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>>>         .getOrCreate()
>>>     # ssc = StreamingContext(spark, 20)
>>>
>>>     table1_stream = (spark.readStream.format("kafka")
>>>         .option("startingOffsets", "earliest")
>>>         .option("kafka.bootstrap.servers", "localhost:9092")
>>>         .option("subscribe", "test1").load())
>>>
>>>     # table2_stream = (spark.readStream.format("kafka")
>>>     #     .option("kafka.bootstrap.servers", "localhost:9092")
>>>     #     .option("subscribe", "test2").load())
>>>
>>>     # joined_Stream = table1_stream.join(table2_stream, "Id")
>>>     #
>>>     # joined_Stream.show()
>>>
>>>     query = table1_stream.writeStream.format("console").queryName("table_A").start()  # .format("memory")
>>>     # spark.sql("select * from table_A").show()
>>>     # time.sleep(10)  # sleep 10 seconds
>>>     # query.stop()
>>>     query.awaitTermination()
>>>
>>>
>>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
>>> #   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
>>> #   Stream_Stream_Join.py
>>>
>>>
>>> The output I'm getting (whereas I simply want to show() my dataframe) -
>>>
>>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>> | key|               value|topic|partition|offset|           timestamp|timestampType|
>>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>> |null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
>>> |null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
>>> |null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
>>> |null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
>>> |null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
>>> |null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
>>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>>
>>> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>>>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>>>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>>>   "name" : "table_A",
>>>   "timestamp" : "2018-03-15T10:18:07.218Z",
>>>   "numInputRows" : 6,
>>>   "inputRowsPerSecond" : 461.53846153846155,
>>>   "processedRowsPerSecond" : 14.634146341463415,
>>>   "durationMs" : {
>>>     "addBatch" : 241,
>>>     "getBatch" : 15,
>>>     "getOffset" : 2,
>>>     "queryPlanning" : 2,
>>>     "triggerExecution" : 410,
>>>     "walCommit" : 135
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "KafkaSource[Subscribe[test1]]",
>>>     "startOffset" : {
>>>       "test1" : {
>>>         "0" : 5226
>>>       }
>>>     },
>>>     "endOffset" : {
>>>       "test1" : {
>>>         "0" : 5232
>>>       }
>>>     },
>>>     "numInputRows" : 6,
>>>     "inputRowsPerSecond" : 461.53846153846155,
>>>     "processedRowsPerSecond" : 14.634146341463415
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
>>>   }
>>> }
>>>
>>> P.S - If I add the below piece in the code, it doesn't print a DF of the
>>> actual table.
>>>
>>> spark.sql("select * from table_A").show()
>>>
>>> Any help?
>>>
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <
>>> aakash.spark....@gmail.com<mailto:aakash.spark....@gmail.com>> wrote:
>>> Thanks to TD, the savior!
>>>
>>> Shall look into it.
>>>
>>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com<mailto:tathagata.das1...@gmail.com>> wrote:
>>> Relevant:
>>> https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>>
>>> This is true stream-stream join which will automatically buffer delayed
>>> data and appropriately join stuff with SQL join semantics. Please check it
>>> out :)
>>>
>>> TD
>>>
>>>
>>>
>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com
>>> <mailto:djmggue...@gmail.com>> wrote:
>>> I misread it and thought that your question was whether PySpark supports
>>> Kafka, lol. Sorry!
>>>
>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark....@gmail.com
>>> <mailto:aakash.spark....@gmail.com>> wrote:
>>> Hey Dylan,
>>>
>>> Great!
>>>
>>> Could you reply to my initial mail and also to the latest one?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes"
>>> <djmggue...@gmail.com<mailto:djmggue...@gmail.com>> wrote:
>>> Hi,
>>>
>>> I've been using Kafka with PySpark since 2.1.
>>>
>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark....@gmail.com
>>> <mailto:aakash.spark....@gmail.com>> wrote:
>>> Hi,
>>>
>>> I'm yet to.
>>>
>>> I just want to know: when will Spark 2.3 with the Kafka 0.10 Spark
>>> package support Python? I read somewhere that, as of now, Scala and Java
>>> are the only languages that can be used.
>>>
>>> Please correct me if I am wrong.
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 14-Mar-2018 8:24 PM, "Georg Heiler"
>>> <georg.kf.hei...@gmail.com<mailto:georg.kf.hei...@gmail.com>> wrote:
>>> Did you try Spark 2.3 with Structured Streaming? Its watermarking and
>>> plain SQL support might be really interesting for you.
>>> Aakash Basu <aakash.spark....@gmail.com<mailto:aakash.spark....@gmail.com>>
>>> wrote on Wed, Mar 14, 2018 at 14:57:
>>> Hi,
>>>
>>> Info (Using):
>>> Spark Streaming Kafka 0.8 package
>>> Spark 2.2.1
>>> Kafka 1.0.1
>>>
>>> As of now, I am feeding paragraphs into the Kafka console producer, and
>>> my Spark job, acting as a receiver, prints the flattened words; this is
>>> purely an RDD operation.
>>>
>>> My goal is to read two continuously updated tables as two distinct
>>> Kafka topics, load them as two Spark DataFrames, join them on a key,
>>> and produce the output. (I am from a Spark SQL background, so pardon my
>>> Spark-SQL-ish writing.)
>>>
>>> It may happen that the first topic receives new data 15 minutes before
>>> the second topic. How should I proceed in that scenario? I should not
>>> lose any data.
>>>
>>> As of now, I simply want to pass paragraphs, read them as RDDs, convert
>>> them to DataFrames, and then join them to get the common keys as the
>>> output. (Just for R&D.)
>>>
>>> I started using Spark Streaming and Kafka only today.
>>>
>>> Please help!
>>>
>>> Thanks,
>>> Aakash.