How to convert Dataset&lt;Row&gt; to Dataset&lt;String&gt; in Spark Structured Streaming?

2017-05-30 Thread kant kodali
Hi All,

I have a Dataset&lt;Row&gt; and I am trying to convert it into a Dataset&lt;String&gt;
(JSON string) using Spark Structured Streaming. I have tried the following.

df2.toJSON().writeStream().foreach(new KafkaSink())

This doesn't seem to work for the following reason.

"Queries with streaming sources must be executed with writeStream.start()"

My DataFrame looks like this

name, ratio, count  // column names

"hello", 1.56, 34

If I try to convert a Row into a JSON string, it results in something like this:
{"key1": "name", "value1": "hello", "key2": "ratio", "value2": 1.56,
"key3": "count", "value3": 34}, but what I need is something like this:
{ "result": {"name": "hello", "ratio": 1.56, "count": 34} }. However, I don't
have a result column.

It looks like there are a couple of functions, to_json and json_tuple, but they
seem to take only one Column as the first argument, so should I call to_json
on every column? Also, how would I turn this into a Dataset&lt;String&gt;?
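
(For reference, not part of the original question: a minimal Scala sketch, assuming Spark 2.1+ where to_json is available and that spark is the active SparkSession. to_json takes a single struct column, so wrapping all columns in a struct named "result" produces the desired shape without calling it per column; the same functions exist in the Java API.)

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, struct, to_json}
import spark.implicits._

// Nest the three columns in a struct named "result"; to_json then emits
// {"result":{"name":"hello","ratio":1.56,"count":34}} for each row.
val jsonDs: Dataset[String] = df2
  .select(to_json(struct(struct(col("name"), col("ratio"), col("count")).alias("result"))))
  .as[String]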

Thanks!


Checkpointing for reduceByKeyAndWindow with a window size of 1 hour and 24 hours

2017-05-30 Thread SRK
Hi,

What happens if I don't specify checkpointing on a DStream that has
reduceByKeyAndWindow with no inverse function? Would it cause memory to
overflow? My window sizes are 1 hour and 24 hours.
I cannot provide an inverse function for this as it is based on HyperLogLog.

My code looks something like the following:

  val logsByPubGeo = messages.map(_._2).filter(_.geo != Constants.UnknownGeo).map { log =>
    val key = PublisherGeoKey(log.publisher, log.geo)
    val agg = AggregationLog(
      timestamp = log.timestamp,
      sumBids = log.bid,
      imps = 1,
      uniquesHll = hyperLogLog(log.cookie.getBytes(Charsets.UTF_8))
    )
    (key, agg)
  }


  val aggLogs = logsByPubGeo.reduceByKeyAndWindow(reduceAggregationLogs, BatchDuration)

  private def reduceAggregationLogs(aggLog1: AggregationLog, aggLog2: AggregationLog) = {
    aggLog1.copy(
      timestamp = math.min(aggLog1.timestamp, aggLog2.timestamp),
      sumBids = aggLog1.sumBids + aggLog2.sumBids,
      imps = aggLog1.imps + aggLog2.imps,
      uniquesHll = aggLog1.uniquesHll + aggLog2.uniquesHll
    )
  }
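
(For reference, not from the original post: per the Spark Streaming programming guide, a checkpoint directory is only mandatory for reduceByKeyAndWindow when an inverse function is supplied; without one, Spark caches the window's parent RDDs and recomputes the reduction on each slide, so memory use grows with the window length rather than with lineage. Below is a minimal sketch of wiring up both windows with an explicit checkpoint directory; ssc and the path are illustrative names, and BatchDuration is assumed to be the slide interval.)

  import org.apache.spark.streaming.Minutes

  // ssc is the StreamingContext; the checkpoint path is illustrative
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

  // window = 1 hour / 24 hours, slide = BatchDuration (as in the snippet above)
  val hourlyAggLogs = logsByPubGeo.reduceByKeyAndWindow(reduceAggregationLogs, Minutes(60), BatchDuration)
  val dailyAggLogs  = logsByPubGeo.reduceByKeyAndWindow(reduceAggregationLogs, Minutes(24 * 60), BatchDuration)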


Please let me know.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Checkpointing-fro-reduceByKeyAndWindow-with-a-window-size-of-1-hour-and-24-hours-tp28722.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: foreachPartition in Spark Java API

2017-05-30 Thread Anton Kravchenko
//ForEachPartFunction.java:

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
    public void call(Iterator<Row> t) throws Exception {
        List<String> rows = new ArrayList<>();
        while (t.hasNext()) {
            Row irow = t.next();
            rows.add(irow.toString());
        }
        System.out.println(rows.toString());
    }
}

On Tue, May 30, 2017 at 2:01 PM, Anton Kravchenko <
kravchenko.anto...@gmail.com> wrote:

> Ok, there are at least two ways to do it:
> Dataset<Row> df = spark.read.csv("file:///C:/input_data/*.csv")
>
> df.foreachPartition(new ForEachPartFunction());
> df.toJavaRDD().foreachPartition(new Void_java_func());
>
> where ForEachPartFunction and Void_java_func are defined below:
>
> // ForEachPartFunction.java:
> import org.apache.spark.api.java.function.ForeachPartitionFunction;
> import org.apache.spark.sql.Row;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
>     public void call(Iterator<Row> t) throws Exception {
>         List<String> rows = new ArrayList<>();
>
>         while (t.hasNext()) {
>             Row irow = t.next();
>             rows.add(irow.toString());
>         }
>         System.out.println(rows.toString());
>     }
> }
>
> // Void_java_func.java:
> import org.apache.spark.api.java.function.VoidFunction;
> import org.apache.spark.sql.Row;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class Void_java_func implements VoidFunction<Iterator<Row>> {
>     public void call(Iterator<Row> it) {
>         List<String> rows = new ArrayList<>();
>
>         while (it.hasNext()) {
>             Row irow = it.next();
>             rows.add(irow.toString());
>         }
>     }
> }
>
> Anton
>
>
> On Tue, May 30, 2017 at 10:58 AM, Anton Kravchenko <
> kravchenko.anto...@gmail.com> wrote:
>
>> What would be a Java equivalent of the Scala code below?
>>
>>  import scala.collection.mutable.ArrayBuffer
>>
>>  def void_function_in_scala(ipartition: Iterator[Row]): Unit = {
>>    var df_rows = ArrayBuffer[String]()
>>    for (irow <- ipartition) {
>>      df_rows += irow.toString
>>    }
>>  }
>>
>>  val df = spark.read.csv("file:///C:/input_data/*.csv")
>>  df.foreachPartition(void_function_in_scala);
>>
>> Thank you,
>> Anton
>>
>
>


Re: Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-30 Thread Asher Krim
You should actually be able to get to the underlying filesystem from your
SparkContext:

String originalFs = sparkContext.hadoopConfiguration().get("fs.defaultFS");


and then you could just use that:

String checkpointPath = String.format("%s/%s/", originalFs,
checkpointDirectory);
sparkContext.setCheckpointDir(checkpointPath);
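
(A hedged Scala equivalent of the same idea, for anyone using a SparkSession; the sub-directory name is illustrative only.)

val defaultFs = spark.sparkContext.hadoopConfiguration.get("fs.defaultFS")
spark.sparkContext.setCheckpointDir(s"$defaultFs/tmp/checkpoints")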


Asher Krim
Senior Software Engineer

On Tue, May 30, 2017 at 12:37 PM, Everett Anderson  wrote:

> Still haven't found a --conf option.
>
> Regarding a temporary HDFS checkpoint directory, it looks like when using
> --master yarn, spark-submit supplies a SPARK_YARN_STAGING_DIR environment
> variable. Thus, one could do the following when creating a SparkSession:
>
> val checkpointPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"),
> "checkpoints").toString
> sparkSession.sparkContext.setCheckpointDir(checkpointPath)
>
> The staging directory is in an HDFS path like
>
> /user/[user]/.sparkStaging/[YARN application ID]
>
> and is deleted at the end of the application.
>
> So this is one option, though certainly abusing the staging directory.
>
> A more general one might be to find where Dataset.persist(DISK_ONLY)
> writes.
>
>
> On Fri, May 26, 2017 at 9:08 AM, Everett Anderson 
> wrote:
>
>> Hi,
>>
>> I need to set a checkpoint directory as I'm starting to use GraphFrames.
>> (Also, occasionally my regular DataFrame lineages get too long so it'd be
>> nice to use checkpointing to squash the lineage.)
>>
>> I don't actually need this checkpointed data to live beyond the life of
>> the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and
>> reading and writing non-transient data to S3.
>>
>> Two questions:
>>
>> 1. Is there a Spark --conf option to set the checkpoint directory?
>> Somehow I couldn't find it, but surely it exists.
>>
>> 2. What's a good checkpoint directory for this use case? I imagine it'd
>> be on HDFS and presumably in a YARN application-specific temporary path
>> that gets cleaned up afterwards. Does anyone have a recommendation?
>>
>> Thanks!
>>
>> - Everett
>>
>>
>


Re: foreachPartition in Spark Java API

2017-05-30 Thread Anton Kravchenko
Ok, there are at least two ways to do it:
Dataset<Row> df = spark.read.csv("file:///C:/input_data/*.csv")

df.foreachPartition(new ForEachPartFunction());
df.toJavaRDD().foreachPartition(new Void_java_func());

where ForEachPartFunction and Void_java_func are defined below:

// ForEachPartFunction.java:
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ForEachPartFunction implements ForeachPartitionFunction<Row> {
    public void call(Iterator<Row> t) throws Exception {
        List<String> rows = new ArrayList<>();

        while (t.hasNext()) {
            Row irow = t.next();
            rows.add(irow.toString());
        }
        System.out.println(rows.toString());
    }
}

// Void_java_func.java:
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Void_java_func implements VoidFunction<Iterator<Row>> {
    public void call(Iterator<Row> it) {
        List<String> rows = new ArrayList<>();

        while (it.hasNext()) {
            Row irow = it.next();
            rows.add(irow.toString());
        }
    }
}

Anton


On Tue, May 30, 2017 at 10:58 AM, Anton Kravchenko <
kravchenko.anto...@gmail.com> wrote:

> What would be a Java equivalent of the Scala code below?
>
>  import scala.collection.mutable.ArrayBuffer
>
>  def void_function_in_scala(ipartition: Iterator[Row]): Unit = {
>    var df_rows = ArrayBuffer[String]()
>    for (irow <- ipartition) {
>      df_rows += irow.toString
>    }
>  }
>
>  val df = spark.read.csv("file:///C:/input_data/*.csv")
>  df.foreachPartition(void_function_in_scala);
>
> Thank you,
> Anton
>


Re: Random Forest hangs without trace of error

2017-05-30 Thread Morten Hornbech
Hi Sumona

I’m afraid I never really resolved the issue. Actually, I have just had to
roll back an upgrade from 2.1.0 to 2.1.1 because it (for reasons unknown)
reintroduced the issue in our nightly integration tests (see
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-upgrading-to-Spark-2-1-1-from-2-1-0-tc28660.html).

The “solution” for me at the time was to wave my magic Spark wand and hope for
the best. That generally means:

- trying to increase memory or reduce the amount of memory required (smaller
datasets, lower sample rate, more partitions, less caching)
- performing random changes to various other parts of the pipeline, including
SQL statements and adding/removing stuff such as repartition/coalesce
- flipping around with various Spark configuration settings

In this specific case I think it was the subsampling rate that did the trick.

I find issues such as this one extremely demanding to debug because they can
generally not be reproduced locally. I guess you basically need to build Spark
yourself with appropriate instrumentation added, and even this would probably
require a very deep insight into Spark's guts.

Hanging threads are in my opinion the worst possible behaviour of a program, so 
if anyone can shed some light on this or provide any debugging hints it would 
be amazing.

Morten


> On 30 May 2017 at 19:29, Sumona Routh wrote:
> 
> Hi Morten,
> Were you able to resolve your issue with RandomForest? I am having similar
> issues with a newly trained model (that does have a larger number of trees and
> a smaller minInstancesPerNode, which is by design to produce the best-performing
> model).
> 
> I wanted to get some feedback on how you solved your issue before I post a 
> separate question.
> 
> Thanks!
> Sumona
> 
> On Sun, Dec 11, 2016 at 4:10 AM, Marco Mistroni wrote:
> OK. Did u change spark version? Java/scala/python version? 
> Have u tried with different versions of any of the above?
> Hope this helps 
> Kr
> 
> On 10 Dec 2016 10:37 pm, "Morten Hornbech" wrote:
> I haven’t actually experienced any non-determinism. We have nightly 
> integration tests comparing output from random forests with no variations.
> 
> The workaround we will probably try is to split the dataset, either randomly 
> or on one of the variables, and then train a forest on each partition, which 
> should then be sufficiently small.
> 
> I hope to be able to provide a good repro case in some weeks. If the problem 
> was in our own code I will also post it in this thread.
> 
> Morten
> 
>> On 10 Dec 2016 at 23:25, Marco Mistroni wrote:
>> 
>> Hello Morten
>> ok.
>> afaik there is a tiny bit of randomness in these ML algorithms (pls anyone
>> correct me if i am wrong).
>> In fact if you run your RDF code multiple times, it will not give you
>> EXACTLY the same results (though accuracy and errors should be more or less
>> similar)... at least this is what i found when playing around with
>> RDF and decision trees and other ML algorithms.
>> 
>> If RDF is not a must for your usecase, could you try 'scale back' to 
>> Decision Trees and see if you still get intermittent failures?
>> this at least to exclude issues with the data
>> 
>> hth 
>>  marco
>> 
>> On Sat, Dec 10, 2016 at 5:20 PM, Morten Hornbech wrote:
>> Already did. There are no issues with smaller samples. I am running this in 
>> a cluster of three t2.large instances on aws.
>> 
>> I have tried to find the threshold where the error occurs, but it is not a 
>> single factor causing it. Input size and subsampling rate seems to be most 
>> significant, and number of trees the least.
>> 
>> I have also tried running on a test frame of randomized numbers with the 
>> same number of rows, and could not reproduce the problem here.
>> 
>> By the way maxDepth is 5 and maxBins is 32.
>> 
>> I will probably need to leave this for a few weeks to focus on more 
>> short-term stuff, but I will write here if I solve it or reproduce it more 
>> consistently.
>> 
>> Morten
>> 
>>> On 10 Dec 2016 at 17:29, Marco Mistroni wrote:
>>> 
>>> Hi
>>>  Bring back samples to the 1k range to debug... or, as suggested, reduce trees and
>>> bins. Had RDF running on same-size data with no issues... or send me
>>> some sample code and data and I will try it out on my ec2 instance ...
>>> Kr
>>> 
>>> On 10 Dec 2016 3:16 am, "Md. Rezaul Karim" wrote:
>>> I had similar experience last week. Even I could not find any error trace. 
>>> 
>>> Later on, I did the following to get rid of the problem: 

foreachPartition in Spark Java API

2017-05-30 Thread Anton Kravchenko
What would be a Java equivalent of the Scala code below?

 import scala.collection.mutable.ArrayBuffer

 def void_function_in_scala(ipartition: Iterator[Row]): Unit = {
   var df_rows = ArrayBuffer[String]()
   for (irow <- ipartition) {
     df_rows += irow.toString
   }
 }

 val df = spark.read.csv("file:///C:/input_data/*.csv")
 df.foreachPartition(void_function_in_scala);

Thank you,
Anton


Re: Disable queuing of spark job on Mesos cluster if sufficient resources are not found

2017-05-30 Thread Michael Gummelt
The driver will remain in the queue indefinitely, unless you issue a kill
command at /v1/submissions/kill/
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala#L64

On Mon, May 29, 2017 at 1:15 AM, Mevada, Vatsal 
wrote:

> Is there any configurable timeout which controls queuing of the driver in
> Mesos cluster mode, or will the driver remain in the queue indefinitely until
> it finds resources on the cluster?
>
>
>
> *From:* Michael Gummelt [mailto:mgumm...@mesosphere.io]
> *Sent:* Friday, May 26, 2017 11:33 PM
> *To:* Mevada, Vatsal 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Disable queuing of spark job on Mesos cluster if
> sufficient resources are not found
>
>
>
> Nope, sorry.
>
>
>
> On Fri, May 26, 2017 at 4:38 AM, Mevada, Vatsal 
> wrote:
>
> Hello,
>
> I am using Mesos with cluster deployment mode to submit my jobs.
>
> When sufficient resources are not available on the Mesos cluster, I can see
> that my jobs are queuing up on the Mesos dispatcher UI.
>
> Is it possible to tweak some configuration so that my job submission fails
> gracefully (instead of queuing up) if sufficient resources are not found on
> the Mesos cluster?
>
> Regards,
>
> Vatsal
>
>
>
>
> --
>
> Michael Gummelt
>
> Software Engineer
>
> Mesosphere
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Random Forest hangs without trace of error

2017-05-30 Thread Sumona Routh
Hi Morten,
Were you able to resolve your issue with RandomForest? I am having similar
issues with a newly trained model (that does have a larger number of trees and
a smaller minInstancesPerNode, which is by design to produce the best-performing
model).

I wanted to get some feedback on how you solved your issue before I post a
separate question.

Thanks!
Sumona

On Sun, Dec 11, 2016 at 4:10 AM Marco Mistroni  wrote:

> OK. Did u change spark version? Java/scala/python version?
> Have u tried with different versions of any of the above?
> Hope this helps
> Kr
>
> On 10 Dec 2016 10:37 pm, "Morten Hornbech"  wrote:
>
>> I haven’t actually experienced any non-determinism. We have nightly
>> integration tests comparing output from random forests with no variations.
>>
>> The workaround we will probably try is to split the dataset, either
>> randomly or on one of the variables, and then train a forest on each
>> partition, which should then be sufficiently small.
>>
>> I hope to be able to provide a good repro case in some weeks. If the
>> problem was in our own code I will also post it in this thread.
>>
>> Morten
>>
>> On 10 Dec 2016 at 23:25, Marco Mistroni wrote:
>>
>> Hello Morten
>> ok.
>> afaik there is a tiny bit of randomness in these ML algorithms (pls
>> anyone correct me if i am wrong).
>> In fact if you run your RDF code multiple times, it will not give you
>> EXACTLY the same results (though accuracy and errors should be more or less
>> similar)... at least this is what i found when playing around with
>> RDF and decision trees and other ML algorithms.
>>
>> If RDF is not a must for your usecase, could you try 'scale back' to
>> Decision Trees and see if you still get intermittent failures?
>> this at least to exclude issues with the data
>>
>> hth
>>  marco
>>
>> On Sat, Dec 10, 2016 at 5:20 PM, Morten Hornbech 
>> wrote:
>>
>>> Already did. There are no issues with smaller samples. I am running this
>>> in a cluster of three t2.large instances on aws.
>>>
>>> I have tried to find the threshold where the error occurs, but it is not
>>> a single factor causing it. Input size and subsampling rate seems to be
>>> most significant, and number of trees the least.
>>>
>>> I have also tried running on a test frame of randomized numbers with the
>>> same number of rows, and could not reproduce the problem here.
>>>
>>> By the way maxDepth is 5 and maxBins is 32.
>>>
>>> I will probably need to leave this for a few weeks to focus on more
>>> short-term stuff, but I will write here if I solve it or reproduce it more
>>> consistently.
>>>
>>> Morten
>>>
>>> On 10 Dec 2016 at 17:29, Marco Mistroni wrote:
>>>
>>> Hi
>>>  Bring back samples to the 1k range to debug... or, as suggested, reduce trees
>>> and bins. Had RDF running on same-size data with no issues... or send
>>> me some sample code and data and I will try it out on my ec2 instance ...
>>> Kr
>>>
>>> On 10 Dec 2016 3:16 am, "Md. Rezaul Karim" <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
 I had similar experience last week. Even I could not find any error
 trace.

 Later on, I did the following to get rid of the problem:
 i) I downgraded to Spark 2.0.0
 ii) Decreased the value of maxBins and maxDepth

 Additionally, make sure that you set the featureSubsetStrategy as
 "auto" to let the algorithm choose the best feature subset strategy
 for your data. Finally, set the impurity as "gini" for the information
 gain.

 However, setting the number of trees to just 1 gives you neither the real
 advantage of the forest nor better predictive performance.
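
(For reference, a hedged sketch of how the settings mentioned above map onto the spark.ml API; the numeric values are illustrative, not a recommendation.)

 import org.apache.spark.ml.classification.RandomForestClassifier

 val rf = new RandomForestClassifier()
   .setFeatureSubsetStrategy("auto") // let the algorithm choose the feature subset strategy
   .setImpurity("gini")              // Gini impurity for the information gain
   .setNumTrees(20)                  // > 1, so the forest actually helps
   .setMaxDepth(5)
   .setMaxBins(32)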



 Best,
 Karim


 On Dec 9, 2016 11:29 PM, "mhornbech"  wrote:

> Hi
>
> I have spent quite some time trying to debug an issue with the Random
> Forest
> algorithm on Spark 2.0.2. The input dataset is relatively large at
> around
> 600k rows and 200MB, but I use subsampling to make each tree
> manageable.
> However even with only 1 tree and a low sample rate of 0.05 the job
> hangs at
> one of the final stages (see attached). I have checked the logs on all
> executors and the driver and find no traces of error. Could it be a
> memory
> issue even though no error appears? The error does seem sporadic to
> some
> extent so I also wondered whether it could be a data issue, that only
> occurs
> if the subsample includes the bad data rows.
>
> Please comment if you have a clue.
>
> Morten
>
> (screenshot: http://apache-spark-user-list.1001560.n3.nabble.com/file/n28192/Sk%C3%A6rmbillede_2016-12-10_kl.png)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-hangs-without-trace-of-error-tp28192.html
> Sent 

No TypeTag Available for String

2017-05-30 Thread krishmah
I am currently using Spark 2.0.1 with Scala 2.11.8. However, the same code works
with Scala 2.10.6. Please advise if I am missing something.

import org.apache.spark.sql.functions.udf

val getFileName = udf { z: String => z.takeRight(z.length - z.lastIndexOf("/") - 1) }

and this gives me the following error messages:

No Type Tag Available for String and

not enough arguments for method udf: (implicit evidence$2:
reflect.runtime.universe.TypeTag[String], implicit evidence$3:
reflect.runtime.universe.TypeTag[String])org.apache.spark.sql.expressions.UserDefinedFunction.
Unspecified value parameters evidence$2, evidence$3.
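
(For reference, a hedged note: this error often shows up when the Spark and scala-reflect artifacts on the classpath were built for a different Scala version than the project itself, e.g. _2.10 jars in a 2.11 build, which is worth checking first. Below is a workaround sketch that sidesteps the TypeTag requirement by using the udf overload that takes an explicit return DataType; whether this matches the root cause here is an assumption.)

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StringType

// Same logic as above, but the return type is supplied explicitly, so no TypeTag is needed
val getFileName = udf((z: String) => z.takeRight(z.length - z.lastIndexOf("/") - 1), StringType)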



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-TypeTag-Available-for-String-tp28720.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-30 Thread Everett Anderson
Still haven't found a --conf option.

Regarding a temporary HDFS checkpoint directory, it looks like when using
--master yarn, spark-submit supplies a SPARK_YARN_STAGING_DIR environment
variable. Thus, one could do the following when creating a SparkSession:

val checkpointPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"),
"checkpoints").toString
sparkSession.sparkContext.setCheckpointDir(checkpointPath)

The staging directory is in an HDFS path like

/user/[user]/.sparkStaging/[YARN application ID]

and is deleted at the end of the application.

So this is one option, though certainly abusing the staging directory.

A more general one might be to find where Dataset.persist(DISK_ONLY) writes.


On Fri, May 26, 2017 at 9:08 AM, Everett Anderson  wrote:

> Hi,
>
> I need to set a checkpoint directory as I'm starting to use GraphFrames.
> (Also, occasionally my regular DataFrame lineages get too long so it'd be
> nice to use checkpointing to squash the lineage.)
>
> I don't actually need this checkpointed data to live beyond the life of
> the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and
> reading and writing non-transient data to S3.
>
> Two questions:
>
> 1. Is there a Spark --conf option to set the checkpoint directory? Somehow
> I couldn't find it, but surely it exists.
>
> 2. What's a good checkpoint directory for this use case? I imagine it'd be
> on HDFS and presumably in a YARN application-specific temporary path that
> gets cleaned up afterwards. Does anyone have a recommendation?
>
> Thanks!
>
> - Everett
>
>


user-unsubscr...@spark.apache.org

2017-05-30 Thread williamtellme123
 

 

From: Joel D [mailto:games2013@gmail.com] 
Sent: Monday, May 29, 2017 9:04 PM
To: user@spark.apache.org
Subject: Schema Evolution Parquet vs Avro

 

Hi,

 

We are trying to come up with the best storage format for handling schema 
changes in ingested data.

 

We noticed that both Avro and Parquet allow one to select based on column name
instead of the index/position of the data. However, we are inclined towards
Parquet for better read performance since it's columnar and we will be
selecting a few columns instead of all. Data will be processed and saved to
partitions on which we will have Hive external tables.

 

Will Parquet be able to handle the following:

- Renaming of a column in the middle of the schema

- Removal of a column from the middle of the schema

- Data type change of an existing column (int to bigint should be allowed, right?)

 

Please advise. 

 

Thanks,

Sam



Re: Message getting lost in Kafka + Spark Streaming

2017-05-30 Thread Cody Koeninger
First thing I noticed: you should be using a singleton Kafka producer,
not recreating one for every partition. It's thread-safe.
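
(A hedged sketch of the singleton pattern, using the same old kafka.producer API as the code quoted below; the object name, broker list, and topic literal are illustrative. The producer is created lazily once per executor JVM and reused across partitions and batches instead of being built and closed inside every foreachPartition call.)

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KafkaProducerHolder {
  // Initialized once per executor JVM on first use
  lazy val producer: Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092") // illustrative
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "-1")
    new Producer[String, String](new ProducerConfig(props))
  }
}

// Usage inside the existing job (schemaRdd2 as in the quoted code):
schemaRdd2.foreachPartition { partition =>
  partition.foreach { row =>
    KafkaProducerHolder.producer.send(
      new KeyedMessage[String, String]("outTopicHarmonized", null, row.toString))
  }
}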

On Tue, May 30, 2017 at 7:59 AM, Vikash Pareek
 wrote:
> I am facing an issue related to Spark Streaming with Kafka; my use case is as
> follows:
> 1. The Spark Streaming (DirectStream) application reads data/messages from a
> Kafka topic and processes them.
> 2. On the basis of the processed message, the app writes the processed message
> to different Kafka topics,
> e.g. if the message is harmonized then write to the harmonized topic, else to
> the unharmonized topic.
>
> The problem is that during streaming we are somehow losing some messages,
> i.e. not all the incoming messages are written to the harmonized or
> unharmonized topics.
> For example, if the app receives 30 messages in one batch then sometimes it
> writes all the messages to the output topics (this is the expected behaviour)
> but sometimes it writes only 27 (3 messages are lost; this number can change).
>
> Versions are as follows:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topic configuration is as follows:
> # of brokers: 3
> # replication factor: 3
> # of partitions: 3
>
> Following are the properties we are using for Kafka:
>   val props = new Properties()
>   props.put("metadata.broker.list",
> properties.getProperty("metadataBrokerList"))
>   props.put("auto.offset.reset",
> properties.getProperty("autoOffsetReset"))
>   props.put("group.id", properties.getProperty("group.id"))
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   props.put("outTopicHarmonized",
> properties.getProperty("outletKafkaTopicHarmonized"))
>   props.put("outTopicUnharmonized",
> properties.getProperty("outletKafkaTopicUnharmonized"))
>   props.put("acks", "all");
>   props.put("retries", "5");
>   props.put("request.required.acks", "-1")
>
> Following is the piece of code where we are writing processed messages to
> Kafka:
>   val schemaRdd2 = finalHarmonizedDF.toJSON
>
>   schemaRdd2.foreachPartition { partition =>
> val producerConfig = new ProducerConfig(props)
> val producer = new Producer[String, String](producerConfig)
>
> partition.foreach { row =>
>   if (debug) println(row.mkString)
>   val keyedMessage = new KeyedMessage[String,
> String](props.getProperty("outTopicHarmonized"),
> null, row.toString())
>   producer.send(keyedMessage)
>
> }
> //hack, should be done with the flush
> Thread.sleep(1000)
> producer.close()
>   }
>
> We explicitly added sleep(1000) for testing purposes.
> But this is also not solving the problem :(
>
> Any suggestion would be appreciated.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Message getting lost in Kafka + Spark Streaming

2017-05-30 Thread Vikash Pareek
I am facing an issue related to Spark Streaming with Kafka; my use case is as
follows:
1. The Spark Streaming (DirectStream) application reads data/messages from a
Kafka topic and processes them.
2. On the basis of the processed message, the app writes the processed message
to different Kafka topics,
e.g. if the message is harmonized then write to the harmonized topic, else to
the unharmonized topic.

The problem is that during streaming we are somehow losing some messages,
i.e. not all the incoming messages are written to the harmonized or
unharmonized topics.
For example, if the app receives 30 messages in one batch then sometimes it
writes all the messages to the output topics (this is the expected behaviour)
but sometimes it writes only 27 (3 messages are lost; this number can change).
 
Versions are as follows:
Spark 1.6.0
Kafka 0.9
 
Kafka topic configuration is as follows:
# of brokers: 3
# replication factor: 3
# of partitions: 3
 
Following are the properties we are using for Kafka:
  val props = new Properties()
  props.put("metadata.broker.list",
properties.getProperty("metadataBrokerList"))
  props.put("auto.offset.reset",
properties.getProperty("autoOffsetReset"))
  props.put("group.id", properties.getProperty("group.id"))
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("outTopicHarmonized",
properties.getProperty("outletKafkaTopicHarmonized"))
  props.put("outTopicUnharmonized",
properties.getProperty("outletKafkaTopicUnharmonized"))
  props.put("acks", "all");
  props.put("retries", "5");
  props.put("request.required.acks", "-1")
* 
Following is the piece of code where we are writing proccessed messges to
kafka:
*  val schemaRdd2 = finalHarmonizedDF.toJSON
 
  schemaRdd2.foreachPartition { partition =>
val producerConfig = new ProducerConfig(props)
val producer = new Producer[String, String](producerConfig)
 
partition.foreach { row =>
  if (debug) println(row.mkString)
  val keyedMessage = new KeyedMessage[String,
String](props.getProperty("outTopicHarmonized"),
null, row.toString())
  producer.send(keyedMessage)
 
}
//hack, should be done with the flush
Thread.sleep(1000)
producer.close()
  }

We explicitly added sleep(1000) for testing purposes.
But this is also not solving the problem :(
 
Any suggestion would be appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Help required in converting CassandraRDD to VertexRDD & EdgeRDD

2017-05-30 Thread Tania Khan
Hi,

My requirement is to read some edge and vertex data from different
Cassandra tables. Now I want to pass them to a Spark graph as a VertexRDD and
an EdgeRDD and manipulate them further. Could anyone please suggest how to
convert the below CassandraRow RDDs to a VertexRDD and an EdgeRDD so that I
can create the graph?

 val vert= sc.cassandraTable("data","user1")
 val edd= sc.cassandraTable("data","user2")

Or read them as DataFrames and then convert them to a VertexRDD and an EdgeRDD
to create the Graph. Any better option is also welcome. Please suggest.
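
(Not an authoritative answer, just a hedged sketch of one way to do it with the spark-cassandra-connector and GraphX; the column names id, name, src, dst and weight are assumptions about your tables.)

import com.datastax.spark.connector._
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Map each CassandraRow into the (VertexId, attribute) / Edge shapes GraphX expects
val vertices: RDD[(VertexId, String)] =
  sc.cassandraTable("data", "user1").map(row => (row.getLong("id"), row.getString("name")))

val edges: RDD[Edge[Double]] =
  sc.cassandraTable("data", "user2").map(row => Edge(row.getLong("src"), row.getLong("dst"), row.getDouble("weight")))

// Graph.apply wraps these into a VertexRDD and an EdgeRDD internally
val graph: Graph[String, Double] = Graph(vertices, edges)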

Thanks,
Tania