Thanks Chris!

On Fri, Mar 16, 2018 at 10:13 PM, Bowden, Chris <chris.bow...@microfocus.com
> wrote:

> 2. You must decide. If multiple streaming queries are launched in a single
> / simple application, only you can dictate if a single failure should cause
> the application to exit. If you use spark.streams.awaitAnyTermination be
> aware it returns / throws if _any_ streaming query terminates. A more
> complex application may keep track of many streaming queries and attempt to
> relaunch them with lower latency for certain types of failures.
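A minimal sketch of the "more complex application" idea above, assuming each query can be relaunched by calling a zero-argument function that builds and starts it. The names run_with_restarts, start_query_fns, max_restarts and poll_seconds are hypothetical and not part of Spark; the sketch relies only on StreamingQuery.isActive and StreamingQuery.exception() from PySpark.

```python
import time


def run_with_restarts(start_query_fns, max_restarts=3, poll_seconds=5.0):
    """Supervise several streaming queries and relaunch the ones that fail.

    start_query_fns maps a query name to a zero-argument callable that
    starts the query and returns it (hypothetical convention, e.g.
    lambda: df.writeStream.format("console").start()).
    Returns the number of restarts performed per query.
    """
    queries = {name: start() for name, start in start_query_fns.items()}
    restarts = {name: 0 for name in start_query_fns}

    while queries:
        for name, query in list(queries.items()):
            if query.isActive:
                continue  # still running, nothing to do
            error = query.exception()  # None if the query stopped cleanly
            if error is None:
                del queries[name]  # stopped on purpose, stop tracking it
            elif restarts[name] < max_restarts:
                restarts[name] += 1
                queries[name] = start_query_fns[name]()  # relaunch
            else:
                raise RuntimeError(
                    "query %r failed %d times" % (name, restarts[name] + 1)
                ) from error
        time.sleep(poll_seconds)

    return restarts
```

By contrast, a single spark.streams.awaitAnyTermination() call in the driver returns (or throws) as soon as the first query terminates, so the choice depends on whether one failure should end the whole application.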
>
>
> 3a. I'm not very familiar with py, but I doubt you need the sleep
>
> 3b. Kafka consumer tuning is simply a matter of passing appropriate config
> keys to the source's options if desired
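As a rough illustration of 3b: Kafka consumer settings are passed to the source with a "kafka." prefix, while Spark's own source options (such as maxOffsetsPerTrigger) are passed without one. The helper kafka_source_options is a hypothetical name, and the values below are placeholders, not tuning recommendations.

```python
def kafka_source_options(bootstrap_servers, topic, consumer_config=None, **source_options):
    """Build the option dict for spark.readStream.format("kafka").

    Kafka consumer settings get a "kafka." prefix; Spark's own source
    options (e.g. maxOffsetsPerTrigger) are passed through unprefixed.
    """
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
    }
    for key, value in (consumer_config or {}).items():
        options["kafka." + key] = str(value)
    for key, value in source_options.items():
        options[key] = str(value)
    return options


opts = kafka_source_options(
    "localhost:9092",
    "test1",
    consumer_config={"max.partition.fetch.bytes": 2 * 1024 * 1024},  # placeholder value
    startingOffsets="earliest",
    maxOffsetsPerTrigger=10000,  # placeholder value
)
# With a live session: spark.readStream.format("kafka").options(**opts).load()
```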
>
> 3c. I would argue the most obvious improvement would be a more structured
> and compact data format if CSV isn't required.
>
> ------------------------------
> *From:* Aakash Basu <aakash.spark....@gmail.com>
> *Sent:* Friday, March 16, 2018 9:12:39 AM
> *To:* sagar grover
> *Cc:* Bowden, Chris; Tathagata Das; Dylan Guedes; Georg Heiler; user;
> jagrati.go...@myntra.com
>
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi all,
>
> Of the queries at the bottom of my last mail, query 1 is resolved. As I
> had guessed, I had resent the same records from the Kafka producer multiple
> times, hence the join gave duplicates.
>
> Retested with fresh Kafka feed and problem was solved.
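To illustrate why resent records produce duplicates even with an inner join: an inner join emits one row per matching pair, so a key that appears n times on one side and m times on the other yields n x m rows. The sketch below uses plain Python lists rather than Spark, and the counts (3 and 2) are made up for illustration.

```python
def inner_join(left, right, key):
    """Nested-loop inner join: one output row per matching (left, right) pair."""
    return [
        {**l, **r}
        for l in left
        for r in right
        if l[key] == r[key]
    ]


# The same record sent three times to one topic and twice to the other.
names = [{"ID": "3", "First_Name": "Tobit", "Last_Name": "Robardley"}] * 3
depts = [{"ID": "3", "Department": "Accounting", "Date_joined": "8/3/2006"}] * 2

joined = inner_join(names, depts, "ID")
print(len(joined))  # 3 copies x 2 copies = 6 identical rows
```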
>
> But the other queries still persist; would anyone like to reply? :)
>
> Thanks,
> Aakash.
>
> On 16-Mar-2018 3:57 PM, "Aakash Basu" <aakash.spark....@gmail.com> wrote:
>
> Hi all,
>
> The code was perfectly alright; the package I was submitting just had to
> be the updated one (marked green below). The join happened, but the output
> has many duplicates (even though the *how* parameter is *inner* by default):
>
> Spark Submit:
>
> /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 
> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
>
>
> Code:
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka")
>         .option("startingOffsets", "earliest")
>         .option("kafka.bootstrap.servers", "localhost:9092")
>         .option("subscribe", "test1")
>         .load())
>
>     table2_stream = (spark.readStream.format("kafka")
>         .option("startingOffsets", "earliest")
>         .option("kafka.bootstrap.servers", "localhost:9092")
>         .option("subscribe", "test2")
>         .load())
>
>
>     query1 = table1_stream.select('value')\
>         .withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     query2 = table2_stream.select('value') \
>         .withColumn('value', table2_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("Department", split(col("value"), ",").getItem(1)) \
>         .withColumn("Date_joined", split(col("value"), ",").getItem(2)) \
>         .drop('value')
>
>     joined_Stream = query1.join(query2, "Id")
>
>     a = query1.writeStream.format("console").start()
>     b = query2.writeStream.format("console").start()
>     c = joined_Stream.writeStream.format("console").start()
>
>     time.sleep(10)
>
>     a.awaitTermination()
>     b.awaitTermination()
>     c.awaitTermination()
>
>
> Output -
>
> +---+----------+---------+---------------+-----------+
> | ID|First_Name|Last_Name|     Department|Date_joined|
> +---+----------+---------+---------------+-----------+
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  3|     Tobit|Robardley|     Accounting|   8/3/2006|
> |  5|     Reggy|Comizzoli|Human Resources|  8/15/2012|
> |  5|     Reggy|Comizzoli|Human Resources|  8/15/2012|
> +---+----------+---------+---------------+-----------+
> only showing top 20 rows
>
>
>
>
> *Queries:*
>
> *1) Why does the join behave like an outer join even though it is an inner
> join?*
>
> *2) Do we need to call awaitTermination on all the streams, or would calling
> it only on the input streams suffice?*
> *3) This code is not optimized; how can streaming code be optimized in
> general?*
>
> Thanks,
> Aakash.
>
> On Fri, Mar 16, 2018 at 3:23 PM, Aakash Basu <aakash.spark....@gmail.com>
> wrote:
>
> Hi,
>
> *Thanks to Chris and TD* for continually supporting my endeavor. I ran
> the code with a few tweaks here and there, and *it worked well in
> Spark 2.2.1*, giving me the deserialized values (I used withColumn in the
> writeStream section to run the SQL functions split and cast).
>
> But when I submit the same code in 2.3.0, I get an error for which I
> couldn't find any solution on the internet.
>
> *Error: pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming
> Query ===\nIdentifier: [id = d956096e-42d2-493c-8b6c-125e3137c291, runId =
> cd25ec61-c6bb-436c-a93e-80814e1436ec]\nCurrent Committed Offsets:
> {}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread
> State: RUNNABLE' *
>
> *Final code (for clearer understanding of where it may go wrong in 2.3.0) -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>
>     table1_stream = (spark.readStream.format("kafka")
>         .option("startingOffsets", "earliest")
>         .option("kafka.bootstrap.servers", "localhost:9092")
>         .option("subscribe", "test1")
>         .load())
>
>     query = table1_stream.select('value') \
>         .withColumn('value', table1_stream.value.cast("string")) \
>         .withColumn("ID", split(col("value"), ",").getItem(0)) \
>         .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
>         .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
>         .drop('value').writeStream.format("console").start()
>
>     time.sleep(10)
>
>     query.awaitTermination()
>
> # Code working in Spark 2.2.1
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> # Code not working in Spark 2.3.0
> # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
> 2) I'm getting the output below, as expected, from the above code in 2.2.1.
> My query is: is there a way to read the header of the file being ingested and
> get the effect of header=True? (Or does the user always have to provide
> headers explicitly in Structured Streaming, since Kafka data always arrives
> in this structure - topic, partition, offset, key, value, timestamp,
> timestampType? If so, how can the column header rows be removed explicitly
> from the data, as in the table below?) I know it is a stream and the data is
> fed in as messages, but I would still like the experts to shed some more
> light on it.
>
> +---+----------+---------+
> | ID|First_Name|Last_Name|
> +---+----------+---------+
> | Hi|      null|     null|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> |  1|  Kellyann|    Moyne|
> |  2|     Morty|  Blacker|
> |  3|     Tobit|Robardley|
> |  4|    Wilona|    Kells|
> |  5|     Reggy|Comizzoli|
> | id|first_name|last_name|
> +---+----------+---------+
> only showing top 20 rows
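One possible way to think about the header question: the Kafka source has no notion of a file header, so the header line arrives as an ordinary message each time the file is sent, and it has to be dropped explicitly. The sketch below shows that logic in plain Python on the messages from this thread; parse_messages and HEADER are hypothetical names, and dropping rows with the wrong field count (such as the stray "Hi") is an assumption about the desired behaviour.

```python
import csv

# Column names expected in every message (taken from the CSV in this thread).
HEADER = ["id", "first_name", "last_name"]


def parse_messages(messages, header=HEADER):
    """Parse CSV messages, dropping repeated header lines and malformed rows."""
    rows = []
    for fields in csv.reader(messages):
        if len(fields) != len(header):
            continue  # e.g. the stray "Hi" message
        if [f.strip().lower() for f in fields] == header:
            continue  # a header line resent along with the file
        rows.append(dict(zip(header, fields)))
    return rows


messages = [
    "Hi",
    "id,first_name,last_name",
    "1,Kellyann,Moyne",
    "id,first_name,last_name",
    "2,Morty,Blacker",
]
rows = parse_messages(messages)
```

In the streaming query itself, the same idea would amount to a filter on the parsed columns, for example keeping only rows whose ID is not the literal string "id".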
>
>
> Any help?
>
> Thanks,
> Aakash.
>
> On Fri, Mar 16, 2018 at 12:54 PM, sagar grover <sagargrove...@gmail.com>
> wrote:
>
>
> With regards,
> Sagar Grover
> Phone - 7022175584
>
> On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu <aakash.spark....@gmail.com>
> wrote:
>
> Awesome, thanks for detailing!
>
> I was thinking the same: we have to split by comma for CSV while casting
> inside.
>
> Cool! I shall try it and get back to you tomorrow.
>
> Thanks a ton!
>
> On 15-Mar-2018 11:50 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
> wrote:
>
> To remain generic, the KafkaSource can only offer the lowest common
> denominator for a schema (topic, partition, offset, key, value, timestamp,
> timestampType). As such, you can't just feed it a StructType. When you are
> using a producer or consumer directly with Kafka, serialization and
> deserialization are often an orthogonal and implicit transform. However, in
> Spark, serialization and deserialization are an explicit transform (e.g.,
> you define it in your query plan).
>
>
> To make this more granular, if we imagine your source is registered as a
> temp view named "foo":
>
> SELECT
>
>   split(cast(value as string), ',')[0] as id,
>
>   split(cast(value as string), ',')[1] as name
>
> FROM foo;
>
>
> Assuming you were providing the following messages to Kafka:
>
> 1,aakash
>
> 2,tathagata
>
> 3,chris
>
>
> You could make the query plan less repetitive. I don't believe Spark
> offers from_csv out of the box as an expression (although CSV is well
> supported as a data source). You could implement an expression by reusing a
> lot of the supporting CSV classes which may result in a better user
> experience vs. explicitly using split and array indices, etc. In this
> simple example, casting the binary to a string just works because Spark and
> Kafka share a common default understanding of strings encoded as bytes.
>
>
> -Chris
> ------------------------------
> *From:* Aakash Basu <aakash.spark....@gmail.com>
> *Sent:* Thursday, March 15, 2018 10:48:45 AM
> *To:* Bowden, Chris
> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hey Chris,
>
> You got it right. I'm reading a *csv *file from local as mentioned above,
> with a console producer on Kafka side.
>
> So, as it is a csv data with headers, shall I then use from_csv on the
> spark side and provide a StructType to shape it up with a schema and then
> cast it to string as TD suggested?
>
> I'm getting all of your points at a very high level. A little more
> granularity would help.
>
> *In the slide TD just shared* (PFA), I'm confused at the point where he
> casts the value as string. Logically, the value would contain all of the
> data, so suppose I have a table with many columns: *how can I provide a
> single alias as he did in the groupBy? I missed that part. Another question:
> do I have to cast in the groupBy itself, or can I do it directly in a select
> query? And lastly, if these steps are followed, can I then run a SQL query on
> the individual columns?*
>
> Thanks,
> Aakash.
>
>
> On 15-Mar-2018 9:07 PM, "Bowden, Chris" <chris.bow...@microfocus.com>
> wrote:
>
> You need to tell Spark about the structure of the data; it doesn't know
> ahead of time whether you put Avro, JSON, Protobuf, etc. in Kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> ________________________________________
> From: Aakash Basu <aakash.spark....@gmail.com>
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>     # ssc = StreamingContext(spark, 20)
>
>     table1_stream = (spark.readStream.format("kafka")
>         .option("startingOffsets", "earliest")
>         .option("kafka.bootstrap.servers", "localhost:9092")
>         .option("subscribe", "test1")
>         .load())
>
>     table2_stream = (spark.readStream.format("kafka")
>         .option("startingOffsets", "earliest")
>         .option("kafka.bootstrap.servers", "localhost:9092")
>         .option("subscribe", "test2")
>         .load())
>
>     joined_Stream = table1_stream.join(table2_stream, "Id")
>     #
>     # joined_Stream.show()
>
>     # query =
>     table1_stream.writeStream.format("console").start().awaitTermination()
> # .queryName("table_A").format("memory")
>     # spark.sql("select * from table_A").show()
>     time.sleep(10)  # sleep 10 seconds
>     # query.stop()
>     # query
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
> Stream_Stream_Join.py
>
>
>
>
> I get the below error (in Spark 2.3.0) -
>
> Traceback (most recent call last):
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 4, in <module>
>     class test:
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py", line 19, in test
>     joined_Stream = table1_stream.join(table2_stream, "Id")
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved on the left side of the join. The left-side columns: [key, value, topic, partition, offset, timestamp, timestampType];'
>
> It seems, as per the documentation, that the key and value are deserialized
> as byte arrays.
>
> I am badly stuck at this step, and there is not much material online with
> steps on how to proceed.
>
> Any help, guys?
>
> Thanks,
> Aakash.
>
>
> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu <aakash.spark....@gmail.com
> <mailto:aakash.spark....@gmail.com>> wrote:
> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu <aakash.spark....@gmail.com
> <mailto:aakash.spark....@gmail.com>> wrote:
> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using
> queryName('XYZ') in the writeStream and writing a sql query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
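A hedged sketch for step 3: the console sink only prints batches, so giving it a queryName does not create a table to query. The memory sink, by contrast, keeps the output in an in-memory table named after the query, which can then be read with spark.sql. The function name snapshot_stream is hypothetical; spark and stream_df are assumed to be an existing SparkSession and a streaming DataFrame, and processAllAvailable() is a blocking call intended for testing rather than production use.

```python
def snapshot_stream(spark, stream_df, table_name):
    """Write a streaming DataFrame to the memory sink and query it with SQL.

    spark is a SparkSession and stream_df a streaming DataFrame; both are
    passed in, so the sketch itself needs no imports.
    """
    query = (
        stream_df.writeStream
        .format("memory")          # keep results in an in-memory table
        .queryName(table_name)     # the table is named after the query
        .outputMode("append")
        .start()
    )
    query.processAllAvailable()    # block until the data available so far is processed
    result = spark.sql("SELECT * FROM " + table_name)
    return query, result
```

A caller would then run something like result.show() on the returned DataFrame and query.stop() when finished.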
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
> 1,Kellyann,Moyne
> 2,Morty,Blacker
> 3,Tobit,Robardley
> 4,Wilona,Kells
> 5,Reggy,Comizzoli
>
>
> My test code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("DirectKafka_Spark_Stream_Stream_Join") \
>         .getOrCreate()
>     # ssc = StreamingContext(spark, 20)
>
>     table1_stream = (spark.readStream.format("kafka")
>         .option("startingOffsets", "earliest")
>         .option("kafka.bootstrap.servers", "localhost:9092")
>         .option("subscribe", "test1")
>         .load())
>
>     # table2_stream = (spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test2").load())
>
>     # joined_Stream = table1_stream.join(table2_stream, "Id")
>     #
>     # joined_Stream.show()
>
>     query = table1_stream.writeStream.format("console").queryName("table_A").start()  # .format("memory")
>     # spark.sql("select * from table_A").show()
>     # time.sleep(10)  # sleep 10 seconds
>     # query.stop()
>     query.awaitTermination()
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
> Stream_Stream_Join.py
>
>
> The output I'm getting (whereas I simply want to show() my dataframe) -
>
> +----+--------------------+-----+---------+------+--------------------+-------------+
> | key|               value|topic|partition|offset|           timestamp|timestampType|
> +----+--------------------+-----+---------+------+--------------------+-------------+
> |null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
> |null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
> |null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
> |null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
> |null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
> |null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
> +----+--------------------+-----+---------+------+--------------------+-------------+
>
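The value column in the console output above is the raw message bytes, not garbage. As a small check in plain Python, decoding the first five visible bytes of the first two rows (the sixth byte is truncated in the output) recovers the start of the CSV lines; UTF-8 is assumed here because the messages were typed text from a console producer. The helper name decode_value is hypothetical.

```python
def decode_value(hex_dump):
    """Decode a space-separated hex dump of a Kafka value as UTF-8 text."""
    return bytes.fromhex(hex_dump).decode("utf-8")


print(decode_value("69 64 2C 66 69"))  # id,fi
print(decode_value("31 2C 4B 65 6C"))  # 1,Kel
```

This is what casting the value column to a string does on the Spark side, applied to the whole message.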
> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>   "name" : "table_A",
>   "timestamp" : "2018-03-15T10:18:07.218Z",
>   "numInputRows" : 6,
>   "inputRowsPerSecond" : 461.53846153846155,
>   "processedRowsPerSecond" : 14.634146341463415,
>   "durationMs" : {
>     "addBatch" : 241,
>     "getBatch" : 15,
>     "getOffset" : 2,
>     "queryPlanning" : 2,
>     "triggerExecution" : 410,
>     "walCommit" : 135
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaSource[Subscribe[test1]]",
>     "startOffset" : {
>       "test1" : {
>         "0" : 5226
>       }
>     },
>     "endOffset" : {
>       "test1" : {
>         "0" : 5232
>       }
>     },
>     "numInputRows" : 6,
>     "inputRowsPerSecond" : 461.53846153846155,
>     "processedRowsPerSecond" : 14.634146341463415
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
>   }
> }
>
> P.S - If I add the below piece in the code, it doesn't print a DF of the
> actual table.
>
> spark.sql("select * from table_A").show()
>
> Any help?
>
>
> Thanks,
> Aakash.
>
> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com
> <mailto:aakash.spark....@gmail.com>> wrote:
> Thanks to TD, the savior!
>
> Shall look into it.
>
> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
> tathagata.das1...@gmail.com<mailto:tathagata.das1...@gmail.com>> wrote:
> Relevant: https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
>
> This is true stream-stream join which will automatically buffer delayed
> data and appropriately join stuff with SQL join semantics. Please check it
> out :)
>
> TD
>
>
>
> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes <djmggue...@gmail.com
> <mailto:djmggue...@gmail.com>> wrote:
> I misread it, and thought that your question was whether pyspark supports
> Kafka, lol. Sorry!
>
> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <aakash.spark....@gmail.com
> <mailto:aakash.spark....@gmail.com>> wrote:
> Hey Dylan,
>
> Great!
>
> Can you reply to my initial mail and also the latest one?
>
> Thanks,
> Aakash.
>
> On 15-Mar-2018 12:27 AM, "Dylan Guedes" <djmggue...@gmail.com
> <mailto:djmggue...@gmail.com>> wrote:
> Hi,
>
> I've been using Kafka with pyspark since 2.1.
>
> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <aakash.spark....@gmail.com
> <mailto:aakash.spark....@gmail.com>> wrote:
> Hi,
>
> I'm yet to.
>
> I just want to know: does Spark 2.3 with the Kafka 0.10 Spark package
> support Python? I read somewhere that, as of now, Scala and Java are the
> languages to be used.
>
> Please correct me if I am wrong.
>
> Thanks,
> Aakash.
>
> On 14-Mar-2018 8:24 PM, "Georg Heiler" <georg.kf.hei...@gmail.com<mailto:
> georg.kf.hei...@gmail.com>> wrote:
> Did you try spark 2.3 with structured streaming? There watermarking and
> plain sql might be really interesting for you.
> Aakash Basu <aakash.spark....@gmail.com<mailto:aakash.spark....@gmail.com>>
> wrote on Wed, 14 March 2018 at 14:57:
> Hi,
>
> Info (Using):
> Spark Streaming Kafka 0.8 package
> Spark 2.2.1
> Kafka 1.0.1
>
> As of now, I am feeding paragraphs into the Kafka console producer, and my
> Spark application, acting as a receiver, prints the flattened words, which
> is purely an RDD operation.
>
> My aim is to read two continuously updated tables as two distinct Kafka
> topics, load them as two Spark DataFrames, join them on a key, and produce
> the output. (I am from a Spark-SQL background, pardon my Spark-SQL-ish
> writing.)
>
> It may happen that the first topic receives new data 15 minutes before the
> second topic. In that scenario, how should I proceed? I should not lose any
> data.
>
> As of now, I simply want to pass paragraphs, read them as RDDs, convert them
> to DFs and then join them to get the common keys as the output. (Just for
> R&D.)
>
> I started using Spark Streaming and Kafka only today.
>
> Please help!
>
> Thanks,
> Aakash.
>