Hi,

I have made some progress on the topic mentioned above:

1) I am feeding a CSV file into the Kafka topic.
2) Reading the Kafka topic with readStream, as TD's article suggests.
3) Then, simply trying to show the streaming dataframe by using
queryName('XYZ') in the writeStream and running a SQL query on top of it,
but that doesn't show anything.
4) Once all the above problems are resolved, I want to perform a
stream-stream join.
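For item 4, a rough sketch of what such a join might look like, following the pattern described in TD's article. It assumes Spark 2.3 or later (the test below runs on 2.2.1), and the function names and the columns left_id, left_time, right_id and right_time are hypothetical placeholders, not taken from the data in this mail:

```python
def join_condition(max_delay="15 minutes"):
    """Build the SQL join condition: same key, and the right-hand row
    arrives no later than `max_delay` after the left-hand row.
    Column names are hypothetical placeholders."""
    return (
        "left_id = right_id AND "
        "right_time >= left_time AND "
        "right_time <= left_time + interval {}".format(max_delay)
    )


def join_with_delay(left, right, max_delay="15 minutes", watermark="20 minutes"):
    """Inner-join two streaming DataFrames whose matching rows may arrive late.

    Assumes `left` has (left_id, left_time) and `right` has
    (right_id, right_time), where the *_time columns are timestamps.
    """
    # Imported lazily so this module loads even where pyspark is not installed.
    from pyspark.sql.functions import expr

    # Watermarks let Spark decide how long to keep buffered rows in state.
    left_wm = left.withWatermark("left_time", watermark)
    right_wm = right.withWatermark("right_time", watermark)
    return left_wm.join(right_wm, expr(join_condition(max_delay)))
```

The 15-minute bound mirrors the delay between the two topics described in my first mail; the 20-minute watermark is an arbitrary choice that is slightly larger than that bound.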

The CSV file I'm ingesting into Kafka contains:

id,first_name,last_name
1,Kellyann,Moyne
2,Morty,Blacker
3,Tobit,Robardley
4,Wilona,Kells
5,Reggy,Comizzoli


My test code -

from pyspark.sql import SparkSession
import time

class test:


    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1")
                     .load())

    # table2_stream = (spark.readStream.format("kafka")
    #                  .option("kafka.bootstrap.servers", "localhost:9092")
    #                  .option("subscribe", "test2")
    #                  .load())

    # joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    query = table1_stream.writeStream.format("console").queryName("table_A").start()  # .format("memory")
    # spark.sql("select * from table_A").show()
    # time.sleep(10)  # sleep 10 seconds
    # query.stop()
    query.awaitTermination()


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
#   Stream_Stream_Join.py


The output I'm getting is below, whereas I simply want to show() the
contents of my dataframe:

+----+--------------------+-----+---------+------+--------------------+-------------+
| key|               value|topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
|null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
|null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
|null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
|null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
|null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
+----+--------------------+-----+---------+------+--------------------+-------------+
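The value column in this output appears to hold each CSV line as raw bytes rather than text. A small plain-Python check, using only the complete leading bytes visible in the first three rows (the truncated trailing byte is left out), shows what they decode to. The helper name decode_hex is just for illustration:

```python
# Leading bytes of the `value` column, copied from the console output above.
value_prefixes = ["69 64 2C 66 69", "31 2C 4B 65 6C", "32 2C 4D 6F 72"]


def decode_hex(prefix):
    """Turn a space-separated hex dump into the text it encodes."""
    return bytes.fromhex(prefix).decode("utf-8")


decoded = [decode_hex(p) for p in value_prefixes]
print(decoded)  # ['id,fi', '1,Kel', '2,Mor']
```

So the first record is the CSV header line itself, and the following records are the data rows, which suggests the column needs a cast to string before it can be displayed as a table.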

18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
  "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
  "name" : "table_A",
  "timestamp" : "2018-03-15T10:18:07.218Z",
  "numInputRows" : 6,
  "inputRowsPerSecond" : 461.53846153846155,
  "processedRowsPerSecond" : 14.634146341463415,
  "durationMs" : {
    "addBatch" : 241,
    "getBatch" : 15,
    "getOffset" : 2,
    "queryPlanning" : 2,
    "triggerExecution" : 410,
    "walCommit" : 135
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test1]]",
    "startOffset" : {
      "test1" : {
        "0" : 5226
      }
    },
    "endOffset" : {
      "test1" : {
        "0" : 5232
      }
    },
    "numInputRows" : 6,
    "inputRowsPerSecond" : 461.53846153846155,
    "processedRowsPerSecond" : 14.634146341463415
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
  }
}
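As a sanity check on this progress report, the figures are consistent with each other: the offset range covers exactly the reported number of input rows, and the processed rate matches the row count divided by the trigger duration. A short plain-Python sketch with the values copied from the report (that the rate is derived from triggerExecution is my reading of the numbers, not something the log states):

```python
import json

# Subset of the progress report above, values copied verbatim.
progress = json.loads("""
{
  "numInputRows": 6,
  "processedRowsPerSecond": 14.634146341463415,
  "durationMs": {"triggerExecution": 410},
  "sources": [{
    "startOffset": {"test1": {"0": 5226}},
    "endOffset": {"test1": {"0": 5232}}
  }]
}
""")

source = progress["sources"][0]
start, end = source["startOffset"]["test1"], source["endOffset"]["test1"]

# Rows read in this batch = sum over partitions of (end offset - start offset).
rows_from_offsets = sum(end[p] - start[p] for p in end)

# Rows per second = rows / trigger duration in seconds.
rate = progress["numInputRows"] / (progress["durationMs"]["triggerExecution"] / 1000.0)

print(rows_from_offsets)
print(round(rate, 3))
```

The six rows correspond to the header line plus the five data lines of the CSV file.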

P.S. If I add the line below to the code, it doesn't print the contents of
the actual table.

spark.sql("select * from table_A").show()
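One possible direction, as an untested sketch: as far as I understand, only the memory sink registers the query name as a table that spark.sql can read, and the Kafka value column has to be cast to a string and split before it looks like the CSV. The function names parse_csv_value and show_topic are hypothetical, the broker address and topic are the ones from the test code, and processAllAvailable() is intended for testing rather than production use:

```python
COLUMNS = ["id", "first_name", "last_name"]  # header of the CSV file above


def parse_csv_value(raw_df, columns=COLUMNS):
    """Cast the binary Kafka `value` to text and split it into named columns."""
    # Imported lazily so this module loads even where pyspark is not installed.
    from pyspark.sql.functions import col, split

    fields = split(col("value").cast("string"), ",")
    parsed = raw_df.select(
        *[fields.getItem(i).alias(name) for i, name in enumerate(columns)]
    )
    # The CSV header line was sent to Kafka as a record too; drop it.
    return parsed.where(col(columns[0]) != columns[0])


def show_topic(spark, topic="test1", table="table_A"):
    """Read a topic into an in-memory table and show it with plain SQL."""
    raw = (spark.readStream.format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", topic)
           .option("startingOffsets", "earliest")
           .load())

    query = (parse_csv_value(raw).writeStream
             .format("memory")      # memory sink exposes `table` to spark.sql
             .queryName(table)
             .outputMode("append")
             .start())

    query.processAllAvailable()     # block until the current backlog is processed
    spark.sql("select * from {}".format(table)).show()
    query.stop()
```

It would be called as show_topic(spark) with the SparkSession from the test code, submitted with the same --packages option.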


Any help?


Thanks,
Aakash.

On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com>
wrote:

> Thanks to TD, the savior!
>
> Shall look into it.
>
> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>>
>> This is a true stream-stream join, which will automatically buffer delayed
>> data and join it appropriately with SQL join semantics. Please check it
>> out :)
>>
>> TD
>>
>>
>>
>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com>
>> wrote:
>>
>>> I misread it, and thought that your question was whether pyspark supports
>>> Kafka, lol. Sorry!
>>>
>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark....@gmail.com
>>> > wrote:
>>>
>>>> Hey Dylan,
>>>>
>>>> Great!
>>>>
>>>> Can you reply to my initial mail and also the latest one?
>>>>
>>>> Thanks,
>>>> Aakash.
>>>>
>>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've been using Kafka with pyspark since 2.1.
>>>>>
>>>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
>>>>> aakash.spark....@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm yet to.
>>>>>>
>>>>>> I just want to know: when will Spark 2.3 with the Kafka 0.10 package
>>>>>> support Python? I read somewhere that, as of now, Scala and Java are
>>>>>> the languages to be used.
>>>>>>
>>>>>> Please correct me if I am wrong.
>>>>>>
>>>>>> Thanks,
>>>>>> Aakash.
>>>>>>
>>>>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Did you try spark 2.3 with structured streaming? There watermarking
>>>>>>> and plain sql might be really interesting for you.
>>>>>>> Aakash Basu <aakash.spark....@gmail.com> wrote on Wed, 14 March
>>>>>>> 2018 at 14:57:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>>>>>>
>>>>>>>> *Spark 2.2.1*
>>>>>>>> *Kafka 1.0.1*
>>>>>>>>
>>>>>>>> As of now, I am feeding paragraphs into the Kafka console producer,
>>>>>>>> and my Spark job, acting as a receiver, prints the flattened words,
>>>>>>>> which is purely an RDD operation.
>>>>>>>>
>>>>>>>> *My goal is to read two continuously updated tables as two distinct
>>>>>>>> Kafka topics, load them as two Spark DataFrames, join them on a key,
>>>>>>>> and produce the output.* (I am from a Spark-SQL background, pardon
>>>>>>>> my Spark-SQL-ish writing.)
>>>>>>>>
>>>>>>>> *The first topic may receive new data 15 minutes before the second
>>>>>>>> topic. How should I proceed in that scenario? I must not lose any
>>>>>>>> data.*
>>>>>>>>
>>>>>>>> As of now, I simply want to pass paragraphs, read them as an RDD,
>>>>>>>> convert them to a DF, and then join to get the common keys as the
>>>>>>>> output (just for R&D).
>>>>>>>>
>>>>>>>> I started using Spark Streaming and Kafka only today.
>>>>>>>>
>>>>>>>> Please help!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Aakash.
>>>>>>>>
>>>>>>>
>>>>>
>>>
>>
>
