I am not seeing what else I would be increasing in this case if I were to write a set of rows to two different topics. It means that for every row there will be two I/Os, one to each topic.

If foreachBatch caches, great! But again, I would try to cache as little as possible. If I duplicate rows using explode or something, I will be caching 2X.
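For reference, a minimal sketch (Scala, Spark 2.4+) of what the foreachBatch route being discussed could look like, assuming a streaming DataFrame named df that already has key and value columns; the topic names, broker address and checkpoint path are placeholders:

    import org.apache.spark.sql.DataFrame

    // Sketch only: "df", topic names, broker and checkpoint path are placeholders.
    df.writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch.persist()                  // cache once so the batch is not recomputed per topic
        Seq("topicA", "topicB").foreach { t =>
          batch
            .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("topic", t)
            .save()
        }
        batch.unpersist()
      }
      .option("checkpointLocation", "/tmp/chk/fanout")
      .start()

The persist()/unpersist() pair is what keeps the batch from being recomputed for the second topic, which is the caching cost being weighed in this message.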
On Tue, Jul 24, 2018 at 11:52 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

> Any way you go you’ll increase something... ☺
>
> Even with foreachBatch you would have to cache the DataFrame before submitting each batch to each topic, to avoid recomputing it (see https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch).
>
> Nothing’s free! 😉
>
> Since you’re just pushing all messages to Kafka, it might be easier on you to just explode the rows and let Spark do the rest for you.
>
> *From: *kant kodali <kanth...@gmail.com>
> *Date: *Tuesday, July 24, 2018 at 1:04 PM
> *To: *Silvio Fiorito <silvio.fior...@granturing.com>
> *Cc: *Arun Mahadevan <ar...@apache.org>, chandan prakash <chandanbaran...@gmail.com>, Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io" <ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>, "user @spark" <user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
> @Silvio I thought about duplicating rows but dropped the idea because of the memory increase. foreachBatch sounds interesting!
>
> On Mon, Jul 23, 2018 at 6:51 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>
> Using the current Kafka sink that supports routing based on the topic column, you could just duplicate the rows (e.g. explode rows with different topic, key values). That way you’re only reading and processing the source once and not having to resort to custom sinks, foreachWriter, or multiple queries.
>
> In Spark 2.4 there will be a foreachBatch method that will give you a DataFrame and let you write to the sink as you wish.
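For comparison, a rough sketch of the explode / topic-column routing Silvio describes, under the same assumptions (a streaming DataFrame df with key and value columns; topic names, broker and checkpoint path are placeholders). When no fixed topic option is set, the Kafka sink takes the destination from the per-row topic column, so the source is read and processed only once:

    import org.apache.spark.sql.functions.{array, explode, lit}

    // Sketch only: duplicates each row once per destination topic.
    val fanout = df
      .withColumn("topic", explode(array(lit("topicA"), lit("topicB"))))
      .selectExpr("topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

    fanout.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("checkpointLocation", "/tmp/chk/explode")
      .start()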
> *From: *kant kodali <kanth...@gmail.com>
> *Date: *Monday, July 23, 2018 at 4:43 AM
> *To: *Arun Mahadevan <ar...@apache.org>
> *Cc: *chandan prakash <chandanbaran...@gmail.com>, Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io" <ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>, "user @spark" <user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
> I understand each row has a topic column, but can we write one row to multiple topics?
>
> On Thu, Jul 12, 2018 at 11:00 AM, Arun Mahadevan <ar...@apache.org> wrote:
>
> What I meant was that the number of partitions cannot be varied with ForeachWriter vs. if you were to write to each sink using independent queries. Maybe this is obvious.
>
> I am not sure about the difference you highlight regarding the performance part. The commit happens once per micro-batch and "close(null)" is invoked. You can batch your writes in the process and/or in the close. I guess the writes can still be atomic, decided by whether "close" returns successfully or throws an exception.
>
> Thanks,
> Arun
>
> *From: *chandan prakash <chandanbaran...@gmail.com>
> *Date: *Thursday, July 12, 2018 at 10:37 AM
> *To: *Arun Iyer <ar...@apache.org>
> *Cc: *Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io" <ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>, "user @spark" <user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
> Thanks a lot Arun for your response.
> I got your point that existing sink plugins like Kafka, etc. cannot be used.
> However, I could not get the part: "you cannot scale the partitions for the sinks independently".
> Can you please rephrase that part?
>
> Also, I guess using foreachwriter for multiple sinks will affect performance, because the write will happen to a sink on a per-record basis (after deciding which particular sink a record belongs to), whereas in the current implementation all data under an RDD partition gets committed to the sink atomically in one go. Please correct me if I am wrong here.
>
> Regards,
> Chandan
>
> On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan <ar...@apache.org> wrote:
>
> Yes, ForeachWriter [1] could be an option if you want to write to different sinks. You can put in your custom logic to split the data into different sinks.
>
> The drawback here is that you cannot plug in existing sinks like Kafka; you need to write the custom logic yourself, and you cannot scale the partitions for the sinks independently.
>
> [1] https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html
>
> *From: *chandan prakash <chandanbaran...@gmail.com>
> *Date: *Thursday, July 12, 2018 at 2:38 AM
> *To: *Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io" <ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>, "user @spark" <user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] Avoiding multiple streaming queries
>
> Hi,
> Has any one of you thought about writing a custom foreach sink writer which can decide which record should go to which sink (based on some marker in the record, which we can possibly annotate during transformation) and then write to that specific sink accordingly?
> This would mean that:
> 1. every custom sink writer will have connections to as many sinks as there are sink types where records can go.
> 2. every record will be read once in the single query but can be written to multiple sinks.
>
> Do you guys see any drawback in this approach?
> One drawback, of course, is that a sink is supposed to write records as they are, but here we are inducing some intelligence in the sink.
> Apart from that, do you see any other issues with this approach?
>
> Regards,
> Chandan
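To make the custom-routing idea above concrete, a rough ForeachWriter sketch that picks a Kafka topic from a hypothetical sinkType marker column; the column names, topics and broker are placeholders, and the per-record send is exactly the per-record cost discussed above unless writes are batched in process/close:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.sql.{ForeachWriter, Row}

    // Sketch only: routes each record based on a hypothetical "sinkType" marker column.
    class RoutingWriter extends ForeachWriter[Row] {
      private var producer: KafkaProducer[String, String] = _

      override def open(partitionId: Long, epochId: Long): Boolean = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        producer = new KafkaProducer[String, String](props)
        true
      }

      override def process(row: Row): Unit = {
        // The marker annotated during transformation decides the destination.
        val topic = row.getAs[String]("sinkType") match {
          case "orders" => "ordersTopic"
          case _        => "defaultTopic"
        }
        producer.send(new ProducerRecord(topic, row.getAs[String]("key"), row.getAs[String]("value")))
      }

      override def close(errorOrNull: Throwable): Unit = {
        if (producer != null) producer.close()
      }
    }

    // Usage: df.writeStream.foreach(new RoutingWriter).option("checkpointLocation", "/tmp/chk/foreach").start()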
> On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
> Of course, you can write to multiple Kafka topics from a single query. If the dataframe that you want to write has a column named "topic" (along with "key" and "value" columns), it will write the contents of a row to the topic in that row. This automatically works. So the only thing you need to figure out is how to generate the value of that column.
>
> This is documented - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
>
> Or am I misunderstanding the problem?
>
> TD
>
> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan <ymaha...@snappydata.io> wrote:
>
> I had a similar issue, and I think that’s where the structured streaming design lacks.
> Seems like Question #2 in your email is a viable workaround for you.
>
> In my case, I have a custom Sink backed by an efficient in-memory column store suited for fast ingestion.
>
> I have a Kafka stream coming from one topic, and I need to classify the stream based on schema.
> For example, a Kafka topic can have three different types of schema messages, and I would like to ingest into three different column tables (having different schemas) using my custom Sink implementation.
>
> Right now the only(?) option I have is to create three streaming queries reading the same topic and ingesting into the respective column tables using their Sink implementations.
> These three streaming queries create three underlying IncrementalExecutions and three KafkaSources - three queries reading the same data from the same Kafka topic.
> Even with CachedKafkaConsumers at the partition level, this is not an efficient way to handle a simple streaming use case.
>
> One workaround to overcome this limitation is to have the same schema for all the messages in a Kafka partition; unfortunately this is not in our control, and customers cannot change it due to their dependencies on other subsystems.
>
> Thanks,
> http://www.snappydata.io/blog <http://snappydata.io>
>
> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <priy...@asperasoft.com> wrote:
>
> I have a structured streaming query which sinks to Kafka. This query has complex aggregation logic.
>
> I would like to sink the output DF of this query to multiple Kafka topics, each partitioned on a different ‘key’ column. I don’t want to have multiple Kafka sinks for each of the different Kafka topics, because that would mean running multiple streaming queries - one for each Kafka topic - especially since my aggregation logic is complex.
>
> Questions:
> 1. Is there a way to output the results of a structured streaming query to multiple Kafka topics, each with a different key column, but without having to execute multiple streaming queries?
>
> 2. If not, would it be efficient to cascade the multiple queries such that the first query does the complex aggregation and writes output to Kafka, and then the other queries just read the output of the first query and write their topics to Kafka, thus avoiding doing the complex aggregation again?
>
> Thanks in advance for any help.
>
> Priyank
>
> --
> Chandan Prakash
>
> --
> Chandan Prakash
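Finally, a sketch of the cascade in Question 2 above, assuming a SparkSession named spark, an aggregatedDF produced by the first query, and placeholder topics, broker, routing rule and checkpoint paths; the heavy aggregation runs once, and a second cheap query re-reads the intermediate topic and routes rows via a per-row topic column:

    // Query 1 (sketch): complex aggregation written once to an intermediate topic.
    aggregatedDF
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("topic", "aggregated")
      .option("checkpointLocation", "/tmp/chk/agg")
      .start()

    // Query 2 (sketch): cheap re-read of the intermediate topic, routed per row.
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "aggregated")
      .load()
      .selectExpr(
        "CASE WHEN CAST(key AS STRING) LIKE 'a%' THEN 'topicA' ELSE 'topicB' END AS topic",
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("checkpointLocation", "/tmp/chk/route")
      .start()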