Hi Gerard,

"If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination`  should solve the problem."

I tried literally everything, but nothing worked out.

1) Tried nc on two different ports for two different streams; still nothing
worked.

2) Tried the same using Kafka with awaitAnyTermination, but still no use; the
first stream's write kept blocking the second... (And inner queries with
aggregation don't seem to work in Structured Streaming, as each one expects a
separate writeStream.start().) A stripped-down sketch of what I'm describing is below.
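
Sketch (the topic name, bootstrap servers and trigger interval are
placeholders; the parsing mirrors the code further down this thread):

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, split

spark = SparkSession.builder \
    .appName("TwoStreamingQueries") \
    .getOrCreate()

# One Kafka source shared by both queries (topic/servers are placeholders).
data = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test1") \
    .option("startingOffsets", "latest") \
    .load()

parsed = data.select(col("value").cast("string").alias("value")) \
    .withColumn("col1", split(col("value"), ",").getItem(0).cast("double")) \
    .withColumn("col2", split(col("value"), ",").getItem(1).cast("double")) \
    .drop("value")

# Query 1: running average of col1 (aggregation, hence complete output mode).
averages = parsed.agg(avg("col1").alias("aver"))

query1 = averages.writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime="5 seconds") \
    .start()

# Query 2: the raw parsed rows, started before any blocking call.
query2 = parsed.writeStream \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()

# Block on whichever query terminates first, not on query1 alone.
spark.streams.awaitAnyTermination()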

Any insight (or a direct update to the code) would be helpful.

Thanks,
Aakash.

On Mon 16 Apr, 2018, 9:05 PM Gerard Maas, <gerard.m...@gmail.com> wrote:

> Aakash,
>
> There are two issues here.
> The issue with the code on the first question is that the first query
> blocks and the code for the second does not get executed. Panagiotis
> pointed this out correctly.
> In the updated code, the issue is related to netcat (nc) and the way
> structured streaming works. As far as I remember, netcat only delivers data
> to the first network connection.
> On the structured streaming side, each query will issue its own
> connection. This results in only the first query getting the data.
> If you were talking to a TCP server that supports multiple connected clients,
> you would see data in both queries.
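>
> As an illustration only, a minimal multi-client test server (a hypothetical
> stand-in for nc, nothing Spark-specific) could look like this:
>
> # multi_client_feeder.py - broadcasts every line typed on stdin to ALL
> # connected clients, unlike nc, which serves only the first connection.
> import socket
> import sys
> import threading
>
> clients = []
> lock = threading.Lock()
>
> def accept_loop(server):
>     while True:
>         conn, _ = server.accept()
>         with lock:
>             clients.append(conn)
>
> server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> server.bind(("localhost", 9998))
> server.listen(5)
> threading.Thread(target=accept_loop, args=(server,), daemon=True).start()
>
> for line in sys.stdin:  # type rows such as "1,2" and press Enter
>     with lock:
>         for conn in list(clients):
>             try:
>                 conn.sendall(line.encode())
>             except OSError:
>                 clients.remove(conn)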
>
> If your actual source is Kafka, the original solution of using
> `spark.streams.awaitAnyTermination`  should solve the problem.
>
> -kr, Gerard.
>
>
>
> On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu <aakash.spark....@gmail.com>
> wrote:
>
>> Hey Jayesh and Others,
>>
>> Is there, then, any other way to arrive at a solution for this use case?
>>
>> Thanks,
>> Aakash.
>>
>> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
>> jayesh.lalw...@capitalone.com> wrote:
>>
>>> Note that what you are trying to do here is join a streaming data frame
>>> with an aggregated streaming data frame. As per the documentation, joining
>>> an aggregated streaming data frame with another streaming data frame is not
>>> supported
>>>
>>>
>>>
>>>
>>>
>>> *From: *spark receiver <spark.recei...@gmail.com>
>>> *Date: *Friday, April 13, 2018 at 11:49 PM
>>> *To: *Aakash Basu <aakash.spark....@gmail.com>
>>> *Cc: *Panagiotis Garefalakis <panga...@gmail.com>, user <
>>> user@spark.apache.org>
>>> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>>>
>>>
>>>
>>> Hi Panagiotis ,
>>>
>>>
>>>
>>> Wondering whether you solved the problem or not, because I met the same
>>> issue today. I'd appreciate it very much if you could paste the code
>>> snippet if it's working.
>>>
>>>
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> On 6 Apr 2018, at 7:40 AM, Aakash Basu <aakash.spark....@gmail.com> wrote:
>>>
>>>
>>>
>>> Hi Panagiotis,
>>>
>>> I did that, but it still prints the result of the first query and waits
>>> for new data; it doesn't even go on to the next one.
>>>
>>> *Data -*
>>>
>>> $ nc -lk 9998
>>>
>>> 1,2
>>> 3,4
>>> 5,6
>>> 7,8
>>>
>>> *Result -*
>>>
>>> -------------------------------------------
>>> Batch: 0
>>> -------------------------------------------
>>> +----+
>>> |aver|
>>> +----+
>>> | 3.0|
>>> +----+
>>>
>>> -------------------------------------------
>>> Batch: 1
>>> -------------------------------------------
>>> +----+
>>> |aver|
>>> +----+
>>> | 4.0|
>>> +----+
>>>
>>>
>>>
>>> *Updated Code -*
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import split
>>>
>>> spark = SparkSession \
>>>     .builder \
>>>     .appName("StructuredNetworkWordCount") \
>>>     .getOrCreate()
>>>
>>> data = spark \
>>>     .readStream \
>>>     .format("socket") \
>>>     .option("host", "localhost") \
>>>     .option("port", 9998) \
>>>     .load()
>>>
>>>
>>> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), 
>>> split(data.value, ",").getItem(1).alias("col2"))
>>>
>>> id_DF.createOrReplaceTempView("ds")
>>>
>>> df = spark.sql("select avg(col1) as aver from ds")
>>>
>>> df.createOrReplaceTempView("abcd")
>>>
>>> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")  # (select aver from abcd)
>>>
>>> query2 = df \
>>>     .writeStream \
>>>     .format("console") \
>>>     .outputMode("complete") \
>>>     .trigger(processingTime='5 seconds') \
>>>     .start()
>>>
>>> query = wordCounts \
>>>     .writeStream \
>>>     .format("console") \
>>>     .trigger(processingTime='5 seconds') \
>>>     .start()
>>>
>>> spark.streams.awaitAnyTermination()
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Aakash.
>>>
>>>
>>>
>>> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <
>>> panga...@gmail.com> wrote:
>>>
>>> Hello Aakash,
>>>
>>>
>>>
>>> When you use query.awaitTermination you are pretty much blocking there
>>> waiting for the current query to stop or throw an exception. In your case
>>> the second query will not even start.
>>>
>>> What you could do instead is remove all the blocking calls and use
>>> spark.streams.awaitAnyTermination (waiting for either query1 or
>>> query2 to terminate). Make sure you do that after the query2.start call.
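>>>
>>> A self-contained sketch of that pattern, using the built-in rate source
>>> purely for illustration:
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("AwaitAnySketch").getOrCreate()
>>>
>>> # Any source works; 'rate' just generates rows so the example is runnable.
>>> stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
>>>
>>> query1 = stream.writeStream.format("console").start()
>>> query2 = stream.select("value").writeStream.format("console").start()
>>>
>>> # Wait on whichever query stops or fails first, instead of
>>> # calling query1.awaitTermination() before query2 is started.
>>> spark.streams.awaitAnyTermination()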
>>>
>>>
>>>
>>> I hope this helps.
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Panagiotis
>>>
>>>
>>>
>>> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark....@gmail.com>
>>> wrote:
>>>
>>> Any help?
>>>
>>> I need urgent help. Could someone please clarify the doubt?
>>>
>>>
>>>
>>> ---------- Forwarded message ----------
>>> From: *Aakash Basu* <aakash.spark....@gmail.com>
>>> Date: Thu, Apr 5, 2018 at 3:18 PM
>>> Subject: [Structured Streaming] More than 1 streaming in a code
>>> To: user <user@spark.apache.org>
>>>
>>> Hi,
>>>
>>> If I have more than one writeStream in a piece of code, both operating on the
>>> same readStream data, why does only the first writeStream produce output? I
>>> want the second one to be printed on the console as well.
>>>
>>> How can I do that?
>>>
>>>
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import split, col
>>>
>>> class test:
>>>
>>>
>>>     spark = SparkSession.builder \
>>>         .appName("Stream_Col_Oper_Spark") \
>>>         .getOrCreate()
>>>
>>>     data = spark.readStream.format("kafka") \
>>>         .option("startingOffsets", "latest") \
>>>         .option("kafka.bootstrap.servers", "localhost:9092") \
>>>         .option("subscribe", "test1") \
>>>         .load()
>>>
>>>     ID = data.select('value') \
>>>         .withColumn('value', data.value.cast("string")) \
>>>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>>>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>>>         .drop('value')
>>>
>>>     ID.createOrReplaceTempView("transformed_Stream_DF")
>>>
>>>     df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>>>
>>>     df.createOrReplaceTempView("abcd")
>>>
>>>     wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")
>>>
>>>
>>>     # -----------------------#
>>>
>>>     query1 = df \
>>>         .writeStream \
>>>         .format("console") \
>>>         .outputMode("complete") \
>>>         .trigger(processingTime='3 seconds') \
>>>         .start()
>>>
>>>     query1.awaitTermination()
>>>     # -----------------------#
>>>
>>>     query2 = wordCounts \
>>>         .writeStream \
>>>         .format("console") \
>>>         .trigger(processingTime='3 seconds') \
>>>         .start()
>>>
>>>     query2.awaitTermination()
>>>
>>>     # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Aakash.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
