Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-21 Thread Aakash Basu
Thanks Chris!

On Fri, Mar 16, 2018 at 10:13 PM, Bowden, Chris <chris.bow...@microfocus.com
> wrote:

> 2. You must decide. If multiple streaming queries are launched in a single
> / simple application, only you can dictate if a single failure should cause
> the application to exit. If you use spark.streams.awaitAnyTermination be
> aware it returns / throws if _any_ streaming query terminates. A more
> complex application may keep track of many streaming queries and attempt to
> relaunch them with lower latency for certain types of failures.
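>
> A rough sketch of both approaches (df1/df2 and the relaunch policy are
> placeholders, not something prescribed in this thread):
>
>     q1 = df1.writeStream.format("console").start()
>     q2 = df2.writeStream.format("console").start()
>
>     # Simple application: block until *any* query stops or fails.
>     spark.streams.awaitAnyTermination()
>
>     # More complex application (instead of the call above): track the
>     # queries and decide per failure whether to relaunch or exit.
>     import time
>     queries = [q1, q2]
>     while queries:
>         for q in list(queries):
>             if not q.isActive:
>                 err = q.exception()  # None if the query stopped cleanly
>                 # inspect err here, then relaunch the query or drop it
>                 queries.remove(q)
>         time.sleep(5)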
>
>
> 3a. I'm not very familiar with py, but I doubt you need the sleep
>
> 3b. Kafka consumer tuning is simply a matter of passing appropriate config
> keys to the source's options if desired
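>
> For example (a sketch only; the values are arbitrary), consumer configs are
> passed through with a "kafka." prefix, next to the source's own options:
>
>     df = (spark.readStream.format("kafka")
>           .option("kafka.bootstrap.servers", "localhost:9092")
>           .option("subscribe", "test1")
>           .option("maxOffsetsPerTrigger", 10000)  # source option: rate limit per micro-batch
>           .option("kafka.max.partition.fetch.bytes", 1048576)  # handed to the Kafka consumer
>           .load())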
>
> 3c. I would argue the most obvious improvement would be a more structured
> and compact data format if CSV isn't required.
>
> --
> *From:* Aakash Basu <aakash.spark@gmail.com>
> *Sent:* Friday, March 16, 2018 9:12:39 AM
> *To:* sagar grover
> *Cc:* Bowden, Chris; Tathagata Das; Dylan Guedes; Georg Heiler; user;
> jagrati.go...@myntra.com
>
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi all,
>
> Of the queries at the bottom of the last mail, query 1's doubt has been
> resolved. As I had guessed, I had resent the same data from the Kafka
> producer multiple times, hence the join gave duplicates.
>
> Retested with a fresh Kafka feed and the problem was solved.
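>
> (Side note, not from the thread: if the producer might re-send identical
> rows again, the parsed stream can be guarded with something like
> dropDuplicates(["ID"]) before the join, ideally together with a watermark
> on an event-time column so the de-duplication state stays bounded.)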
>
> But the other queries still persist; would anyone like to reply? :)
>
> Thanks,
> Aakash.
>
> On 16-Mar-2018 3:57 PM, "Aakash Basu" <aakash.spark@gmail.com> wrote:
>
> Hi all,
>
> The code was perfectly alright; just the package I was submitting had to
> be the updated one (marked green below). The join happened, but the output
> has many duplicates (even though the *how* parameter defaults to *inner*)
> -
>
> Spark Submit:
>
> /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 
> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
>
>
> Code:
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test1").load())
>
>     table2_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test2").load())
>
>     query1 = table1_stream.select('value') \
>         .withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     query2 = table2_stream.select('value') \
>         .withColumn('value', table2_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("Department", split(col("value"), ",").getItem(1)) \
>         .withColumn("Date_joined", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     joined_Stream = query1.join(query2, "Id")
>
>     a = query1.writeStream.format("console").start()
>     b = query2.writeStream.format("console").start()
>     c = joined_Stream.writeStream.format("console").start()
>
>     time.sleep(10)
>
>     a.awaitTermination()
>     b.awaitTermination()
>     c.awaitTermination()
>
>
> Output -
>
> +---+----------+---------+-----------+-----------+
> | ID|First_Name|Last_Name| Department|Date_joined|
> +---+----------+---------+-----------+-----------+
> |  3|     Tobit|Robardley| Accounting|   8/3/2006|
> |  3|     Tobit|Robardley| Accounting|   8/3/2006|
> |  3|     Tobit|Robardley| Accounting|   8/3/2006|
> |  3|     Tobit|Robardley| Accounting|   8/3/2

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
wrote:
>>>
>>>> Awesome, thanks for detailing!
>>>>
>>>> Was thinking the same, we've to split by comma for csv while casting
>>>> inside.
>>>>
>>>> Cool! Shall try it and revert back tomm.
>>>>
>>>> Thanks a ton!
>>>>
>>>> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>>> wrote:
>>>>
>>>>> To remain generic, the KafkaSource can only offer the lowest common
>>>>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>>>>> timestampType). As such, you can't just feed it a StructType. When you are
>>>>> using a producer or consumer directly with Kafka, serialization and
>>>>> deserialization is often an orthogonal and implicit transform. However, in
>>>>> Spark, serialization and deserialization is an explicit transform (e.g.,
>>>>> you define it in your query plan).
>>>>>
>>>>>
>>>>> To make this more granular, if we imagine your source is registered as
>>>>> a temp view named "foo":
>>>>>
>>>>> SELECT
>>>>>
>>>>>   split(cast(value as string), ',')[0] as id,
>>>>>
>>>>>   split(cast(value as string), ',')[1] as name
>>>>>
>>>>> FROM foo;
>>>>>
>>>>>
>>>>> Assuming you were providing the following messages to Kafka:
>>>>>
>>>>> 1,aakash
>>>>>
>>>>> 2,tathagata
>>>>>
>>>>> 3,chris
>>>>>
>>>>>
>>>>> You could make the query plan less repetitive. I don't believe Spark
>>>>> offers from_csv out of the box as an expression (although CSV is well
>>>>> supported as a data source). You could implement an expression by reusing 
>>>>> a
>>>>> lot of the supporting CSV classes which may result in a better user
>>>>> experience vs. explicitly using split and array indices, etc. In this
>>>>> simple example, casting the binary to a string just works because there is
>>>>> a common understanding of string's encoded as bytes between Spark and 
>>>>> Kafka
>>>>> by default.
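>>>>>
>>>>> As a rough PySpark illustration of the same idea (server and topic names
>>>>> are placeholders):
>>>>>
>>>>> raw = (spark.readStream.format("kafka")
>>>>>        .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>        .option("subscribe", "test1").load())
>>>>> raw.createOrReplaceTempView("foo")
>>>>> parsed = spark.sql(
>>>>>     "SELECT split(cast(value as string), ',')[0] as id, "
>>>>>     "       split(cast(value as string), ',')[1] as name "
>>>>>     "FROM foo")
>>>>> parsed.writeStream.format("console").start()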
>>>>>
>>>>>
>>>>> -Chris
>>>>> --
>>>>> *From:* Aakash Basu <aakash.spark@gmail.com>
>>>>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>>>>> *To:* Bowden, Chris
>>>>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>>>>
>>>>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>>>
>>>>> Hey Chris,
>>>>>
>>>>> You got it right. I'm reading a *csv *file from local as mentioned
>>>>> above, with a console producer on Kafka side.
>>>>>
>>>>> So, as it is a csv data with headers, shall I then use from_csv on the
>>>>> spark side and provide a StructType to shape it up with a schema and then
>>>>> cast it to string as TD suggested?
>>>>>
>>>>> I'm getting all of your points at a very high level. A little more
>>>>> granularity would help.
>>>>>
>>>>> *In the slide TD just shared*, PFA, I'm confused at the point where
>>>>> he is casting the value as string. Logically, the value shall consist of
>>>>> all the entire data set, so, suppose, I've a table with many columns, *how
>>>>> can I provide a single alias as he did in the groupBy. I missed it there
>>>>> itself. Another question is, do I have to cast in groupBy itself? Can't I
>>>>> do it directly in a select query? The last one, if the steps are followed,
>>>>> can I then run a SQL query on top of the columns separately?*
>>>>>
>>>>> Thanks,
>>>>> Aakash.
>>>>>
>>>>>
>>>>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>>>> wrote:
>>>>>
>>>>> You need to tell Spark about the structure of the data, it doesn't
>>>>> know ahead of time if you put avro, json, protobuf, etc. in kafka for the
>>>>> message format. If the messages are in json, Spark provides from_json out
>>>>> of the

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
>
> *Final code (for clearer understanding of where it may go wrong in 2.3.0)* -
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka")
>                      .option("startingOffsets", "earliest")
>                      .option("kafka.bootstrap.servers", "localhost:9092")
>                      .option("subscribe", "test1").load())
>
>     query = table1_stream.select('value') \
>         .withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value').writeStream.format("console").start()
>
>     time.sleep(10)
>     query.awaitTermination()
>
> # Code working in Spark 2.2.1
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> # Code not working in Spark 2.3.0
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
> 2) I'm getting the below output as expected from the above code in 2.2.1.
> My query is: is there a way to pick up the header of the file being read
> and honour header=True? (Or is it that, for Structured Streaming, the user
> has to provide column names explicitly every time, since the data always
> arrives in the fixed Kafka structure - topic, partition, offset, key,
> value, timestamp, timestampType? If so, how do I explicitly remove the
> header rows from the data, as seen in the table below - one option is
> sketched in the aside after the table.) I know it is a stream and the data
> is fed in as messages, but I still wanted experts to shed some more light
> on it.
>
> +---+----------+---------+
> | ID|First_Name|Last_Name|
> +---+----------+---------+
> | Hi|      null|     null|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> +---+----------+---------+
> only showing top 20 rows
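>
> (As an aside, not from the thread: since the header line travels as just
> another Kafka message, one simple guard is to filter those rows out of the
> parsed dataframe (the one carrying ID/First_Name/Last_Name) before writing
> it out, e.g. .filter(col("ID") != "id"), with col coming from
> pyspark.sql.functions.)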
>
>
> Any help?
>
> Thanks,
> Aakash.
>
> On Fri, Mar 16, 2018 at 12:54 PM, sagar grover <sagargrove...@gmail.com>
> wrote:
>
>>
>> With regards,
>> Sagar Grover
>> Phone - 7022175584
>>
>> On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark@gmail.com
>> > wrote:
>>
>>> Awesome, thanks for detailing!
>>>
>>> Was thinking the same, we've to split by comma for csv while casting
>>> inside.
>>>
>>> Cool! Shall try it and revert back tomm.
>>>
>>> Thanks a ton!
>>>
>>> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>> wrote:
>>>
>>>> To remain generic, the KafkaSource can only offer the lowest common
>>>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>>>> timestampType). As such, you can't just feed it a StructType. When you are
>>>> using a producer or consumer directly with Kafka, serialization and
>>>> deserialization is often an orthogonal and implicit transform. However, in
>>>> Spark, serialization and deserialization is an explicit transform (e.g.,
>>>> you define it in your query plan).
>>>>
>>>>
>>>> To make this more granular, if we imagine your source is registered as
>>>> a temp view named "foo":
>>>>
>>>> SELECT
>>>>
>>>>   split(cast(value as string), ',')[0] as id,
>>>>
>>>>   split(cast(value as string), ',')[1] as name
>>>>
>>>> FROM foo;
>>>>
>>>>
>>>>

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
plit(cast(value as string), ',')[1] as name
>>>
>>> FROM foo;
>>>
>>>
>>> Assuming you were providing the following messages to Kafka:
>>>
>>> 1,aakash
>>>
>>> 2,tathagata
>>>
>>> 3,chris
>>>
>>>
>>> You could make the query plan less repetitive. I don't believe Spark
>>> offers from_csv out of the box as an expression (although CSV is well
>>> supported as a data source). You could implement an expression by reusing a
>>> lot of the supporting CSV classes which may result in a better user
>>> experience vs. explicitly using split and array indices, etc. In this
>>> simple example, casting the binary to a string just works because there is
>>> a common understanding of string's encoded as bytes between Spark and Kafka
>>> by default.
>>>
>>>
>>> -Chris
>>> --
>>> *From:* Aakash Basu <aakash.spark@gmail.com>
>>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>>> *To:* Bowden, Chris
>>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>>
>>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hey Chris,
>>>
>>> You got it right. I'm reading a *csv *file from local as mentioned
>>> above, with a console producer on Kafka side.
>>>
>>> So, as it is a csv data with headers, shall I then use from_csv on the
>>> spark side and provide a StructType to shape it up with a schema and then
>>> cast it to string as TD suggested?
>>>
>>> I'm getting all of your points at a very high level. A little more
>>> granularity would help.
>>>
>>> *In the slide TD just shared*, PFA, I'm confused at the point where he
>>> is casting the value as string. Logically, the value shall consist of all
>>> the entire data set, so, suppose, I've a table with many columns, *how
>>> can I provide a single alias as he did in the groupBy. I missed it there
>>> itself. Another question is, do I have to cast in groupBy itself? Can't I
>>> do it directly in a select query? The last one, if the steps are followed,
>>> can I then run a SQL query on top of the columns separately?*
>>>
>>> Thanks,
>>> Aakash.
>>>
>>>
>>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>>> wrote:
>>>
>>> You need to tell Spark about the structure of the data, it doesn't know
>>> ahead of time if you put avro, json, protobuf, etc. in kafka for the
>>> message format. If the messages are in json, Spark provides from_json out
>>> of the box. For a very simple POC you can happily cast the value to a
>>> string, etc. if you are prototyping and pushing messages by hand with a
>>> console producer on the kafka side.
>>>
>>> 
>>> From: Aakash Basu <aakash.spark@gmail.com>
>>> Sent: Thursday, March 15, 2018 7:52:28 AM
>>> To: Tathagata Das
>>> Cc: Dylan Guedes; Georg Heiler; user
>>> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>>>
>>> Hi,
>>>
>>> And if I run this below piece of code -
>>>
>>>
>>> from pyspark.sql import SparkSession
>>> import time
>>>
>>> class test:
>>>
>>>
>>> spark = SparkSession.builder \
>>> .appName("DirectKafka_Spark_Stream_Stream_Join") \
>>> .getOrCreate()
>>> # ssc = StreamingContext(spark, 20)
>>>
>>> table1_stream = 
>>> (spark.readStream.format("kafka").option("startingOffsets",
>>> "earliest").option("kafka.bootstrap.servers",
>>> "localhost:9092").option("subscribe", "test1").load())
>>>
>>> table2_stream = (
>>> spark.readStream.format("kafka").option("startingOffsets",
>>> "earliest").option("kafka.bootstrap.servers",
>>>
>>>   "localhost:9092").option("subscribe",
>>>
>>>"test2").load())
>>>
>>> joined_Stream = table1_stream.join(table2_stream, "Id")
>>> #
>>> # joined_Stream.show()
>>>
>>> # query =
> table1_stream.writeStream.format("console

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread sagar grover
With regards,
Sagar Grover
Phone - 7022175584

On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Awesome, thanks for detailing!
>
> Was thinking the same, we've to split by comma for csv while casting
> inside.
>
> Cool! Shall try it and revert back tomm.
>
> Thanks a ton!
>
> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
> wrote:
>
>> To remain generic, the KafkaSource can only offer the lowest common
>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>> timestampType). As such, you can't just feed it a StructType. When you are
>> using a producer or consumer directly with Kafka, serialization and
>> deserialization is often an orthogonal and implicit transform. However, in
>> Spark, serialization and deserialization is an explicit transform (e.g.,
>> you define it in your query plan).
>>
>>
>> To make this more granular, if we imagine your source is registered as a
>> temp view named "foo":
>>
>> SELECT
>>
>>   split(cast(value as string), ',')[0] as id,
>>
>>   split(cast(value as string), ',')[1] as name
>>
>> FROM foo;
>>
>>
>> Assuming you were providing the following messages to Kafka:
>>
>> 1,aakash
>>
>> 2,tathagata
>>
>> 3,chris
>>
>>
>> You could make the query plan less repetitive. I don't believe Spark
>> offers from_csv out of the box as an expression (although CSV is well
>> supported as a data source). You could implement an expression by reusing a
>> lot of the supporting CSV classes which may result in a better user
>> experience vs. explicitly using split and array indices, etc. In this
>> simple example, casting the binary to a string just works because there is
>> a common understanding of string's encoded as bytes between Spark and Kafka
>> by default.
>>
>>
>> -Chris
>> --
>> *From:* Aakash Basu <aakash.spark@gmail.com>
>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>> *To:* Bowden, Chris
>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>
>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>
>> Hey Chris,
>>
>> You got it right. I'm reading a *csv *file from local as mentioned
>> above, with a console producer on Kafka side.
>>
>> So, as it is a csv data with headers, shall I then use from_csv on the
>> spark side and provide a StructType to shape it up with a schema and then
>> cast it to string as TD suggested?
>>
>> I'm getting all of your points at a very high level. A little more
>> granularity would help.
>>
>> *In the slide TD just shared*, PFA, I'm confused at the point where he
>> is casting the value as string. Logically, the value shall consist of all
>> the entire data set, so, suppose, I've a table with many columns, *how
>> can I provide a single alias as he did in the groupBy. I missed it there
>> itself. Another question is, do I have to cast in groupBy itself? Can't I
>> do it directly in a select query? The last one, if the steps are followed,
>> can I then run a SQL query on top of the columns separately?*
>>
>> Thanks,
>> Aakash.
>>
>>
>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
>> wrote:
>>
>> You need to tell Spark about the structure of the data, it doesn't know
>> ahead of time if you put avro, json, protobuf, etc. in kafka for the
>> message format. If the messages are in json, Spark provides from_json out
>> of the box. For a very simple POC you can happily cast the value to a
>> string, etc. if you are prototyping and pushing messages by hand with a
>> console producer on the kafka side.
>>
>> 
>> From: Aakash Basu <aakash.spark@gmail.com>
>> Sent: Thursday, March 15, 2018 7:52:28 AM
>> To: Tathagata Das
>> Cc: Dylan Guedes; Georg Heiler; user
>> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>>
>> Hi,
>>
>> And if I run this below piece of code -
>>
>>
>> from pyspark.sql import SparkSession
>> import time
>>
>> class test:
>>
>>
>> spark = SparkSession.builder \
>> .appName("DirectKafka_Spark_Stream_Stream_Join") \
>> .getOrCreate()
>> # ssc = StreamingContext(spark, 20)
>>
>> table1_stream = 
>> (spark.readStream.format("kafka").option("

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Awesome, thanks for detailing!

Was thinking the same; we have to split by comma for CSV while casting inside.

Cool! Shall try it and revert tomorrow.

Thanks a ton!

On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
wrote:

> To remain generic, the KafkaSource can only offer the lowest common
> denominator for a schema (topic, partition, offset, key, value, timestamp,
> timestampType). As such, you can't just feed it a StructType. When you are
> using a producer or consumer directly with Kafka, serialization and
> deserialization is often an orthogonal and implicit transform. However, in
> Spark, serialization and deserialization is an explicit transform (e.g.,
> you define it in your query plan).
>
>
> To make this more granular, if we imagine your source is registered as a
> temp view named "foo":
>
> SELECT
>
>   split(cast(value as string), ',')[0] as id,
>
>   split(cast(value as string), ',')[1] as name
>
> FROM foo;
>
>
> Assuming you were providing the following messages to Kafka:
>
> 1,aakash
>
> 2,tathagata
>
> 3,chris
>
>
> You could make the query plan less repetitive. I don't believe Spark
> offers from_csv out of the box as an expression (although CSV is well
> supported as a data source). You could implement an expression by reusing a
> lot of the supporting CSV classes which may result in a better user
> experience vs. explicitly using split and array indices, etc. In this
> simple example, casting the binary to a string just works because there is
> a common understanding of strings encoded as bytes between Spark and Kafka
> by default.
>
>
> -Chris
> --
> *From:* Aakash Basu <aakash.spark@gmail.com>
> *Sent:* Thursday, March 15, 2018 10:48:45 AM
> *To:* Bowden, Chris
> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hey Chris,
>
> You got it right. I'm reading a *csv *file from local as mentioned above,
> with a console producer on Kafka side.
>
> So, as it is a csv data with headers, shall I then use from_csv on the
> spark side and provide a StructType to shape it up with a schema and then
> cast it to string as TD suggested?
>
> I'm getting all of your points at a very high level. A little more
> granularity would help.
>
> *In the slide TD just shared*, PFA, I'm confused at the point where he is
> casting the value as string. Logically, the value shall consist of all the
> entire data set, so, suppose, I've a table with many columns, *how can I
> provide a single alias as he did in the groupBy. I missed it there itself.
> Another question is, do I have to cast in groupBy itself? Can't I do it
> directly in a select query? The last one, if the steps are followed, can I
> then run a SQL query on top of the columns separately?*
>
> Thanks,
> Aakash.
>
>
> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
> wrote:
>
> You need to tell Spark about the structure of the data, it doesn't know
> ahead of time if you put avro, json, protobuf, etc. in kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> 
> From: Aakash Basu <aakash.spark@gmail.com>
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
> "localhost:9092").option("subscribe", "test1").load())
>
> table2_stream = (
> spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
>
> "localhost:9092").option("subscribe",
>
>  "test2").load())
>
> joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # jo

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Hey Chris,

You got it right. I'm reading a *csv *file from local as mentioned above,
with a console producer on Kafka side.

So, as it is a csv data with headers, shall I then use from_csv on the
spark side and provide a StructType to shape it up with a schema and then
cast it to string as TD suggested?

I'm getting all of your points at a very high level. A little more
granularity would help.

*In the slide TD just shared* (PFA), I'm confused at the point where he is
casting the value as string. Logically, the value will consist of the entire
data set, so suppose I have a table with many columns - *how can I provide a
single alias as he did in the groupBy? I missed it there itself. Another
question is, do I have to cast in groupBy itself? Can't I do it directly in a
select query? And lastly, if the steps are followed, can I then run a SQL
query on top of the columns separately?*
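
(For illustration only - a minimal sketch of the select-first style, with raw
standing in for the Kafka stream dataframe and placeholder column names:)

from pyspark.sql.functions import split, col

parsed = raw.select(col("value").cast("string").alias("csv")) \
    .select(split(col("csv"), ",").getItem(0).alias("id"),
            split(col("csv"), ",").getItem(1).alias("name"))

parsed.createOrReplaceTempView("people")
counts = spark.sql("select id, count(*) as cnt from people group by id")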

Thanks,
Aakash.


On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com> wrote:

You need to tell Spark about the structure of the data, it doesn't know
ahead of time if you put avro, json, protobuf, etc. in kafka for the
message format. If the messages are in json, Spark provides from_json out
of the box. For a very simple POC you can happily cast the value to a
string, etc. if you are prototyping and pushing messages by hand with a
console producer on the kafka side.


From: Aakash Basu <aakash.spark@gmail.com>
Sent: Thursday, March 15, 2018 7:52:28 AM
To: Tathagata Das
Cc: Dylan Guedes; Georg Heiler; user
Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query

Hi,

And if I run this below piece of code -


from pyspark.sql import SparkSession
import time

class test:


spark = SparkSession.builder \
.appName("DirectKafka_Spark_Stream_Stream_Join") \
.getOrCreate()
# ssc = StreamingContext(spark, 20)

table1_stream = (spark.readStream.format("kafka").option("startingOffsets",
"earliest").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe",
"test1").load())

table2_stream = (
spark.readStream.format("kafka").option("startingOffsets",
"earliest").option("kafka.bootstrap.servers",

  "localhost:9092").option("subscribe",

   "test2").load())

joined_Stream = table1_stream.join(table2_stream, "Id")
#
# joined_Stream.show()

# query =
table1_stream.writeStream.format("console").start().awaitTermination()
# .queryName("table_A").format("memory")
# spark.sql("select * from table_A").show()
time.sleep(10)  # sleep 20 seconds
# query.stop()
# query


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
Stream_Stream_Join.py




I get the below error (in Spark 2.3.0) -

Traceback (most recent call last):
  File 
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 4, in 
class test:
  File 
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 19, in test
joined_Stream = table1_stream.join(table2_stream, "Id")
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved
on the left side of the join. The left-side columns: [key, value, topic,
partition, offset, timestamp, timestampType];'

It seems, as per the documentation, the key and value are deserialized as
byte arrays.

I am badly stuck at this step; there is not much material online with steps
to proceed on this, either.

Any help, guys?

Thanks,
Aakash.


On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark@gmail.com<
mailto:aakash.spark@gmail.com>> wrote:
Any help on the above?

On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark@gmail.com<
mailto:aakash.spark@gmail.com>> wrote:
Hi,

I progressed a bit in the above mentioned topic -

1) I am feeding a CSV file into the Kafka topic.
2) Feeding the Kafka topic as readStream as TD's article suggests.
3) Then, simply trying to do a show on the streaming dataframe, using
queryName('XYZ') in the writeStream and writing a sql query on top of it,
but that doesn't show anything.
4) Once all the above problems are resolved, I want to perform a
stream-stream join.

The CSV file I'm in

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Tathagata Das
Chris identified the problem correctly. You need to parse out the json text
from Kafka into separate columns before you can join them up.
I walk through an example of this in my slides -
https://www.slideshare.net/databricks/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-with-tathagata-das
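
For reference, a minimal from_json sketch (the schema, server, topic and
column names below are illustrative, not taken from the slides):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("id", StringType()),
                     StructField("name", StringType())])

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "test1").load())

parsed = raw.select(from_json(col("value").cast("string"), schema).alias("j")) \
            .select("j.*")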


On Thu, Mar 15, 2018 at 8:37 AM, Bowden, Chris <chris.bow...@microfocus.com>
wrote:

> You need to tell Spark about the structure of the data, it doesn't know
> ahead of time if you put avro, json, protobuf, etc. in kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> 
> From: Aakash Basu <aakash.spark@gmail.com>
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe",
> "test1").load())
>
> table2_stream = (
> spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
>
> "localhost:9092").option("subscribe",
>
>  "test2").load())
>
> joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # joined_Stream.show()
>
> # query =
> table1_stream.writeStream.format("console").start().awaitTermination()
> # .queryName("table_A").format("memory")
> # spark.sql("select * from table_A").show()
> time.sleep(10)  # sleep 20 seconds
> # query.stop()
> # query
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
> Stream_Stream_Join.py
>
>
>
>
> I get the below error (in Spark 2.3.0) -
>
> Traceback (most recent call last):
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 4, in 
> class test:
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 19, in test
> joined_Stream = table1_stream.join(table2_stream, "Id")
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be
> resolved on the left side of the join. The left-side columns: [key, value,
> topic, partition, offset, timestamp, timestampType];'
>
> It seems, as per the documentation, the key and value are deserialized as
> byte arrays.
>
> I am badly stuck at this step; there is not much material online with steps
> to proceed on this, either.
>
> Any help, guys?
>
> Thanks,
> Aakash.
>
>
> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark@gmail.com<
> mailto:aakash.spark@gmail.com>> wrote:
> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark@gmail.com<
> mailto:aakash.spark@gmail.com>> wrote:
> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using
> queryName('XYZ') in the writeStream and writing a sql query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Hi,

And if I run this below piece of code -

from pyspark.sql import SparkSession
import time

class test:


    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1").load())

    table2_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test2").load())

    joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    # query = table1_stream.writeStream.format("console").start().awaitTermination()
    # .queryName("table_A").format("memory")
    # spark.sql("select * from table_A").show()
    time.sleep(10)  # sleep 20 seconds
    # query.stop()
    # query


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py




I get the below error (in *Spark 2.3.0*) -

Traceback (most recent call last):
  File
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 4, in 
class test:
  File
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 19, in test
joined_Stream = table1_stream.join(table2_stream, "Id")
  File
"/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
line 931, in join
  File
"/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py",
line 1160, in __call__
  File
"/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 69, in deco


*pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be
resolved on the left side of the join. The left-side columns: [key, value,
topic, partition, offset, timestamp, timestampType];'*
It seems, as per the documentation, the key and value are deserialized as
byte arrays.
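
(For illustration: the join key has to be materialised as a column first,
since the Kafka source only exposes key, value, topic, partition, offset,
timestamp and timestampType. A rough sketch, assuming comma-separated values
with the id in the first field:)

from pyspark.sql.functions import split, col

t1 = table1_stream.withColumn("Id", split(col("value").cast("string"), ",").getItem(0))
t2 = table2_stream.withColumn("Id", split(col("value").cast("string"), ",").getItem(0))
joined_Stream = t1.join(t2, "Id")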

I am badly stuck at this step; there is not much material online with steps
to proceed on this, either.

Any help, guys?

Thanks,
Aakash.


On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu 
wrote:

> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I progressed a bit in the above mentioned topic -
>>
>> 1) I am feeding a CSV file into the Kafka topic.
>> 2) Feeding the Kafka topic as readStream as TD's article suggests.
>> 3) Then, simply trying to do a show on the streaming dataframe, using
>> queryName('XYZ') in the writeStream and writing a sql query on top of it,
>> but that doesn't show anything.
>> 4) Once all the above problems are resolved, I want to perform a
>> stream-stream join.
>>
>> The CSV file I'm ingesting into Kafka has -
>>
>> id,first_name,last_name
>> 1,Kellyann,Moyne
>> 2,Morty,Blacker
>> 3,Tobit,Robardley
>> 4,Wilona,Kells
>> 5,Reggy,Comizzoli
>>
>>
>> My test code -
>>
>> from pyspark.sql import SparkSession
>> import time
>>
>> class test:
>>
>>
>> spark = SparkSession.builder \
>> .appName("DirectKafka_Spark_Stream_Stream_Join") \
>> .getOrCreate()
>> # ssc = StreamingContext(spark, 20)
>>
>> table1_stream = 
>> (spark.readStream.format("kafka").option("startingOffsets", 
>> "earliest").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "test1").load())
>>
>> # table2_stream = 
>> (spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "test2").load())
>>
>> # joined_Stream = table1_stream.join(table2_stream, "Id")
>> #
>> # joined_Stream.show()
>>
>> query = 
>> table1_stream.writeStream.format("console").queryName("table_A").start()  # 
>> .format("memory")
>> # spark.sql("select * from table_A").show()
>> # time.sleep(10)  # sleep 20 seconds
>> # query.stop()
>> query.awaitTermination()
>>
>>
>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit 
>> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 
>> Stream_Stream_Join.py
>>
>>
>> The output I'm getting (whereas I simply want to show() my dataframe) -
>>
>> +++-+-+--+--
>> --+-+
>> | key|   value|topic|partition|offset|
>> timestamp|timestampType|
>> +++-+-+--+--
>> --+-+
>> |null|[69 64 2C 66 69 7...|test1|0|  5226|2018-03-15
>> 15:48:...|0|
>> |null|[31 2C 4B 65 6C 6...|test1|0|  5227|2018-03-15
>> 15:48:...|0|
>> |null|[32 2C 4D 6F 72 7...|test1|0|  5228|2018-03-15
>> 15:48:...|0|
>> |null|[33 2C 54 6F 62 

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Any help on the above?

On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu 
wrote:

> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using
> queryName('XYZ') in the writeStream and writing a sql query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
> 1,Kellyann,Moyne
> 2,Morty,Blacker
> 3,Tobit,Robardley
> 4,Wilona,Kells
> 5,Reggy,Comizzoli
>
>
> My test code -
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets", 
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "test1").load())
>
> # table2_stream = 
> (spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "test2").load())
>
> # joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # joined_Stream.show()
>
> query = 
> table1_stream.writeStream.format("console").queryName("table_A").start()  # 
> .format("memory")
> # spark.sql("select * from table_A").show()
> # time.sleep(10)  # sleep 20 seconds
> # query.stop()
> query.awaitTermination()
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>
>
> The output I'm getting (whereas I simply want to show() my dataframe) -
>
> +++-+-+--+--
> --+-+
> | key|   value|topic|partition|offset|
> timestamp|timestampType|
> +++-+-+--+--
> --+-+
> |null|[69 64 2C 66 69 7...|test1|0|  5226|2018-03-15
> 15:48:...|0|
> |null|[31 2C 4B 65 6C 6...|test1|0|  5227|2018-03-15
> 15:48:...|0|
> |null|[32 2C 4D 6F 72 7...|test1|0|  5228|2018-03-15
> 15:48:...|0|
> |null|[33 2C 54 6F 62 6...|test1|0|  5229|2018-03-15
> 15:48:...|0|
> |null|[34 2C 57 69 6C 6...|test1|0|  5230|2018-03-15
> 15:48:...|0|
> |null|[35 2C 52 65 67 6...|test1|0|  5231|2018-03-15
> 15:48:...|0|
> +++-+-+--+--
> --+-+
>
> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>   "name" : "table_A",
>   "timestamp" : "2018-03-15T10:18:07.218Z",
>   "numInputRows" : 6,
>   "inputRowsPerSecond" : 461.53846153846155,
>   "processedRowsPerSecond" : 14.634146341463415,
>   "durationMs" : {
> "addBatch" : 241,
> "getBatch" : 15,
> "getOffset" : 2,
> "queryPlanning" : 2,
> "triggerExecution" : 410,
> "walCommit" : 135
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaSource[Subscribe[test1]]",
> "startOffset" : {
>   "test1" : {
> "0" : 5226
>   }
> },
> "endOffset" : {
>   "test1" : {
> "0" : 5232
>   }
> },
> "numInputRows" : 6,
> "inputRowsPerSecond" : 461.53846153846155,
> "processedRowsPerSecond" : 14.634146341463415
>   } ],
>   "sink" : {
> "description" : "org.apache.spark.sql.execution.streaming.
> ConsoleSink@3dfc7990"
>   }
> }
>
> P.S - If I add the below piece in the code, it doesn't print a DF of the
> actual table.
>
> spark.sql("select * from table_A").show()
>
>
> Any help?
>
>
> Thanks,
> Aakash.
>
> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu 
> wrote:
>
>> Thanks to TD, the savior!
>>
>> Shall look into it.
>>
>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Relevant: https://databricks.com/blog/2018/03/13/introducing
>>> -stream-stream-joins-in-apache-spark-2-3.html
>>>
>>> This is true stream-stream join which will automatically buffer delayed
>>> data and appropriately join stuff with SQL join semantics. Please check it
>>> out :)
>>>
>>> TD
>>>
>>>
>>>
>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes 
>>> wrote:
>>>
 I misread it, and thought that you question was if pyspark supports
 kafka lol. Sorry!

 On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <
 aakash.spark@gmail.com> wrote:

> Hey Dylan,
>
> Great!
>
> Can you revert back to my 

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Hi,

I progressed a bit in the above mentioned topic -

1) I am feeding a CSV file into the Kafka topic.
2) Feeding the Kafka topic as readStream as TD's article suggests.
3) Then, simply trying to do a show on the streaming dataframe, using
queryName('XYZ') in the writeStream and writing a sql query on top of it,
but that doesn't show anything.
4) Once all the above problems are resolved, I want to perform a
stream-stream join.

The CSV file I'm ingesting into Kafka has -

id,first_name,last_name
1,Kellyann,Moyne
2,Morty,Blacker
3,Tobit,Robardley
4,Wilona,Kells
5,Reggy,Comizzoli


My test code -

from pyspark.sql import SparkSession
import time

class test:


    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1").load())

    # table2_stream = (spark.readStream.format("kafka")
    #                  .option("kafka.bootstrap.servers", "localhost:9092")
    #                  .option("subscribe", "test2").load())

    # joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    query = table1_stream.writeStream.format("console").queryName("table_A").start()  # .format("memory")
    # spark.sql("select * from table_A").show()
    # time.sleep(10)  # sleep 20 seconds
    # query.stop()
    query.awaitTermination()


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py


The output I'm getting (whereas I simply want to show() my dataframe) -

+----+--------------------+-----+---------+------+--------------------+-------------+
| key|               value|topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
|null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
|null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
|null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
|null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
|null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
+----+--------------------+-----+---------+------+--------------------+-------------+

18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
  "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
  "name" : "table_A",
  "timestamp" : "2018-03-15T10:18:07.218Z",
  "numInputRows" : 6,
  "inputRowsPerSecond" : 461.53846153846155,
  "processedRowsPerSecond" : 14.634146341463415,
  "durationMs" : {
"addBatch" : 241,
"getBatch" : 15,
"getOffset" : 2,
"queryPlanning" : 2,
"triggerExecution" : 410,
"walCommit" : 135
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[test1]]",
"startOffset" : {
  "test1" : {
"0" : 5226
  }
},
"endOffset" : {
  "test1" : {
"0" : 5232
  }
},
"numInputRows" : 6,
"inputRowsPerSecond" : 461.53846153846155,
"processedRowsPerSecond" : 14.634146341463415
  } ],
  "sink" : {
"description" :
"org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
  }
}

P.S - If I add the below piece in the code, it doesn't print a DF of the
actual table.

spark.sql("select * from table_A").show()
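
(One possible explanation, as an illustration only: the query above writes to
the console sink, so there is no in-memory table named table_A to select
from. With the memory sink the queryName becomes a queryable table, but only
after at least one micro-batch has completed, e.g.:)

query = (table1_stream.writeStream
         .format("memory")
         .queryName("table_A")
         .start())

import time
time.sleep(10)  # give the first micro-batch a chance to complete
spark.sql("select * from table_A").show()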


Any help?


Thanks,
Aakash.

On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu 
wrote:

> Thanks to TD, the savior!
>
> Shall look into it.
>
> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Relevant: https://databricks.com/blog/2018/03/13/introducing
>> -stream-stream-joins-in-apache-spark-2-3.html
>>
>> This is true stream-stream join which will automatically buffer delayed
>> data and appropriately join stuff with SQL join semantics. Please check it
>> out :)
>>
>> TD
>>
>>
>>
>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes 
>> wrote:
>>
>>> I misread it, and thought that you question was if pyspark supports
>>> kafka lol. Sorry!
>>>
>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu >> > wrote:
>>>
 Hey Dylan,

 Great!

 Can you revert back to my initial and also the latest mail?

 Thanks,
 Aakash.

 On 15-Mar-2018 12:27 AM, "Dylan Guedes"  wrote:

> Hi,
>
> I've been using the Kafka with pyspark since 2.1.
>
> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
> aakash.spark@gmail.com> wrote:
>
>> Hi,
>>
>> I'm yet to.
>>
>> Just want to know, 

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Thanks to TD, the savior!

Shall look into it.

On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das 
wrote:

> Relevant: https://databricks.com/blog/2018/03/13/
> introducing-stream-stream-joins-in-apache-spark-2-3.html
>
> This is true stream-stream join which will automatically buffer delayed
> data and appropriately join stuff with SQL join semantics. Please check it
> out :)
>
> TD
>
>
>
> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes 
> wrote:
>
>> I misread it, and thought that you question was if pyspark supports kafka
>> lol. Sorry!
>>
>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu 
>> wrote:
>>
>>> Hey Dylan,
>>>
>>> Great!
>>>
>>> Can you revert back to my initial and also the latest mail?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 15-Mar-2018 12:27 AM, "Dylan Guedes"  wrote:
>>>
 Hi,

 I've been using the Kafka with pyspark since 2.1.

 On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
 aakash.spark@gmail.com> wrote:

> Hi,
>
> I'm yet to.
>
> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
> allows Python? I read somewhere, as of now Scala and Java are the 
> languages
> to be used.
>
> Please correct me if am wrong.
>
> Thanks,
> Aakash.
>
> On 14-Mar-2018 8:24 PM, "Georg Heiler" 
> wrote:
>
>> Did you try spark 2.3 with structured streaming? There watermarking
>> and plain sql might be really interesting for you.
>> Aakash Basu  schrieb am Mi. 14. März
>> 2018 um 14:57:
>>
>>> Hi,
>>>
>>>
>>>
>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>
>>> *Spark 2.2.1*
>>> *Kafka 1.0.1*
>>>
>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>> Spark, which is acting as a receiver is printing the flattened words, 
>>> which
>>> is a complete RDD operation.
>>>
>>> *My motive is to read two tables continuously (being updated) as two
>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>> based on a key and produce the output. *(I am from Spark-SQL
>>> background, pardon my Spark-SQL-ish writing)
>>>
>>> *It may happen, the first topic is receiving new data 15 mins prior
>>> to the second topic, in that scenario, how to proceed? I should not lose
>>> any data.*
>>>
>>> As of now, I want to simply pass paragraphs, read them as RDD,
>>> convert to DF and then join to get the common keys as the output. (Just 
>>> for
>>> R).
>>>
>>> Started using Spark Streaming and Kafka today itself.
>>>
>>> Please help!
>>>
>>> Thanks,
>>> Aakash.
>>>
>>

>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Tathagata Das
Relevant:
https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html


This is true stream-stream join which will automatically buffer delayed
data and appropriately join stuff with SQL join semantics. Please check it
out :)

TD
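
For reference, a rough sketch of the 2.3-style join with watermarks (column
names, topics and the time bounds below are illustrative):

from pyspark.sql.functions import expr

left = stream1.withWatermark("ts1", "30 minutes")
right = stream2.withWatermark("ts2", "30 minutes")

joined = left.join(
    right,
    expr("id1 = id2 AND ts2 BETWEEN ts1 AND ts1 + interval 15 minutes"))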



On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes  wrote:

> I misread it, and thought that you question was if pyspark supports kafka
> lol. Sorry!
>
> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu 
> wrote:
>
>> Hey Dylan,
>>
>> Great!
>>
>> Can you revert back to my initial and also the latest mail?
>>
>> Thanks,
>> Aakash.
>>
>> On 15-Mar-2018 12:27 AM, "Dylan Guedes"  wrote:
>>
>>> Hi,
>>>
>>> I've been using the Kafka with pyspark since 2.1.
>>>
>>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu >> > wrote:
>>>
 Hi,

 I'm yet to.

 Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
 allows Python? I read somewhere, as of now Scala and Java are the languages
 to be used.

 Please correct me if am wrong.

 Thanks,
 Aakash.

 On 14-Mar-2018 8:24 PM, "Georg Heiler" 
 wrote:

> Did you try spark 2.3 with structured streaming? There watermarking
> and plain sql might be really interesting for you.
> Aakash Basu  schrieb am Mi. 14. März 2018
> um 14:57:
>
>> Hi,
>>
>>
>>
>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>
>> *Spark 2.2.1*
>> *Kafka 1.0.1*
>>
>> As of now, I am feeding paragraphs in Kafka console producer and my
>> Spark, which is acting as a receiver is printing the flattened words, 
>> which
>> is a complete RDD operation.
>>
>> *My motive is to read two tables continuously (being updated) as two
>> distinct Kafka topics being read as two Spark Dataframes and join them
>> based on a key and produce the output. *(I am from Spark-SQL
>> background, pardon my Spark-SQL-ish writing)
>>
>> *It may happen, the first topic is receiving new data 15 mins prior
>> to the second topic, in that scenario, how to proceed? I should not lose
>> any data.*
>>
>> As of now, I want to simply pass paragraphs, read them as RDD,
>> convert to DF and then join to get the common keys as the output. (Just 
>> for
>> R).
>>
>> Started using Spark Streaming and Kafka today itself.
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>
>>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Dylan Guedes
I misread it, and thought that you question was if pyspark supports kafka
lol. Sorry!

On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu 
wrote:

> Hey Dylan,
>
> Great!
>
> Can you revert back to my initial and also the latest mail?
>
> Thanks,
> Aakash.
>
> On 15-Mar-2018 12:27 AM, "Dylan Guedes"  wrote:
>
>> Hi,
>>
>> I've been using the Kafka with pyspark since 2.1.
>>
>> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm yet to.
>>>
>>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>>> allows Python? I read somewhere, as of now Scala and Java are the languages
>>> to be used.
>>>
>>> Please correct me if am wrong.
>>>
>>> Thanks,
>>> Aakash.
>>>
>>> On 14-Mar-2018 8:24 PM, "Georg Heiler" 
>>> wrote:
>>>
 Did you try spark 2.3 with structured streaming? There watermarking and
 plain sql might be really interesting for you.
 Aakash Basu  schrieb am Mi. 14. März 2018
 um 14:57:

> Hi,
>
>
>
> *Info (Using):Spark Streaming Kafka 0.8 package*
>
> *Spark 2.2.1*
> *Kafka 1.0.1*
>
> As of now, I am feeding paragraphs in Kafka console producer and my
> Spark, which is acting as a receiver is printing the flattened words, 
> which
> is a complete RDD operation.
>
> *My motive is to read two tables continuously (being updated) as two
> distinct Kafka topics being read as two Spark Dataframes and join them
> based on a key and produce the output. *(I am from Spark-SQL
> background, pardon my Spark-SQL-ish writing)
>
> *It may happen, the first topic is receiving new data 15 mins prior to
> the second topic, in that scenario, how to proceed? I should not lose any
> data.*
>
> As of now, I want to simply pass paragraphs, read them as RDD, convert
> to DF and then join to get the common keys as the output. (Just for R).
>
> Started using Spark Streaming and Kafka today itself.
>
> Please help!
>
> Thanks,
> Aakash.
>

>>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Hey Dylan,

Great!

Can you revert back to my initial and also the latest mail?

Thanks,
Aakash.

On 15-Mar-2018 12:27 AM, "Dylan Guedes"  wrote:

> Hi,
>
> I've been using the Kafka with pyspark since 2.1.
>
> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I'm yet to.
>>
>> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
>> allows Python? I read somewhere, as of now Scala and Java are the languages
>> to be used.
>>
>> Please correct me if am wrong.
>>
>> Thanks,
>> Aakash.
>>
>> On 14-Mar-2018 8:24 PM, "Georg Heiler"  wrote:
>>
>>> Did you try spark 2.3 with structured streaming? There watermarking and
>>> plain sql might be really interesting for you.
>>> Aakash Basu  schrieb am Mi. 14. März 2018
>>> um 14:57:
>>>
 Hi,



 *Info (Using):Spark Streaming Kafka 0.8 package*

 *Spark 2.2.1*
 *Kafka 1.0.1*

 As of now, I am feeding paragraphs in Kafka console producer and my
 Spark, which is acting as a receiver is printing the flattened words, which
 is a complete RDD operation.

 *My motive is to read two tables continuously (being updated) as two
 distinct Kafka topics being read as two Spark Dataframes and join them
 based on a key and produce the output. *(I am from Spark-SQL
 background, pardon my Spark-SQL-ish writing)

 *It may happen, the first topic is receiving new data 15 mins prior to
 the second topic, in that scenario, how to proceed? I should not lose any
 data.*

 As of now, I want to simply pass paragraphs, read them as RDD, convert
 to DF and then join to get the common keys as the output. (Just for R).

 Started using Spark Streaming and Kafka today itself.

 Please help!

 Thanks,
 Aakash.

>>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Dylan Guedes
Hi,

I've been using the Kafka with pyspark since 2.1.

On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu 
wrote:

> Hi,
>
> I'm yet to.
>
> Just want to know, when does Spark 2.3 with 0.10 Kafka Spark Package
> allows Python? I read somewhere, as of now Scala and Java are the languages
> to be used.
>
> Please correct me if am wrong.
>
> Thanks,
> Aakash.
>
> On 14-Mar-2018 8:24 PM, "Georg Heiler"  wrote:
>
>> Did you try spark 2.3 with structured streaming? There watermarking and
>> plain sql might be really interesting for you.
>> Aakash Basu  schrieb am Mi. 14. März 2018 um
>> 14:57:
>>
>>> Hi,
>>>
>>>
>>>
>>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>>
>>> *Spark 2.2.1*
>>> *Kafka 1.0.1*
>>>
>>> As of now, I am feeding paragraphs in Kafka console producer and my
>>> Spark, which is acting as a receiver is printing the flattened words, which
>>> is a complete RDD operation.
>>>
>>> *My motive is to read two tables continuously (being updated) as two
>>> distinct Kafka topics being read as two Spark Dataframes and join them
>>> based on a key and produce the output. *(I am from Spark-SQL
>>> background, pardon my Spark-SQL-ish writing)
>>>
>>> *It may happen, the first topic is receiving new data 15 mins prior to
>>> the second topic, in that scenario, how to proceed? I should not lose any
>>> data.*
>>>
>>> As of now, I want to simply pass paragraphs, read them as RDD, convert
>>> to DF and then join to get the common keys as the output. (Just for R).
>>>
>>> Started using Spark Streaming and Kafka today itself.
>>>
>>> Please help!
>>>
>>> Thanks,
>>> Aakash.
>>>
>>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Hi,

I'm yet to.

Just want to know: when does Spark 2.3 with the 0.10 Kafka Spark package
allow Python? I read somewhere that, as of now, Scala and Java are the
languages to be used.

Please correct me if am wrong.

Thanks,
Aakash.

On 14-Mar-2018 8:24 PM, "Georg Heiler"  wrote:

> Did you try spark 2.3 with structured streaming? There watermarking and
> plain sql might be really interesting for you.
> Aakash Basu  schrieb am Mi. 14. März 2018 um
> 14:57:
>
>> Hi,
>>
>>
>>
>> *Info (Using):Spark Streaming Kafka 0.8 package*
>>
>> *Spark 2.2.1*
>> *Kafka 1.0.1*
>>
>> As of now, I am feeding paragraphs in Kafka console producer and my
>> Spark, which is acting as a receiver is printing the flattened words, which
>> is a complete RDD operation.
>>
>> *My motive is to read two tables continuously (being updated) as two
>> distinct Kafka topics being read as two Spark Dataframes and join them
>> based on a key and produce the output. *(I am from Spark-SQL background,
>> pardon my Spark-SQL-ish writing)
>>
>> *It may happen, the first topic is receiving new data 15 mins prior to
>> the second topic, in that scenario, how to proceed? I should not lose any
>> data.*
>>
>> As of now, I want to simply pass paragraphs, read them as RDD, convert to
>> DF and then join to get the common keys as the output. (Just for R).
>>
>> Started using Spark Streaming and Kafka today itself.
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Georg Heiler
Did you try spark 2.3 with structured streaming? There watermarking and
plain sql might be really interesting for you.
Aakash Basu  schrieb am Mi. 14. März 2018 um
14:57:

> Hi,
>
>
>
> *Info (Using):Spark Streaming Kafka 0.8 package*
>
> *Spark 2.2.1*
> *Kafka 1.0.1*
>
> As of now, I am feeding paragraphs in Kafka console producer and my Spark,
> which is acting as a receiver is printing the flattened words, which is a
> complete RDD operation.
>
> *My motive is to read two tables continuously (being updated) as two
> distinct Kafka topics being read as two Spark Dataframes and join them
> based on a key and produce the output. *(I am from Spark-SQL background,
> pardon my Spark-SQL-ish writing)
>
> *It may happen, the first topic is receiving new data 15 mins prior to the
> second topic, in that scenario, how to proceed? I should not lose any data.*
>
> As of now, I want to simply pass paragraphs, read them as RDD, convert to
> DF and then join to get the common keys as the output. (Just for R).
>
> Started using Spark Streaming and Kafka today itself.
>
> Please help!
>
> Thanks,
> Aakash.
>


Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Aakash Basu
Hi,



*Info (Using):Spark Streaming Kafka 0.8 package*

*Spark 2.2.1*
*Kafka 1.0.1*

As of now, I am feeding paragraphs into the Kafka console producer, and my
Spark job, which is acting as a receiver, is printing the flattened words,
which is a complete RDD operation.

*My motive is to read two tables continuously (being updated) as two
distinct Kafka topics being read as two Spark Dataframes and join them
based on a key and produce the output. *(I am from Spark-SQL background,
pardon my Spark-SQL-ish writing)

*It may happen that the first topic receives new data 15 mins prior to the
second topic; in that scenario, how do I proceed? I should not lose any data.*

As of now, I want to simply pass paragraphs, read them as RDD, convert to
DF and then join to get the common keys as the output. (Just for R).

Started using Spark Streaming and Kafka today itself.

Please help!

Thanks,
Aakash.