Re: Should I use Dstream or Structured Stream to transfer data from source to sink and then back from sink to source?

2017-09-14 Thread Tathagata Das
Are you sure the code is correct? A Dataset does not have a method
"trigger". Rather I believe the correct code should be

StreamingQuery query = resultDataSet.writeStream.trigger(ProcessingTime(1000)).format("kafka").start();

You can do with Structured Streaming all the same things you can do with
DStreams. For example, there is foreach in Structured Streaming, e.g.
resultDataSet.writeStream.foreach(...)

When you say the mapPartitions code is not getting executed ... are you sure
the query is running? The actual code (not pseudocode) may help debug
this.
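
For example, a minimal Scala sketch (the broker address, topic and checkpoint path
are illustrative, and this assumes resultDataSet is a streaming Dataset with a
string "value" column):

import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

// Kafka sink with a processing-time trigger; the checkpoint location is required.
val query = resultDataSet
  .writeStream
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints")
  .start()

// Or, for per-record control similar to foreachRDD, a ForeachWriter sink:
val perRow = resultDataSet.writeStream.foreach(new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true  // e.g. open a connection
  def process(row: Row): Unit = println(row)                  // handle one record
  def close(errorOrNull: Throwable): Unit = ()                // clean up
}).start()

query.awaitTermination()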


On Wed, Sep 13, 2017 at 11:20 AM, kant kodali  wrote:

> Hi All,
>
> I am trying to read data from Kafka, insert into Mongo, read back from Mongo,
> and insert into Kafka again. I went with the Structured Streaming approach first,
> however I believe I am making some naive error because my map operations
> are not getting invoked.
>
> The pseudo code looks like this
>
> DataSet resultDataSet = jsonDataset.mapPartitions(
> insertIntoMongo).mapPartitions(readFromMongo);
>
> StreamingQuery query = resultDataSet.trigger(ProcesingTime(1000)).format("
> kafka").start();
>
> query.awaitTermination();
>
> The mapPartitions in this code is not getting executed. Is this because I
> am not calling any action on my streaming dataset? In the DStream case, I
> used to call foreachRDD and it worked well. So how do I do this using
> Structured Streaming?
>
> Thanks!
>


RE: RDD order preservation through transformations

2017-09-14 Thread johan.grande.ext
Well, if the order cannot be guaranteed in case of a failure (or at all, since
failure can happen transparently), what does it mean to sort an RDD (method
sortBy)?
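
For example, in a small local test sortBy does seem to give a stable, sorted
sequence (a quick sketch, assuming a plain SparkContext sc):

val rdd = sc.parallelize(Seq(5, 3, 1, 4, 2), numSlices = 2)
val sorted = rdd.sortBy(identity)
println(sorted.collect().toList)                 // List(1, 2, 3, 4, 5)
println(sorted.zipWithIndex().collect().toList)  // (value, index) pairs in that order

so I'm wondering how that squares with ordering not being guaranteed under
recomputation.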


On 2017-09-14 03:36 CEST mehmet.su...@gmail.com wrote:

I think this is one of the conceptual differences in Spark compared to other
frameworks: there is no indexing in plain RDDs. This was the thread with Ankit:

Yes. So order preservation cannot be guaranteed in the case of failure. I am also
not sure if partitions are ordered. Can you get the same sequence of partitions
in mapPartitions?

On 13 Sep 2017 19:54, "Ankit Maloo"  wrote:
>
> RDDs are fault tolerant as they can be recomputed using the DAG without storing the
> intermediate RDDs.
>
> On 13-Sep-2017 11:16 PM, "Suzen, Mehmet"  wrote:
>>
>> But what happens if one of the partitions fails? How does fault tolerance recover
>> elements in other partitions?



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: RDD order preservation through transformations

2017-09-14 Thread johan.grande.ext
(Sorry Mehmet, I'm only now seeing your first reply with the link to SO; it had
gone to my spam folder first :-/ )


On 2017-09-14 10:02 CEST, GRANDE Johan Ext DTSI/DSI wrote:

Well, if the order cannot be guaranteed in case of a failure (or at all, since
failure can happen transparently), what does it mean to sort an RDD (method
sortBy)?


On 2017-09-14 03:36 CEST mehmet.su...@gmail.com wrote:

I think this is one of the conceptual differences in Spark compared to other
frameworks: there is no indexing in plain RDDs. This was the thread with Ankit:

Yes. So order preservation cannot be guaranteed in the case of failure. I am also
not sure if partitions are ordered. Can you get the same sequence of partitions
in mapPartitions?

On 13 Sep 2017 19:54, "Ankit Maloo"  wrote:
>
> RDDs are fault tolerant as they can be recomputed using the DAG without storing the
> intermediate RDDs.
>
> On 13-Sep-2017 11:16 PM, "Suzen, Mehmet"  wrote:
>>
>> But what happens if one of the partitions fails? How does fault tolerance recover
>> elements in other partitions?




RE: RDD order preservation through transformations

2017-09-14 Thread johan.grande.ext
In several situations I would like to zip RDDs knowing that their order
matches. In particular I’m using an MLlib KMeansModel on an RDD of Vectors, so I
would like to do:

myData.zip(myModel.predict(myData))

Also, the first column in my RDD is a timestamp which I don’t want to be part
of the model, so in fact I would like to split the first column out of my RDD,
then do:

myData.zip(myModel.predict(myData.map(dropTimestamp)))

Moreover, I’d like my data to be scaled and to go through a principal component
analysis first, so the main steps would be:

val noTs = myData.map(dropTimestamp)
val scaled = scaler.transform(noTs)
val projected = (new RowMatrix(scaled)).multiply(principalComponents).rows
val clusters = myModel.predict(projected)
val result = myData.zip(clusters)

Do you think there’s a chance that the four transformations above would preserve
order, so that the zip at the end would be correct?


On 2017-09-13 19:51 CEST, lucas.g...@gmail.com wrote :

I'm wondering why you need order preserved; we've had situations where keeping
the source as an artificial field in the dataset was important and I had to run
contortions to inject that (in this case the data source had no unique key).

Is this similar?

On 13 September 2017 at 10:46, Suzen, Mehmet  wrote:
But what happens if one of the partitions fails? How does fault tolerance recover
elements in other partitions?

On 13 Sep 2017 18:39, "Ankit Maloo"  wrote:
AFAIK, the order of an RDD is maintained within a partition for map operations.
There is no way a map operation can change the sequence within a partition, as a
partition is local and computation happens one record at a time.

On 13-Sep-2017 9:54 PM, "Suzen, Mehmet"  wrote:
I think the order has no meaning in RDDs; see this post, especially the zip methods:
https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org






Re: RDD order preservation through transformations

2017-09-14 Thread Georg Heiler
Usually Spark ML models specify the columns they use for training, i.e. you
would only select your feature columns (X) for model training, while metadata such as
target labels or your date column (y) would still be present for each row.
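
For example, a rough sketch with the DataFrame-based API (the DataFrame and
column names here are made up):

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler

// rawDf has columns ts, x1, x2; only x1 and x2 become features.
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")

val withFeatures = assembler.transform(rawDf)
val model = new KMeans().setK(5).setFeaturesCol("features").fit(withFeatures)
val predicted = model.transform(withFeatures)  // adds a "prediction" column; ts stays on the row

No zip is needed because the prediction is just another column on the same row.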

 wrote on Thu, 14 Sep 2017 at 10:42:

> In several situations I would like to zip RDDs knowing that their order
> matches. In particular I’m using an MLlib KMeansModel on an RDD of Vectors
> so I would like to do:
>
>
>
> myData.zip(myModel.predict(myData))
>
>
>
> Also the first column in my RDD is a timestamp which I don’t want to be a
> part of the model, so in fact I would like to split the first column out of
> my RDD, then do:
>
>
>
> myData.zip(myModel.predict(myData.map(dropTimestamp)))
>
>
>
> Moreover I’d like my data to be scaled and go through a principal
> component analysis first, so the main steps would be like:
>
>
>
> val noTs = myData.map(dropTimestamp)
>
> val scaled = scaler.transform(noTs)
>
> val projected = (new RowMatrix(scaled)).multiply(principalComponents).rows
>
> val clusters = myModel.predict(projected)
>
> val result = myData.zip(clusters)
>
>
>
> Do you think there’s a chance that the 4 transformations above would
> preserve order so the zip at the end would be correct?
>
>
>
>
>
> On 2017-09-13 19:51 CEST, lucas.g...@gmail.com wrote :
>
>
>
> I'm wondering why you need order preserved, we've had situations where
> keeping the source as an artificial field in the dataset was important and
> I had to run contortions to inject that (In this case the datasource had no
> unique key).
>
>
>
> Is this similar?
>
>
>
> On 13 September 2017 at 10:46, Suzen, Mehmet  wrote:
>
> But what happens if one of the partitions fails? How does fault tolerance
> recover elements in other partitions?
>
>
>
> On 13 Sep 2017 18:39, "Ankit Maloo"  wrote:
>
> AFAIK, the order of an RDD is maintained within a partition for map
> operations. There is no way a map operation can change the sequence within a
> partition, as a partition is local and computation happens one record at a
> time.
>
>
>
> On 13-Sep-2017 9:54 PM, "Suzen, Mehmet"  wrote:
>
> I think the order has no meaning in RDDs; see this post, especially the zip
> methods:
> https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>
>
>


Re: RDD order preservation through transformations

2017-09-14 Thread Suzen, Mehmet
On 14 September 2017 at 10:42,   wrote:
> val noTs = myData.map(dropTimestamp)
>
> val scaled = scaler.transform(noTs)
>
> val projected = (new RowMatrix(scaled)).multiply(principalComponents).rows
>
> val clusters = myModel.predict(projected)
>
> val result = myData.zip(clusters)
>
>
>
> Do you think there’s a chance that the 4 transformations above would
> preserve order so the zip at the end would be correct?

AFAIK, no. The sequence of transformations you have will not guarantee that
order is preserved. Attach the index first (zip/zipWithIndex), then keep track of
the indices through the subsequent transformations (e.g. via `_2`, since zip
returns tuples).
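
A rough sketch of that idea, reusing the names from your snippet (dropTimestamp,
scaler and myModel are yours; I left the RowMatrix projection out, because going
through RowMatrix drops the pairing, so that step would have to be expressed per
row as well):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// 1. Attach a stable key before any transformation.
val indexed: RDD[(Long, Vector)] = myData.zipWithIndex().map(_.swap)

// 2. Carry the key through every per-row step with mapValues.
val prepared: RDD[(Long, Vector)] = indexed
  .mapValues(dropTimestamp)
  .mapValues(v => scaler.transform(v))

// 3. Predict per row so the key survives, then join by key instead of zipping.
val clusters: RDD[(Long, Int)] = prepared.mapValues(v => myModel.predict(v))
val result: RDD[(Vector, Int)] = indexed.join(clusters).values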

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



cannot cast to double from spark row

2017-09-14 Thread KhajaAsmath Mohammed
Hi,

I am getting the below error when trying to cast a column value from a Spark
dataframe to double. Any ideas? I tried many solutions but none of them
worked.

 java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Double

1. row.getAs[Double](Constants.Datapoint.Latitude)

2. row.getAs[String](Constants.Datapoint.Latitude).toDouble

I don't want to use row.getDouble(0) as the position of the column in the file
keeps changing.

Thanks,
Asmath


Re: [Structured Streaming] Multiple sources best practice/recommendation

2017-09-14 Thread Michael Armbrust
I would probably suggest that you partition by format (though you can get
the file name from the built-in function input_file_name()).  You can load
multiple streams from different directories and union them together, as long
as the schema is the same after parsing.  Otherwise you can just run
multiple streams on the same cluster.
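
For example, a rough sketch (the paths and schema here are made up):

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types._

// A common schema after parsing; the real one depends on your files.
val schema = new StructType()
  .add("id", StringType)
  .add("ts", TimestampType)

// One stream per directory, unioned once the parsed schemas match.
val a = spark.readStream.schema(schema).json("s3a://bucket/format-a/")
val b = spark.readStream.schema(schema).json("s3a://bucket/format-b/")

val all = a.union(b)
  .withColumn("source_file", input_file_name())  // which file each row came from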

On Wed, Sep 13, 2017 at 7:56 AM, JG Perrin  wrote:

> Hi,
>
>
>
> I have different files being dumped on S3; I want to ingest them and join
> them.
>
>
>
> What sounds better to you? Having one “directory” for all, or one per
> file format?
>
>
>
> If I have one directory for all, can I get some metadata about the file,
> like its name?
>
>
>
> If I have multiple directories, how can I have multiple “listeners”?
>
>
>
> Thanks
>
>
>
> jg
>


Re: cannot cast to double from spark row

2017-09-14 Thread Ram Sriharsha
try df.select($"col".cast(DoubleType))

import org.apache.spark.sql.types._
import spark.implicits._  // for $"..." and toDF

val df = spark.sparkContext.parallelize(Seq(("1.04"))).toDF("c")

df.select($"c".cast(DoubleType))

On Thu, Sep 14, 2017 at 9:20 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am getting the below error when trying to cast a column value from a Spark
> dataframe to double. Any ideas? I tried many solutions but none of them
> worked.
>
>  java.lang.ClassCastException: java.lang.String cannot be cast to
> java.lang.Double
>
> 1. row.getAs[Double](Constants.Datapoint.Latitude)
>
> 2. row.getAs[String](Constants.Datapoint.Latitude).toDouble
>
> I don't want to use row.getDouble(0) as the position of the column in the file
> keeps changing.
>
> Thanks,
> Asmath
>



-- 
Ram Sriharsha
Product Manager, Apache Spark
PPMC Member and Committer, Apache Spark
Databricks
San Francisco, CA
Ph: 408-510-8635
email: har...@apache.org

LinkedIn: https://www.linkedin.com/in/harsha340



Re: [SS]How to add a column with custom system time?

2017-09-14 Thread Michael Armbrust
Can you show the explain() for the version that doesn't work?  You might
just be hitting a bug.
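
(For instance, assuming `results` is the aggregated DataFrame from your snippet:

results.explain(true)  // prints the analyzed and optimized plans before starting
// or, once a query has started:
query.explain()

and paste the output here.)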

On Tue, Sep 12, 2017 at 9:03 PM, 张万新  wrote:

> It seems current_timestamp() cannot be used directly in a window function?
> After some attempts I found that using
>
> df.count.withColumn("pTime", current_timestamp).select(window($"pTime",
> "15 minutes"), $"count")
>
> instead of
>
> df.count.withColumn("window", window(current_timestamp(), "15 minutes"))
>
> throws no exception and works fine. I don't know if this is a problem that
> needs improvement.
>
>
> 张万新 wrote on Wed, Sep 13, 2017 at 11:43 AM:
>
>> and I use .withColumn("window", window(current_timestamp(), "15
>> minutes")) after count
>>
>> 张万新 wrote on Wed, Sep 13, 2017 at 11:32 AM:
>>
>>> Yes, my code is shown below:
>>> /**
>>> * input
>>> */
>>>   val logs = spark
>>> .readStream
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>>> .option("subscribe", TOPIC)
>>> .option("startingOffset", "latest")
>>> .load()
>>>
>>>   /**
>>> * process
>>> */
>>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>>
>>>   val events = logValues
>>> .map(parseFunction)
>>> .select(
>>>   $"_1".alias("date").cast("timestamp"),
>>>   $"_2".alias("uuid").cast("string")
>>> )
>>>
>>>   val results = events
>>> .withWatermark("date", "1 day")
>>> .dropDuplicates("uuid", "date")
>>> .groupBy($"date")
>>> .count()
>>> .withColumn("window", window(current_timestamp(), "15 minutes"))
>>>
>>>   /**
>>> * output
>>> */
>>>   val query = results
>>> .writeStream
>>> .outputMode("update")
>>> .format("console")
>>> .option("truncate", "false")
>>> .trigger(Trigger.ProcessingTime("1 seconds"))
>>> .start()
>>>
>>>   query.awaitTermination()
>>>
>>> and I use Play JSON to parse input logs from Kafka; the parse function
>>> is like:
>>>
>>>   def parseFunction(str: String): (Long, String) = {
>>> val json = Json.parse(str)
>>> val timestamp = (json \ "time").get.toString().toLong
>>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>>> val uuid = (json \ "uuid").get.toString()
>>> (date, uuid)
>>>   }
>>>
>>> Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:36 AM:
>>>
 Can you show all the code?  This works for me.

 On Tue, Sep 12, 2017 at 12:05 AM, 张万新  wrote:

> The spark version is 2.2.0
>
> Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM:
>
>> Which version of spark?
>>
>> On Mon, Sep 11, 2017 at 8:27 PM, 张万新  wrote:
>>
>>> Thanks for reply, but using this method I got an exception:
>>>
>>> "Exception in thread "main" 
>>> org.apache.spark.sql.streaming.StreamingQueryException:
>>> nondeterministic expressions are only allowed in
>>>
>>> Project, Filter, Aggregate or Window"
>>>
>>> Can you give more advice?
>>>
>>> Michael Armbrust wrote on Tue, Sep 12, 2017 at 4:48 AM:
>>>
 import org.apache.spark.sql.functions._

 df.withColumn("window", window(current_timestamp(), "15 minutes"))

 On Mon, Sep 11, 2017 at 3:03 AM, 张万新 
 wrote:

> Hi,
>
> In structured streaming how can I add a column to a dataset with
> current system time aligned with 15 minutes?
>
> Thanks.
>


>>



Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread Michael Armbrust
How many UUIDs do you expect to have in a day?  That is likely where all
the memory is being used.  Does it work without that?

On Tue, Sep 12, 2017 at 8:42 PM, 张万新  wrote:

> Yes, my code is shown below (I also posted my code in another mail):
> /**
> * input
> */
>   val logs = spark
> .readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", BROKER_SERVER)
> .option("subscribe", TOPIC)
> .option("startingOffset", "latest")
> .load()
>
>   /**
> * process
> */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>
>   val events = logValues
> .map(parseFunction)
> .select(
>   $"_1".alias("date").cast("timestamp"),
>   $"_2".alias("uuid").cast("string")
> )
>
>   val results = events
> .withWatermark("date", "1 day")
> .dropDuplicates("uuid", "date")
> .groupBy($"date")
> .count()
>
>   /**
> * output
> */
>   val query = results
> .writeStream
> .outputMode("update")
> .format("console")
> .option("truncate", "false")
> .trigger(Trigger.ProcessingTime("1 seconds"))
> .start()
>
>   query.awaitTermination()
>
> and I use Play JSON to parse input logs from Kafka; the parse function is
> like:
>
>   def parseFunction(str: String): (Long, String) = {
> val json = Json.parse(str)
> val timestamp = (json \ "time").get.toString().toLong
> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
> val uuid = (json \ "uuid").get.toString()
> (date, uuid)
>   }
>
> and the Java heap space looks like this (I've increased the executor memory to 15g):
>
> [image: image.png]
> Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:23 AM:
>
>> Can you show the full query you are running?
>>
>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:
>>
>>> Hi,
>>>
>>> I'm using Structured Streaming to count unique visits to our website. I
>>> use Spark on YARN with 4 executor instances and from 2 cores * 5g
>>> memory to 4 cores * 10g memory per executor, but there are frequent
>>> full GCs, and once the count rises to more than about 4.5 million the
>>> application gets blocked and finally crashes with an OOM. It seems
>>> unreasonable. So is there any suggestion for optimizing the memory consumption
>>> of SS? Thanks.
>>>
>>
>>


PLs assist: trying to FlatMap a DataSet / partially OT

2017-09-14 Thread Marco Mistroni
Hi all,
could anyone assist please?
I am trying to flatMap a DataSet[(String, String)] and I am getting errors
in Eclipse.
The errors are more Scala-related than Spark-related, but I was wondering
if someone has come across a similar situation.

Here's what I've got: a DS of (String, String), out of which I am using
flatMap to get a List[Char] for the second element of the tuple.

val tplDataSet = < DataSet[(String, String)] >

val expanded = tplDataSet.flatMap(tpl  => tpl._2.toList,
Encoders.product[(String, String)])


Eclipse complains that 'tpl' in the above function is missing a parameter
type.

What am I missing? Or perhaps I am using the wrong approach?

w/kindest regards
 Marco
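
A sketch of two possible workarounds (untested, Spark 2.x assumed; it emits
one-character Strings because plain Char does not appear to have a built-in
encoder):

import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

val spark = SparkSession.builder.appName("flatmap-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Stand-in for the real tplDataSet.
val tplDataSet: Dataset[(String, String)] = Seq(("a", "foo"), ("b", "bar")).toDS()

// Option 1: the single-argument flatMap plus the implicit encoders;
// the parameter type of tpl is then inferred.
val expanded: Dataset[String] = tplDataSet.flatMap(tpl => tpl._2.map(_.toString))

// Option 2: keep an explicit encoder, but annotate the parameter and make the
// encoder match the output element type rather than the input tuple.
val expanded2 =
  tplDataSet.flatMap((tpl: (String, String)) => tpl._2.map(_.toString))(Encoders.STRING)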


Re: [SS] Any way to optimize memory consumption of SS?

2017-09-14 Thread 张万新
There are expected to be about 5 million UUIDs in a day. I need to use this
field to drop duplicate records and count distinct numbers. If I simply count
without using dropDuplicates it occupies less than 1g of memory. I
believe most of the memory is occupied by the state store keeping the
state for dropDuplicates, but I cannot find a way to alleviate the problem.
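
One thing I am considering is an approximate distinct count, which would avoid
keeping per-UUID state at all (I have not verified whether this is accepted on a
streaming aggregation, so this is only a sketch):

import org.apache.spark.sql.functions.approx_count_distinct

val results = events
  .withWatermark("date", "1 day")
  .groupBy($"date")
  .agg(approx_count_distinct($"uuid").as("uv"))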

Michael Armbrust wrote on Fri, Sep 15, 2017 at 3:35 AM:

> How many UUIDs do you expect to have in a day?  That is likely where all
> the memory is being used.  Does it work without that?
>
> On Tue, Sep 12, 2017 at 8:42 PM, 张万新  wrote:
>
>> Yes, my code is shown below (I also posted my code in another mail):
>> /**
>> * input
>> */
>>   val logs = spark
>> .readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", BROKER_SERVER)
>> .option("subscribe", TOPIC)
>> .option("startingOffset", "latest")
>> .load()
>>
>>   /**
>> * process
>> */
>>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[(String)]
>>
>>   val events = logValues
>> .map(parseFunction)
>> .select(
>>   $"_1".alias("date").cast("timestamp"),
>>   $"_2".alias("uuid").cast("string")
>> )
>>
>>   val results = events
>> .withWatermark("date", "1 day")
>> .dropDuplicates("uuid", "date")
>> .groupBy($"date")
>> .count()
>>
>>   /**
>> * output
>> */
>>   val query = results
>> .writeStream
>> .outputMode("update")
>> .format("console")
>> .option("truncate", "false")
>> .trigger(Trigger.ProcessingTime("1 seconds"))
>> .start()
>>
>>   query.awaitTermination()
>>
>> and I use Play JSON to parse input logs from Kafka; the parse function
>> is like:
>>
>>   def parseFunction(str: String): (Long, String) = {
>> val json = Json.parse(str)
>> val timestamp = (json \ "time").get.toString().toLong
>> val date = (timestamp / (60 * 60 * 24) * 24 -8) * 60 * 60
>> val uuid = (json \ "uuid").get.toString()
>> (date, uuid)
>>   }
>>
>> and the Java heap space looks like this (I've increased the executor memory to
>> 15g):
>>
>> [image: image.png]
>> Michael Armbrust wrote on Wed, Sep 13, 2017 at 2:23 AM:
>>
>>> Can you show the full query you are running?
>>>
>>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新  wrote:
>>>
 Hi,

 I'm using Structured Streaming to count unique visits to our website. I
 use Spark on YARN with 4 executor instances and from 2 cores * 5g
 memory to 4 cores * 10g memory per executor, but there are frequent
 full GCs, and once the count rises to more than about 4.5 million the
 application gets blocked and finally crashes with an OOM. It seems
 unreasonable. So is there any suggestion for optimizing the memory consumption
 of SS? Thanks.

>>>
>>>
>


Re: Should I use Dstream or Structured Stream to transfer data from source to sink and then back from sink to source?

2017-09-14 Thread kant kodali
Thanks, TD! And sorry for the typo. It's very helpful to know that whatever
I was achieving with DStreams I can also achieve with Structured
Streaming.

It seems like there was some other error in my code; I fixed it and it
seems to be working fine now!

Thanks again!

On Thu, Sep 14, 2017 at 12:23 AM, Tathagata Das  wrote:

> Are you sure the code is correct? A Dataset does not have a method
> "trigger". Rather I believe the correct code should be
>
> StreamingQuery query = resultDataSet.writeStream.trigger(ProcessingTime(1000)).format("kafka").start();
>
> You can do with Structured Streaming all the same things you can do with
> DStreams. For example, there is foreach in Structured Streaming, e.g.
> resultDataSet.writeStream.foreach(...)
>
> When you say the mapPartitions code is not getting executed ... are you sure
> the query is running? The actual code (not pseudocode) may help debug
> this.
>
>
> On Wed, Sep 13, 2017 at 11:20 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> I am trying to read data from Kafka, insert into Mongo, read back from
>> Mongo, and insert into Kafka again. I went with the Structured Streaming approach
>> first, however I believe I am making some naive error because my map
>> operations are not getting invoked.
>>
>> The pseudo code looks like this
>>
>> DataSet resultDataSet = jsonDataset.mapPartitions(
>> insertIntoMongo).mapPartitions(readFromMongo);
>>
>> StreamingQuery query = resultDataSet.trigger(ProcesingTime(1000)).format("kafka").start();
>>
>> query.awaitTermination();
>>
>> The mapPartitions in this code is not getting executed. Is this because I
>> am not calling any action on my streaming dataset? In the DStream case, I
>> used to call foreachRDD and it worked well. So how do I do this using
>> Structured Streaming?
>>
>> Thanks!
>>
>
>