I am not seeing what else I would be increasing in this case if I were to
write a set of rows to two different topics. Either way, for every row
there will be two I/Os, one to each of the two topics.

If foreachBatch caches, great! But again, I would try to cache as little as
possible. If I duplicate rows using explode or something similar, I will be
caching 2X the data.
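
Roughly what I have in mind with foreachBatch, as an untested sketch against
the Spark 2.4 API (the bootstrap servers, topic names, value expression, and
checkpoint path are placeholders):

    import org.apache.spark.sql.DataFrame

    df.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.persist()  // avoid recomputing the batch for the second write
        val out = batchDF.selectExpr(
          "CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
        out.write.format("kafka")
          .option("kafka.bootstrap.servers", "host:9092")  // placeholder
          .option("topic", "topic1")
          .save()
        out.write.format("kafka")
          .option("kafka.bootstrap.servers", "host:9092")  // placeholder
          .option("topic", "topic2")
          .save()
        batchDF.unpersist()
      }
      .option("checkpointLocation", "/tmp/ckpt")  // placeholder
      .start()

Either way that is still two Kafka writes per micro-batch, but only one read
and one computation of the batch.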




On Tue, Jul 24, 2018 at 11:52 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Any way you go you’ll increase something... ☺
>
>
>
> Even with foreachBatch you would have to cache the DataFrame before
> submitting each batch to each topic to avoid recomputing it (see
> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch)
>
>
>
> Nothing’s free! 😉
>
>
>
> Since you’re just pushing all messages to Kafka, it might be easier for you
> to just explode the rows and let Spark do the rest for you.
>
>
>
> *From: *kant kodali <kanth...@gmail.com>
> *Date: *Tuesday, July 24, 2018 at 1:04 PM
> *To: *Silvio Fiorito <silvio.fior...@granturing.com>
> *Cc: *Arun Mahadevan <ar...@apache.org>, chandan prakash <
> chandanbaran...@gmail.com>, Tathagata Das <tathagata.das1...@gmail.com>, "
> ymaha...@snappydata.io" <ymaha...@snappydata.io>, "priy...@asperasoft.com"
> <priy...@asperasoft.com>, "user @spark" <user@spark.apache.org>
>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
>
>
> @Silvio I thought about duplicating rows but dropped the idea because of the
> increased memory use. foreachBatch sounds interesting!
>
>
>
> On Mon, Jul 23, 2018 at 6:51 AM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
> Using the current Kafka sink, which supports routing based on the topic
> column, you could just duplicate the rows (e.g. explode rows with different
> topic and key values). That way you’re only reading and processing the
> source once, without having to resort to custom sinks, ForeachWriter, or
> multiple queries.
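>
> For example, roughly (an untested sketch; the topic names, bootstrap
> servers, and value expression are just placeholders):
>
>     import org.apache.spark.sql.functions.{array, explode, lit}
>
>     // build key/value first, then fan each row out to both topics
>     val out = df
>       .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
>       .withColumn("topic", explode(array(lit("topic1"), lit("topic2"))))
>
>     out.writeStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "host:9092")  // placeholder
>       .option("checkpointLocation", "/tmp/ckpt")       // placeholder
>       .start()
>
> The built-in Kafka sink routes each row to the topic named in its "topic"
> column, so you don’t set a "topic" option on the writer.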
>
>
>
> In Spark 2.4 there will be a foreachBatch method that will give you a
> DataFrame and let you write to the sink as you wish.
>
>
>
> *From: *kant kodali <kanth...@gmail.com>
> *Date: *Monday, July 23, 2018 at 4:43 AM
> *To: *Arun Mahadevan <ar...@apache.org>
> *Cc: *chandan prakash <chandanbaran...@gmail.com>, Tathagata Das <
> tathagata.das1...@gmail.com>, "ymaha...@snappydata.io" <
> ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>,
> "user @spark" <user@spark.apache.org>
>
>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
>
>
> I understand each row has a topic column, but can we write one row to
> multiple topics?
>
>
>
> On Thu, Jul 12, 2018 at 11:00 AM, Arun Mahadevan <ar...@apache.org> wrote:
>
> What I meant was that with ForeachWriter the number of partitions cannot be
> varied per sink, versus writing to each sink using independent queries.
> Maybe this is obvious.
>
>
>
> I am not sure about the performance difference you highlight. The commit
> happens once per micro-batch, when "close(null)" is invoked. You can batch
> your writes in process and/or in close. I guess the writes can still be
> atomic, decided by whether "close" returns successfully or throws an
> exception.
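>
> For instance, something along these lines (just a sketch; the flush body is
> a placeholder for whatever client your sink uses):
>
>     import org.apache.spark.sql.{ForeachWriter, Row}
>     import scala.collection.mutable.ArrayBuffer
>
>     class BufferingWriter extends ForeachWriter[Row] {
>       private val buffer = ArrayBuffer[Row]()
>
>       def open(partitionId: Long, version: Long): Boolean = {
>         buffer.clear()
>         true  // process this partition for this epoch
>       }
>
>       def process(row: Row): Unit = {
>         buffer += row                     // accumulate instead of writing per record
>         if (buffer.size >= 1000) flush()  // or flush in chunks
>       }
>
>       def close(errorOrNull: Throwable): Unit = {
>         if (errorOrNull == null) flush()  // commit the remainder only on success
>       }
>
>       // placeholder: push the buffered rows to your sink in one call
>       private def flush(): Unit = buffer.clear()
>     }
>
> and then wire it in with df.writeStream.foreach(new BufferingWriter).start().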
>
>
>
> Thanks,
>
> Arun
>
>
>
> *From: *chandan prakash <chandanbaran...@gmail.com>
> *Date: *Thursday, July 12, 2018 at 10:37 AM
> *To: *Arun Iyer <ar...@apache.org>
> *Cc: *Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io"
> <ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>,
> "user @spark" <user@spark.apache.org>
>
>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
>
>
> Thanks a lot, Arun, for your response.
>
> I got your point that existing sink plugins like Kafka cannot be used.
>
> However, I could not follow this part: "you cannot scale the partitions for
> the sinks independently".
>
> Can you please rephrase it?
>
>
>
> Also, I guess that using ForeachWriter for multiple sinks will affect
> performance, because writes happen to a sink on a per-record basis (after
> deciding which particular sink a record belongs to), whereas in the current
> implementation all the data under an RDD partition gets committed to the
> sink atomically in one go. Please correct me if I am wrong here.
>
>
>
>
>
>
>
> Regards,
>
> Chandan
>
>
>
> On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan <ar...@apache.org> wrote:
>
> Yes, ForeachWriter [1] could be an option if you want to write to different
> sinks. You can put in your custom logic to split the data across the
> different sinks.
>
>
>
> The drawbacks here are that you cannot plug in existing sinks like Kafka,
> you need to write the custom logic yourself, and you cannot scale the
> partitions for the sinks independently.
>
>
>
> [1] https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html
>
>
>
> *From: *chandan prakash <chandanbaran...@gmail.com>
> *Date: *Thursday, July 12, 2018 at 2:38 AM
> *To: *Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io"
> <ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>,
> "user @spark" <user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
>
>
> Hi,
>
> Did any of you think about writing a custom foreach sink writer which can
> decide which record should go to which sink (based on some marker in the
> record, which we could annotate during transformation) and then write to the
> specific sink accordingly? (A rough sketch follows the points below.)
>
> This would mean that:
>
> 1. every custom sink writer will have connections to as many sinks as there
> are sink types that records can go to.
>
> 2. every record will be read once in the single query but can be written to
> multiple sinks.
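>
> The rough sketch I mentioned above (purely illustrative: SinkAClient,
> SinkBClient and the "sinkMarker" column are placeholders for whatever
> clients and marker we actually use):
>
>     import org.apache.spark.sql.{ForeachWriter, Row}
>
>     class RoutingWriter extends ForeachWriter[Row] {
>       @transient private var sinkA: SinkAClient = _  // placeholder client types
>       @transient private var sinkB: SinkBClient = _
>
>       def open(partitionId: Long, version: Long): Boolean = {
>         sinkA = new SinkAClient()  // one connection per sink type, per partition
>         sinkB = new SinkBClient()
>         true
>       }
>
>       def process(row: Row): Unit =
>         row.getAs[String]("sinkMarker") match {  // marker added during transformation
>           case "A" => sinkA.write(row)
>           case "B" => sinkB.write(row)
>           case _   => // drop, or send to a dead-letter sink
>         }
>
>       def close(errorOrNull: Throwable): Unit = {
>         if (sinkA != null) sinkA.close()
>         if (sinkB != null) sinkB.close()
>       }
>     }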
>
>
>
> Do you see any drawbacks in this approach?
>
> One drawback, of course, is that a sink is supposed to write records as they
> are, whereas here we are putting some intelligence into the sink.
>
> Apart from that, do you see any other issues with this approach?
>
>
>
> Regards,
>
> Chandan
>
>
>
>
>
> On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
> Of course, you can write to multiple Kafka topics from a single query. If
> the dataframe you want to write has a column named "topic" (along with "key"
> and "value" columns), each row will be written to the topic named in that
> row. This works automatically. So the only thing you need to figure out is
> how to generate the value of that column.
>
>
>
> This is documented: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
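>
> A minimal sketch of what that looks like (the routing expression, servers,
> and checkpoint path are placeholders; "eventType" stands in for whatever
> your rows carry):
>
>     // derive a per-row "topic" column, then use the built-in Kafka sink
>     val out = df.selectExpr(
>       "CASE WHEN eventType = 'a' THEN 'topicA' ELSE 'topicB' END AS topic",
>       "CAST(key AS STRING) AS key",
>       "CAST(value AS STRING) AS value")
>
>     out.writeStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "host:9092")  // placeholder
>       .option("checkpointLocation", "/tmp/ckpt")       // placeholder
>       .start()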
>
>
>
> Or am I misunderstanding the problem?
>
>
>
> TD
>
>
>
>
>
>
>
>
>
> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan <ymaha...@snappydata.io>
> wrote:
>
> I had a similar issue, and I think this is where the Structured Streaming
> design falls short.
>
> It seems like Question #2 in your email is a viable workaround for you.
>
>
>
> In my case, I have a custom Sink backed by an efficient in-memory column
> store suited for fast ingestion.
>
>
>
> I have a Kafka stream coming from one topic, and I need to classify the
> stream based on schema.
>
> For example, a Kafka topic can carry messages with three different schemas,
> and I would like to ingest them into three different column tables (each
> with its own schema) using my custom Sink implementation.
>
>
>
> Right now the only(?) option I have is to create three streaming queries
> reading the same topic and ingesting into the respective column tables using
> their Sink implementations.
>
> These three streaming queries create three underlying IncrementalExecutions
> and three KafkaSources, i.e. three queries reading the same data from the
> same Kafka topic.
>
> Even with CachedKafkaConsumers at the partition level, this is not an
> efficient way to handle a simple streaming use case.
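>
> Concretely, today it ends up looking roughly like this (sketch only; the
> topic, custom sink format name, marker column and paths are placeholders):
>
>     import org.apache.spark.sql.functions.col
>
>     val raw = spark.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "host:9092")  // placeholder
>       .option("subscribe", "input-topic")              // placeholder
>       .load()
>
>     // three independent queries; each .start() gets its own KafkaSource
>     Seq("schemaA", "schemaB", "schemaC").foreach { schemaType =>
>       raw.filter(col("messageType") === schemaType)  // illustrative; really parsed from value
>         .writeStream
>         .format("my-column-store-sink")              // placeholder for the custom Sink
>         .option("checkpointLocation", s"/ckpt/$schemaType")
>         .start()
>     }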
>
>
>
> One workaround to overcome this limitation is to have the same schema for
> all the messages in a Kafka partition; unfortunately, this is not in our
> control, and customers cannot change it due to their dependencies on other
> subsystems.
>
>
>
> Thanks,
>
> http://www.snappydata.io/blog <http://snappydata.io>
>
>
>
> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
> I have a structured streaming query which sinks to Kafka. This query has
> complex aggregation logic.
>
>
>
> I would like to sink the output DF of this query to multiple Kafka topics,
> each partitioned on a different ‘key’ column. I don’t want to have a
> separate Kafka sink for each of the different Kafka topics, because that
> would mean running multiple streaming queries - one for each Kafka topic -
> which is especially costly since my aggregation logic is complex.
>
>
>
> Questions:
>
> 1. Is there a way to output the results of a structured streaming query to
> multiple Kafka topics, each with a different key column, without having to
> execute multiple streaming queries?
>
>
>
> 2. If not, would it be efficient to cascade the queries, so that the first
> query does the complex aggregation and writes its output to Kafka, and the
> other queries just read the output of the first query and write to their
> own topics, thus avoiding the complex aggregation again?
>
>
>
> Thanks in advance for any help.
>
>
>
> Priyank
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> Chandan Prakash
>
>
>
>
> --
>
> Chandan Prakash
>
>
>
>
>
