Hi,

If I run the code below:

from pyspark.sql import SparkSession
import time

class test:

    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1")
                     .load())

    table2_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test2")
                     .load())

    joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    # query =
    table1_stream.writeStream.format("console").start().awaitTermination()
    # .queryName("table_A").format("memory")
    # spark.sql("select * from table_A").show()
    time.sleep(10)  # sleep 10 seconds
    # query.stop()
    # query


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py




I get the following error (in *Spark 2.3.0*):

Traceback (most recent call last):
  File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module>
    class test:
  File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test
    joined_Stream = table1_stream.join(table2_stream, "Id")
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco

*pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];'*

As per the documentation, it seems the key and value are deserialized as
byte arrays.
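
A minimal plain-Python sketch (not Spark code) of what the error implies: the Kafka source exposes only key, value and metadata columns, so Id exists only inside the bytes of value. Decoding the truncated byte arrays printed in the console output quoted further down shows the CSV text. The helper names decode_value and split_record are hypothetical and used only for illustration. In Spark itself, the analogous step would presumably be to cast value to a string (for example with CAST(value AS STRING)) and split it into named columns before attempting the join.

```python
import csv
import io

# Truncated 'value' byte arrays copied from the console output quoted
# further down in this thread (only the first few bytes were printed).
printed_values = [
    "69 64 2C 66 69",
    "31 2C 4B 65 6C",
    "32 2C 4D 6F 72",
]


def decode_value(hex_bytes):
    """Turn a space-separated hex dump into the UTF-8 text it encodes."""
    return bytes.fromhex(hex_bytes.replace(" ", "")).decode("utf-8")


decoded = [decode_value(v) for v in printed_values]
print(decoded)

# One full CSV row from the input file, encoded the way it would sit in 'value'.
raw_record = "1,Kellyann,Moyne".encode("utf-8")


def split_record(raw):
    """Decode one CSV-encoded Kafka value and split it into its fields."""
    text = raw.decode("utf-8")
    return next(csv.reader(io.StringIO(text)))


fields = split_record(raw_record)
print(fields)
```

The first decoded entry is the start of the CSV header row, which suggests the header line was also sent to the topic as an ordinary record and would need to be filtered out.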

I am badly stuck at this step, and there is not much material online on how
to proceed.

Any help, guys?

Thanks,
Aakash.


On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark....@gmail.com>
wrote:

> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com>
> wrote:
>
>> Hi,
>>
>> I progressed a bit in the above mentioned topic -
>>
>> 1) I am feeding a CSV file into the Kafka topic.
>> 2) Feeding the Kafka topic as readStream as TD's article suggests.
>> 3) Then, simply trying to do a show on the streaming dataframe, using
>> queryName('XYZ') in the writeStream and writing a sql query on top of it,
>> but that doesn't show anything.
>> 4) Once all the above problems are resolved, I want to perform a
>> stream-stream join.
>>
>> The CSV file I'm ingesting into Kafka has -
>>
>> id,first_name,last_name
>> 1,Kellyann,Moyne
>> 2,Morty,Blacker
>> 3,Tobit,Robardley
>> 4,Wilona,Kells
>> 5,Reggy,Comizzoli
>>
>>
>> My test code -
>>
>> from pyspark.sql import SparkSession
>> import time
>>
>> class test:
>>
>>
>>     spark = SparkSession.builder \
>>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>>         .getOrCreate()
>>     # ssc = StreamingContext(spark, 20)
>>
>>     table1_stream = (spark.readStream.format("kafka")
>>                      .option("startingOffsets", "earliest")
>>                      .option("kafka.bootstrap.servers", "localhost:9092")
>>                      .option("subscribe", "test1")
>>                      .load())
>>
>>     # table2_stream = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())
>>
>>     # joined_Stream = table1_stream.join(table2_stream, "Id")
>>     #
>>     # joined_Stream.show()
>>
>>     query = table1_stream.writeStream.format("console").queryName("table_A").start()  # .format("memory")
>>     # spark.sql("select * from table_A").show()
>>     # time.sleep(10)  # sleep 10 seconds
>>     # query.stop()
>>     query.awaitTermination()
>>
>>
>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>>
>>
>> The output I'm getting (whereas I simply want to show() my dataframe) -
>>
>> +----+--------------------+-----+---------+------+--------------------+-------------+
>> | key|               value|topic|partition|offset|           timestamp|timestampType|
>> +----+--------------------+-----+---------+------+--------------------+-------------+
>> |null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
>> |null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
>> |null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
>> |null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
>> |null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
>> |null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
>> +----+--------------------+-----+---------+------+--------------------+-------------+
>>
>> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>>   "name" : "table_A",
>>   "timestamp" : "2018-03-15T10:18:07.218Z",
>>   "numInputRows" : 6,
>>   "inputRowsPerSecond" : 461.53846153846155,
>>   "processedRowsPerSecond" : 14.634146341463415,
>>   "durationMs" : {
>>     "addBatch" : 241,
>>     "getBatch" : 15,
>>     "getOffset" : 2,
>>     "queryPlanning" : 2,
>>     "triggerExecution" : 410,
>>     "walCommit" : 135
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>>     "description" : "KafkaSource[Subscribe[test1]]",
>>     "startOffset" : {
>>       "test1" : {
>>         "0" : 5226
>>       }
>>     },
>>     "endOffset" : {
>>       "test1" : {
>>         "0" : 5232
>>       }
>>     },
>>     "numInputRows" : 6,
>>     "inputRowsPerSecond" : 461.53846153846155,
>>     "processedRowsPerSecond" : 14.634146341463415
>>   } ],
>>   "sink" : {
>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
>>   }
>> }
>>
>> P.S - If I add the below piece in the code, it doesn't print a DF of the
>> actual table.
>>
>> spark.sql("select * from table_A").show()
>>
>>
>> Any help?
>>
>>
>> Thanks,
>> Aakash.
>>
>> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com
>> > wrote:
>>
>>> Thanks to TD, the savior!
>>>
>>> Shall look into it.
>>>
>>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>>>
>>>> This is true stream-stream join which will automatically buffer delayed
>>>> data and appropriately join stuff with SQL join semantics. Please check it
>>>> out :)
>>>>
>>>> TD
>>>>
>>>>
>>>>
>>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com>
>>>> wrote:
>>>>
>>>>> I misread it, and thought that your question was whether pyspark
>>>>> supports kafka lol. Sorry!
>>>>>
>>>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <
>>>>> aakash.spark....@gmail.com> wrote:
>>>>>
>>>>>> Hey Dylan,
>>>>>>
>>>>>> Great!
>>>>>>
>>>>>> Can you reply to my initial mail and also the latest one?
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I've been using Kafka with pyspark since 2.1.
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>>>>> aakash.spark....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm yet to.
>>>>>>>>
>>>>>>>> Just want to know: does Spark 2.3 with the Kafka 0.10 Spark
>>>>>>>> package support Python? I read somewhere that, as of now, only Scala
>>>>>>>> and Java can be used.
>>>>>>>>
>>>>>>>> Please correct me if I am wrong.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Aakash.
>>>>>>>>
>>>>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Did you try Spark 2.3 with Structured Streaming? Its
>>>>>>>>> watermarking and plain SQL might be really interesting for you.
>>>>>>>>> On Wed, 14 Mar 2018 at 14:57, Aakash Basu
>>>>>>>>> <aakash.spark....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>>>>>
>>>>>>>>>> *Spark 2.2.1*
>>>>>>>>>> *Kafka 1.0.1*
>>>>>>>>>>
>>>>>>>>>> As of now, I am feeding paragraphs into the Kafka console producer,
>>>>>>>>>> and my Spark job, acting as a receiver, prints the flattened words,
>>>>>>>>>> which is purely an RDD operation.
>>>>>>>>>>
>>>>>>>>>> *My goal is to read two continuously updated tables as two distinct
>>>>>>>>>> Kafka topics, load them as two Spark DataFrames, join them on a key,
>>>>>>>>>> and produce the output.* (I am from a Spark-SQL background, pardon my
>>>>>>>>>> Spark-SQL-ish writing)
>>>>>>>>>>
>>>>>>>>>> *The first topic may receive new data 15 minutes before the second
>>>>>>>>>> topic. How should I proceed in that scenario? I should not lose any
>>>>>>>>>> data.*
>>>>>>>>>>
>>>>>>>>>> As of now, I simply want to pass paragraphs, read them as RDDs,
>>>>>>>>>> convert them to DataFrames, and then join them to get the common keys
>>>>>>>>>> as the output. (Just for R&D.)
>>>>>>>>>>
>>>>>>>>>> I started using Spark Streaming and Kafka only today.
>>>>>>>>>>
>>>>>>>>>> Please help!
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Aakash.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>
