Re: Read Avro Data using Spark Streaming

2018-11-14 Thread chandan prakash
In case you are using the older Spark Streaming (DStreams) API and processing Avro data
from Kafka, this might be useful:
https://www.linkedin.com/pulse/avro-data-processing-spark-streaming-kafka-chandan-prakash/
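
For the spark-streaming-kafka-0-10 integration with a schema registry that Divya mentions, something along these lines should work as a starting point (the broker/registry URLs, group id and field name below are placeholders; I have not tested this exact snippet):

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// assumptions: ssc (StreamingContext), brokers, schemaRegistryUrl and topic are defined elsewhere
val kafkaParams = Map[String, Object](
  "bootstrap.servers"   -> brokers,
  "key.deserializer"    -> classOf[StringDeserializer],
  "value.deserializer"  -> classOf[KafkaAvroDeserializer], // decodes records against the schema registry
  "schema.registry.url" -> schemaRegistryUrl,
  "group.id"            -> "avro-consumer-group",
  "auto.offset.reset"   -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, Object](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Object](Seq(topic), kafkaParams)
)

stream.foreachRDD { rdd =>
  // each value is already an Avro GenericRecord decoded with the registry schema
  rdd.map(_.value().asInstanceOf[GenericRecord].get("someField").toString)
     .take(10)
     .foreach(println)
}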

Regards,
Chandan

On Sat, Nov 3, 2018 at 9:04 AM Divya Narayan 
wrote:

> Hi,
>
> I produced avro data to kafka topic using schema registry and now I want
> to use spark streaming to read that data and do some computation in real
> time. Can some one please give a sample code for doing that . I couldn't
> find any working code online. I am using spark version 2.2.0 and
> spark-streaming-kafka-0-10_2.11.
>
> Thanks
> Divya
>


-- 
Chandan Prakash


Re: Watermarking without aggregation with Structured Streaming

2018-09-30 Thread chandan prakash
Interesting question.
I do not think watermark is currently supported without an aggregation
operation/groupBy.

Reason:
Watermark in Structured Streaming is used for limiting the size of the state
needed to keep intermediate information in memory.
And state only comes into the picture in the case of stateful processing.
Also, in the code it seems that filtering out records on the basis of the
watermark happens only for stateful operators (statefulOperators.scala).
I have not tried running the code though, and would like to know if someone can
shed more light on this.
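
For context, the only form in which I have seen the watermark actually take effect is together with an aggregation, roughly like this (assuming a SparkSession named spark, a streaming DataFrame df with an event-time column ts, and placeholder intervals):

import org.apache.spark.sql.functions.window
import spark.implicits._

// state for windows older than (max event time - 10 minutes) can be dropped
val counts = df
  .withWatermark("ts", "10 minutes")
  .groupBy(window($"ts", "5 minutes"), $"key")
  .count()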

Regards,
Chandan


On Sat, Sep 22, 2018 at 7:43 PM peay  wrote:

> Hello,
>
> I am trying to use watermarking without aggregation, to filter out records
> that are just too late, instead of appending them to the output. My
> understanding is that aggregation is required for `withWatermark` to have
> any effect. Is that correct?
>
> I am looking for something along the lines of
>
> ```
> df.withWatermark("ts", ...).filter(F.col("ts") < ...)
> ```
>
> Is there any way to get the watermark value to achieve that?
>
> Thanks!
>


-- 
Chandan Prakash


Re: [Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

2018-09-30 Thread chandan prakash
Can anyone clear up the doubts on the questions asked here?

Regards,
Chandan

On Sat, Aug 11, 2018 at 10:03 PM chandan prakash 
wrote:

> Hi All,
> I was going through this pull request about new CheckpointFileManager
> abstraction in structured streaming coming in 2.4 :
> https://issues.apache.org/jira/browse/SPARK-23966
> https://github.com/apache/spark/pull/21048
>
> I went through the code in detail and found it will introduce a very nice
> abstraction which is much cleaner and extensible for direct-write file
> systems like S3 (in addition to the current HDFS file system).
>
> *But I am unable to understand: is it really solving some problem in the
> existing State Store code in Spark 2.3?*
>
> *My questions related to the above statements about the State Store code:*
>  *PR description*:: "Checkpoint files must be written atomically such
> that *no partial files are generated*."
> *QUESTION*: When are partial files generated in the current code? I can see
> that data is first written to a temp-delta file and then renamed to the
> version.delta file. If something goes wrong, the task will fail due to the
> thrown exception and abort() will be called on the store to close and delete
> tempDeltaFileStream. That seems quite clean; in what case might partial
> files be generated?
>
>  *PR description*:: "*State Store behavior is incorrect - HDFS FileSystem
> implementation does not have atomic rename*"
> *QUESTION*: The HDFS filesystem rename operation is atomic. I think the above
> line refers to first checking whether the file already exists and then
> taking the appropriate action, which together makes the file-renaming
> operation multi-step and hence non-atomic. But why is this behaviour
> incorrect? Even if multiple executors try to write to the same version.delta
> file, only the first of them will succeed; the others will see that the file
> exists and delete their temp-delta files. That looks fine.
>
> Am I missing anything here?
> I am really curious to know which corner cases this new pull request is
> trying to solve.
>
> Regards,
> Chandan
>
>
>
>
>

-- 
Chandan Prakash


[Structured Streaming SPARK-23966] Why non-atomic rename is problem in State Store ?

2018-08-11 Thread chandan prakash
Hi All,
I was going through this pull request about new CheckpointFileManager
abstraction in structured streaming coming in 2.4 :
https://issues.apache.org/jira/browse/SPARK-23966
https://github.com/apache/spark/pull/21048

I went through the code in detail and found it will introduce a very nice
abstraction which is much cleaner and extensible for direct-write file
systems like S3 (in addition to the current HDFS file system).

*But I am unable to understand: is it really solving some problem in the
existing State Store code in Spark 2.3?*

*My questions related to the above statements about the State Store code:*
 *PR description*:: "Checkpoint files must be written atomically such that *no
partial files are generated*."
*QUESTION*: When are partial files generated in the current code? I can see
that data is first written to a temp-delta file and then renamed to the
version.delta file. If something goes wrong, the task will fail due to the
thrown exception and abort() will be called on the store to close and delete
tempDeltaFileStream. That seems quite clean; in what case might partial
files be generated?

 *PR description*:: "*State Store behavior is incorrect - HDFS FileSystem
implementation does not have atomic rename*"
*QUESTION*: The HDFS filesystem rename operation is atomic. I think the above line
refers to first checking whether the file already exists and then taking
the appropriate action, which together makes the file-renaming operation
multi-step and hence non-atomic. But why is this behaviour incorrect?
Even if multiple executors try to write to the same version.delta file,
only the first of them will succeed; the others will see that the file
exists and delete their temp-delta files. That looks fine.

Am I missing anything here?
I am really curious to know which corner cases this new pull request is
trying to solve.

Regards,
Chandan


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread chandan prakash
Thanks a lot Arun for your response.
I got your point that existing sink plugins like Kafka, etc. cannot be used.
However, I could not get the part: "you cannot scale the partitions for
the sinks independently".
Can you please rephrase that part?

Also,
I guess:
using ForeachWriter for multiple sinks will affect performance because
writes will happen to a sink on a per-record basis (after deciding which
particular sink a record belongs to), whereas in the current implementation
all data under an RDD partition gets committed to the sink atomically in one
go. Please correct me if I am wrong here.
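
To make the per-record concern concrete, the kind of writer I have in mind would look roughly like this (the "sinkMarker" column is hypothetical, and plain files stand in for the real sinks just to keep the sketch self-contained):

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.sql.{ForeachWriter, Row}

class RoutingForeachWriter extends ForeachWriter[Row] {
  @transient private var sinkA: PrintWriter = _
  @transient private var sinkB: PrintWriter = _

  // one pair of sink connections is opened per partition task
  override def open(partitionId: Long, version: Long): Boolean = {
    sinkA = new PrintWriter(new FileWriter(s"/tmp/sinkA-$partitionId-$version", true))
    sinkB = new PrintWriter(new FileWriter(s"/tmp/sinkB-$partitionId-$version", true))
    true
  }

  // invoked once per record: the routing decision and the write both happen here
  override def process(record: Row): Unit = {
    if (record.getAs[String]("sinkMarker") == "A") sinkA.println(record.mkString(","))
    else sinkB.println(record.mkString(","))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (sinkA != null) sinkA.close()
    if (sinkB != null) sinkB.close()
  }
}

// usage: df.writeStream.foreach(new RoutingForeachWriter).start()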



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan  wrote:

> Yes ForeachWriter [1] could be an option If you want to write to different
> sinks. You can put your custom logic to split the data into different sinks.
>
> The drawback here is that you cannot plugin existing sinks like Kafka and
> you need to write the custom logic yourself and you cannot scale the
> partitions for the sinks independently.
>
> [1]
> https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html
>
> From: chandan prakash 
> Date: Thursday, July 12, 2018 at 2:38 AM
> To: Tathagata Das , "ymaha...@snappydata.io"
> , "priy...@asperasoft.com" ,
> "user @spark" 
> Subject: Re: [Structured Streaming] Avoiding multiple streaming queries
>
> Hi,
> Did any of you think about writing a custom foreach sink writer
> which can decide which record should go to which sink (based on some
> marker in the record, which we could possibly annotate during transformation)
> and then write to the specific sink accordingly.
> This will mean that:
> 1. every custom sink writer will have connections to as many sinks as
> there are sink types to which records can go.
> 2. every record will be read once in the single query but can be written
> to multiple sinks.
>
> Do you guys see any drawback in this approach?
> One drawback, of course, is that a sink is supposed to write the
> records as they are, but here we are introducing some intelligence into the sink.
> Apart from that, do you see any other issues with this approach?
>
> Regards,
> Chandan
>
>
> On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das 
> wrote:
>
>> Of course, you can write to multiple Kafka topics from a single query. If
>> your dataframe that you want to write has a column named "topic" (along
>> with "key", and "value" columns), it will write the contents of a row to
>> the topic in that row. This automatically works. So the only thing you need
>> to figure out is how to generate the value of that column.
>>
>> This is documented -
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
>>
>> Or am i misunderstanding the problem?
>>
>> TD
>>
>>
>>
>>
>> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan 
>> wrote:
>>
>>> I had a similar issue and i think that’s where the structured streaming
>>> design lacks.
>>> Seems like Question#2 in your email is a viable workaround for you.
>>>
>>> In my case, I have a custom Sink backed by an efficient in-memory column
>>> store suited for fast ingestion.
>>>
>>> I have a Kafka stream coming from one topic, and I need to classify the
>>> stream based on schema.
>>> For example, a Kafka topic can have three different types of schema
>>> messages and I would like to ingest into the three different column
>>> tables(having different schema) using my custom Sink implementation.
>>>
>>> Right now only(?) option I have is to create three streaming queries
>>> reading the same topic and ingesting to respective column tables using
>>> their Sink implementations.
>>> These three streaming queries create underlying three
>>> IncrementalExecutions and three KafkaSources, and three queries reading the
>>> same data from the same Kafka topic.
>>> Even with CachedKafkaConsumers at partition level, this is not an
>>> efficient way to handle a simple streaming use case.
>>>
>>> One workaround to overcome this limitation is to have same schema for
>>> all the messages in a Kafka partition, unfortunately this is not in our
>>> control and customers cannot change it due to their dependencies on other
>>> subsystems.
>>>
>>> Thanks,
>>> http://www.snappydata.io/blog <http://snappydata.io>
>>>
>>> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <
>>> priy...@asperasoft.com> wrote:

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-12 Thread chandan prakash
Hi,
Did any of you think about writing a custom foreach sink writer which
can decide which record should go to which sink (based on some marker in
the record, which we could possibly annotate during transformation) and then
write to the specific sink accordingly.
This will mean that:
1. every custom sink writer will have connections to as many sinks as
there are sink types to which records can go.
2. every record will be read once in the single query but can be written
to multiple sinks.

Do you guys see any drawback in this approach?
One drawback, of course, is that a sink is supposed to write the records
as they are, but here we are introducing some intelligence into the sink.
Apart from that, do you see any other issues with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das 
wrote:

> Of course, you can write to multiple Kafka topics from a single query. If
> your dataframe that you want to write has a column named "topic" (along
> with "key", and "value" columns), it will write the contents of a row to
> the topic in that row. This automatically works. So the only thing you need
> to figure out is how to generate the value of that column.
>
> This is documented -
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
>
> Or am i misunderstanding the problem?
>
> TD
>
>
>
>
> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan 
> wrote:
>
>> I had a similar issue and i think that’s where the structured streaming
>> design lacks.
>> Seems like Question#2 in your email is a viable workaround for you.
>>
>> In my case, I have a custom Sink backed by an efficient in-memory column
>> store suited for fast ingestion.
>>
>> I have a Kafka stream coming from one topic, and I need to classify the
>> stream based on schema.
>> For example, a Kafka topic can have three different types of schema
>> messages and I would like to ingest into the three different column
>> tables(having different schema) using my custom Sink implementation.
>>
>> Right now only(?) option I have is to create three streaming queries
>> reading the same topic and ingesting to respective column tables using
>> their Sink implementations.
>> These three streaming queries create underlying three
>> IncrementalExecutions and three KafkaSources, and three queries reading the
>> same data from the same Kafka topic.
>> Even with CachedKafkaConsumers at partition level, this is not an
>> efficient way to handle a simple streaming use case.
>>
>> One workaround to overcome this limitation is to have same schema for all
>> the messages in a Kafka partition, unfortunately this is not in our control
>> and customers cannot change it due to their dependencies on other
>> subsystems.
>>
>> Thanks,
>> http://www.snappydata.io/blog <http://snappydata.io>
>>
>> On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> I have a structured streaming query which sinks to Kafka.  This query
>>> has a complex aggregation logic.
>>>
>>>
>>> I would like to sink the output DF of this query to
>>> multiple Kafka topics each partitioned on a different ‘key’ column.  I
>>> don’t want to have multiple Kafka sinks for each of the
>>> different Kafka topics because that would mean running multiple streaming
>>> queries - one for each Kafka topic, especially since my aggregation logic
>>> is complex.
>>>
>>>
>>> Questions:
>>>
>>> 1.  Is there a way to output the results of a structured streaming query
>>> to multiple Kafka topics each with a different key column but without
>>> having to execute multiple streaming queries?
>>>
>>>
>>> 2.  If not,  would it be efficient to cascade the multiple queries such
>>> that the first query does the complex aggregation and writes output
>>> to Kafka and then the other queries just read the output of the first query
>>> and write their topics to Kafka thus avoiding doing the complex aggregation
>>> again?
>>>
>>>
>>> Thanks in advance for any help.
>>>
>>>
>>> Priyank
>>>
>>>
>>>
>>
>

-- 
Chandan Prakash


Re: Emit Custom metrics in Spark Structured Streaming job

2018-07-10 Thread chandan prakash
Hi Subramanian,
Did you find any solution for this?
I am looking for something similar too.

Regards,
Chandan

On Wed, Jun 27, 2018 at 9:47 AM subramgr 
wrote:

> I am planning to send these metrics to our KairosDB. Let me know if there
> are
> any examples that I can take a look
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Chandan Prakash


Re: Automatic Json Schema inference using Structured Streaming

2018-07-09 Thread chandan prakash
Hi Swetha,
I also had the same requirement: reading JSON from Kafka and writing back
in Parquet format.
I used a workaround:

   1. Inferred the schema using the batch API by reading the first few rows.
   2. Started streaming using the schema inferred in step 1.

*Limitation*: will not work if your schema changes on the fly for later
records; you will have to restart the streaming job.


*Sample Code:*
// start the stream
def start = {
  // check and get the latest kafka offset from the checkpoint if it exists
  val startingOffset: String = getKafkaOffset(offsetDirHdfsPath)

  // batch: infer the schema from kafka once during start
  val batchDf = spark.read.format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", topic)
    // .option("startingOffsets", """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") // -1: latest, -2: earliest
    .option("startingOffsets", startingOffset)
    .load().limit(2).select($"value".as[String])
  val batchJson = spark.read.json(batchDf)
  batchJson.printSchema() // print to see the inferred schema
  val schema = batchJson.schema

  // streaming: create a datastream from the Kafka topic
  val inputDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", topic)
    // .option("startingOffsets", """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") // -1: latest, -2: earliest
    .option("startingOffsets", startingOffset)
    .load()

  // convert the stream values to strings and expand them into columns using the inferred schema
  val ds = inputDf.selectExpr("CAST(value AS STRING)")
  val dataSet = ds.select(from_json($"value", schema).as("data")).select("data.*")
  val uploadToS3 = dataSet
    .writeStream
    .format("parquet")
    .option("path", outputPath)
    .option("checkpointLocation", checkpointDir)
    .start()
}

Regards,
Chandan

On Thu, Jul 5, 2018 at 12:38 PM SRK  wrote:

> Hi,
>
> Is there a way that Automatic Json Schema inference can be done using
> Structured Streaming?  I do not want to supply a predefined schema and bind
> it.
>
> With Spark Kafka Direct I could do spark.read.json(). I see that this is
> not
> supported in Structured Streaming.
>
>
> Thanks!
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Chandan Prakash


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-09 Thread chandan prakash
Thanks Amiya/TD for responding.

@TD,
Thanks for letting us know about this new foreachBatch API; this handle on the
per-batch dataframe should be useful in many cases.
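A rough sketch of how I imagine using it, based on the foreachBatch API described for 2.4 (so treat the exact signature as an assumption; df and the paths are placeholders):

import org.apache.spark.sql.DataFrame

df.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // the same micro-batch is written to two sinks without reading the source twice
    batchDf.persist()
    batchDf.write.format("csv").mode("append").save("/tmp/first_output")
    batchDf.write.format("csv").mode("append").save("/tmp/second_output")
    batchDf.unpersist()
  }
  .option("checkpointLocation", "/tmp/foreachBatchCheckpoint")
  .start()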

@Amiya,
The input source will be read twice and the entire DAG computation will be done
twice. Not a limitation as such, but a resource-utilisation and performance concern.

Regards,
Chandan



On Fri, Jul 6, 2018 at 2:42 PM Amiya Mishra 
wrote:

> Hi Tathagata,
>
> Is there any limitation of below code while writing to multiple file ?
>
> val inputdf:DataFrame =
>
> sparkSession.readStream.schema(schema).format("csv").option("delimiter",",").csv("src/main/streamingInput")
> query1 =
>
> inputdf.writeStream.option("path","first_output").option("checkpointLocation","checkpointloc").format("csv").start()
> query2 =
>
> inputdf.writeStream.option("path","second_output").option("checkpointLocation","checkpoint2").format("csv").start()
> sparkSession.streams.awaitAnyTermination()
>
>
> And what will be the release date of spark 2.4.0 ?
>
> Thanks
> Amiya
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Chandan Prakash


structured streaming: how to keep counter of error records in long running streaming application

2018-07-05 Thread chandan prakash
Hi,
I am writing a structured streaming application where I process records
after some validation (let's say, not null).
I want to keep a counter of invalid records over the life of the long-running
streaming application while the valid records get processed.
How can I achieve that?

My first thought was to use a LongAccumulator, but it seems to be per batch
and not for the life of the application.
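
Roughly what I tried with the accumulator, as a sketch (the validation and column name are placeholders, and spark/inputDf are assumed to exist):

import org.apache.spark.sql.Row

// a named accumulator registered on the driver
val invalidRecords = spark.sparkContext.longAccumulator("invalidRecords")

// typed filter: bump the counter for records that fail validation, pass the valid ones through
val validDf = inputDf.filter { row: Row =>
  val ok = row.getAs[String]("value") != null   // placeholder validation
  if (!ok) invalidRecords.add(1L)
  ok
}
// invalidRecords.value can then be read on the driver side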

If there is any other way or workaround to achieve this, please share.
Thanks in advance.


Regards,
-- 
Chandan Prakash


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

2018-07-05 Thread chandan prakash
Hi Amiya/Jürgen,
Did you get any lead on this?
I want to process records after some validation.
Correct records should go to sink1 and incorrect records to sink2.
How can this be achieved in a single stream?

Regards,
Chandan

On Wed, Jun 13, 2018 at 2:30 PM Amiya Mishra 
wrote:

> Hi Jürgen,
>
> Have you found any solution or workaround for multiple sinks from single
> source as we cannot process multiple sinks at a time ?
>
> As i also has a scenario in ETL where we are using clone component having
> multiple sinks with single input stream dataframe.
>
> Can you keep posting once you have any solution.
>
> Thanks
> Amiya
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Chandan Prakash


Spark Streaming logging on Yarn : issue with rolling in yarn-client mode for driver log

2018-03-07 Thread chandan prakash
Hi All,
I am running my Spark Streaming job in yarn-client mode.
I want to enable rolling and aggregation in the NodeManager container.
I am using the configs suggested in the Spark doc
<https://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application>:
${spark.yarn.app.container.log.dir}/spark.log  in log4j.properties

Also, for aggregation on YARN I have enabled these properties:
spark.yarn.rolledLog.includePattern=spark*
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds=3600
on the Spark and YARN side respectively.
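
For reference, the rolling-appender part of my log4j.properties looks roughly like this (the appender name and sizes are illustrative; the important bit is the File property):

log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.appender.rolling.MaxFileSize=50MB
log4j.appender.rolling.MaxBackupIndex=5
# resolves to the container log dir on YARN; this is what is not set for the driver in yarn-client mode
log4j.appender.rolling.File=${spark.yarn.app.container.log.dir}/spark.log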

On the executors, my logs are getting rolled and aggregated every hour as
expected.
*But the issue is:*
for the driver in yarn-client mode, the ${spark.yarn.app.container.log.dir} value
is not available when the driver starts, so I am not able to see the driver
logs in the YARN app container directory.
My restrictions are:
1. I want to use yarn-client mode only.
2. I want to enable logging in the YARN container only, so that it is aggregated
and backed up by YARN every hour to HDFS/S3.

*How can I work around this to enable driver log rolling and aggregation as
well?*

Any pointers will be helpful.
thanks in advance.

-- 
Chandan Prakash


Re: SparkR test script issue: unable to run run-tests.h on spark 2.2

2018-02-14 Thread chandan prakash
Thanks a lot Hyukjin & Felix.
It was helpful.
Switching to the older testthat version worked.

Regards,
Chandan

On Wed, Feb 14, 2018 at 3:28 PM, Felix Cheung <felixcheun...@hotmail.com>
wrote:

> Yes it is issue with the newer release of testthat.
>
> To workaround could you install an earlier version with devtools? will
> follow up for a fix.
>
> _
> From: Hyukjin Kwon <gurwls...@gmail.com>
> Sent: Wednesday, February 14, 2018 6:49 PM
> Subject: Re: SparkR test script issue: unable to run run-tests.h on spark
> 2.2
> To: chandan prakash <chandanbaran...@gmail.com>
> Cc: user @spark <user@spark.apache.org>
>
>
>
> From a very quick look, I think testthat version issue with SparkR.
>
> I had to fix that version to 1.x before in AppVeyor. There are few details
> in https://github.com/apache/spark/pull/20003
>
> Can you check and lower testthat version?
>
>
> On 14 Feb 2018 6:09 pm, "chandan prakash" <chandanbaran...@gmail.com>
> wrote:
>
>> Hi All,
>> I am trying to run test script of R under ./R/run-tests.sh but hitting
>> same ERROR everytime.
>> I tried running on mac as well as centos machine, same issue coming up.
>> I am using spark 2.2 (branch-2.2)
>> I followed from apache doc and followed the steps:
>> 1. installed R
>> 2. installed packages like testthat as mentioned in doc
>> 3. run run-tests.h
>>
>>
>> Every time I am getting this error line:
>>
>> Error in get(name, envir = asNamespace(pkg), inherits = FALSE) :
>>   object 'run_tests' not found
>> Calls: ::: -> get
>> Execution halted
>>
>>
>> Any Help?
>>
>> --
>> Chandan Prakash
>>
>>
>
>


-- 
Chandan Prakash


SparkR test script issue: unable to run run-tests.h on spark 2.2

2018-02-14 Thread chandan prakash
Hi All,
I am trying to run the R test script ./R/run-tests.sh but hitting the same
ERROR every time.
I tried running on a Mac as well as a CentOS machine; the same issue comes up.
I am using Spark 2.2 (branch-2.2).
I followed the Apache doc and these steps:
1. installed R
2. installed packages like testthat as mentioned in the doc
3. ran run-tests.sh


Every time I am getting this error line:

Error in get(name, envir = asNamespace(pkg), inherits = FALSE) :
  object 'run_tests' not found
Calls: ::: -> get
Execution halted


Any Help?

-- 
Chandan Prakash


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-19 Thread chandan prakash
Oh, that explains it.
My use case does not need state management,
so I guess I am better off without checkpointing.
Thank you for the clarification.

Regards,
Chandan

On Sat, Aug 20, 2016 at 8:24 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Checkpointing is required to be turned on in certain situations (e.g.
> updateStateByKey), but you're certainly not required to rely on it for
> fault tolerance.  I try not to.
>
> On Fri, Aug 19, 2016 at 1:51 PM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Thanks Cody for the pointer.
>>
>> I am able to do this now. Not using checkpointing. Rather storing offsets
>> in zookeeper for fault tolerance.
>> Spark Config changes now getting reflected in code deployment.
>> *Using this api :*
>> *KafkaUtils.createDirectStream[String, String, StringDecoder,
>> StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
>> messageHandler)*
>> *instead of :*
>> *KafkaUtils.createDirectStream[String, String, StringDecoder,
>> StringDecoder](ssc, kafkaParams, topicsSet)*
>>
>> *One Quick question :
>> *What is need of checkpointing if we can achieve both fault tolerance and 
>> application code/config changes  without checkpointing? Is there anything 
>> else which checkpointing gives? I might be missing something.
>>
>>
>> Regards,
>> Chandan
>>
>>
>> On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Yeah the solutions are outlined in the doc link.  Or just don't rely on
>>> checkpoints
>>> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaran...@gmail.com>
>>> wrote:
>>>
>>>> Yes,
>>>>  i looked into the source code implementation.  sparkConf is serialized
>>>> and saved during checkpointing and re-created from the checkpoint directory
>>>> at time of restart. So any sparkConf parameter which you load from
>>>> application.config and set in sparkConf object in code cannot be changed
>>>> and reflected with checkpointing.  :(
>>>>
>>>> Is there is any work around of reading changed sparkConf parameter
>>>> value with using checkpoiting?
>>>> p.s. i am not adding new parameter, i am just changing values of some
>>>> existing sparkConf param.
>>>>
>>>> This is a common case and there must be some solution for this.
>>>>
>>>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Checkpointing is not kafka-specific.  It encompasses metadata about
>>>>> the application.  You can't re-use a checkpoint if your application has
>>>>> changed.
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-gu
>>>>> ide.html#upgrading-application-code
>>>>>
>>>>>
>>>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
>>>>> chandanbaran...@gmail.com> wrote:
>>>>>
>>>>>> Is it possible that i use checkpoint directory to restart streaming
>>>>>> but with modified parameter value in config file (e.g.  username/password
>>>>>> for db connection)  ?
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Regards,
>>>>>> Chandan
>>>>>>
>>>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>>>>>> chandanbaran...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I am using direct kafka with checkpointing of offsets same as :
>>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/
>>>>>>> src/main/scala/example/IdempotentExample.scala
>>>>>>>
>>>>>>> I need to change some parameters like db connection params :
>>>>>>> username/password for db connection .
>>>>>>> I stopped streaming gracefully ,changed parameters in config file
>>>>>>> and restarted streaming.
>>>>>>> *Issue : changed parameters  username/password are not being
>>>>>>> considered.*
>>>>>>>
>>>>>>> *Question* :
>>>>>>> As per my understanding , Checkpointing should only save offsets of
>>>>>>> kafka partitions and not the credentials of the db connection.
>>>>>>> Why its picking old db connection params ?
>>>>>>>
>>>>>>> I am declaring params in main method and not in setUpSsc(0 method.
>>>>>>> My code is identical to that in the above program link  as below:
>>>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>>>> *val jdbcUser = conf.getString("jdbc.user")*
>>>>>>> * val jdbcPassword = conf.getString("jdbc.password")*
>>>>>>> // while the job doesn't strictly need checkpointing,
>>>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case
>>>>>>> of failure
>>>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>>>> val ssc = StreamingContext.getOrCreate(
>>>>>>> checkpointDir,
>>>>>>> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>>>>>> *jdbcPassword*, checkpointDir) _
>>>>>>> )
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Chandan Prakash
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Chandan Prakash
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Chandan Prakash
>>>>
>>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>


-- 
Chandan Prakash


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-19 Thread chandan prakash
Thanks Cody for the pointer.

I am able to do this now: not using checkpointing, but rather storing offsets
in ZooKeeper for fault tolerance.
Spark config changes are now getting reflected on code deployment.
*Using this api :*
*KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
messageHandler)*
*instead of :*
*KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicsSet)*
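
The gist of the offset handling on my side, as a sketch (saveOffsetsToZk is my own helper, not a Spark API):

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // capture the offset ranges of this batch before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch and write results to the sink ...

  // persist offsets externally so a restart can rebuild fromOffsets and resume from here
  saveOffsetsToZk(offsetRanges)
}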

*One quick question:*
What is the need for checkpointing if we can achieve both fault tolerance
and application code/config changes without it? Is there
anything else that checkpointing gives? I might be missing something.


Regards,
Chandan


On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yeah the solutions are outlined in the doc link.  Or just don't rely on
> checkpoints
> On Aug 18, 2016 8:53 AM, "chandan prakash" <chandanbaran...@gmail.com>
> wrote:
>
>> Yes,
>>  i looked into the source code implementation.  sparkConf is serialized
>> and saved during checkpointing and re-created from the checkpoint directory
>> at time of restart. So any sparkConf parameter which you load from
>> application.config and set in sparkConf object in code cannot be changed
>> and reflected with checkpointing.  :(
>>
>> Is there is any work around of reading changed sparkConf parameter value
>> with using checkpoiting?
>> p.s. i am not adding new parameter, i am just changing values of some
>> existing sparkConf param.
>>
>> This is a common case and there must be some solution for this.
>>
>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Checkpointing is not kafka-specific.  It encompasses metadata about the
>>> application.  You can't re-use a checkpoint if your application has changed.
>>>
>>> http://spark.apache.org/docs/latest/streaming-programming-gu
>>> ide.html#upgrading-application-code
>>>
>>>
>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
>>> chandanbaran...@gmail.com> wrote:
>>>
>>>> Is it possible that i use checkpoint directory to restart streaming but
>>>> with modified parameter value in config file (e.g.  username/password for
>>>> db connection)  ?
>>>> Thanks in advance.
>>>>
>>>> Regards,
>>>> Chandan
>>>>
>>>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>>>> chandanbaran...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am using direct kafka with checkpointing of offsets same as :
>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/
>>>>> src/main/scala/example/IdempotentExample.scala
>>>>>
>>>>> I need to change some parameters like db connection params :
>>>>> username/password for db connection .
>>>>> I stopped streaming gracefully ,changed parameters in config file and
>>>>> restarted streaming.
>>>>> *Issue : changed parameters  username/password are not being
>>>>> considered.*
>>>>>
>>>>> *Question* :
>>>>> As per my understanding , Checkpointing should only save offsets of
>>>>> kafka partitions and not the credentials of the db connection.
>>>>> Why its picking old db connection params ?
>>>>>
>>>>> I am declaring params in main method and not in setUpSsc(0 method.
>>>>> My code is identical to that in the above program link  as below:
>>>>> val jdbcDriver = conf.getString("jdbc.driver")
>>>>> val jdbcUrl = conf.getString("jdbc.url")
>>>>> *val jdbcUser = conf.getString("jdbc.user")*
>>>>> * val jdbcPassword = conf.getString("jdbc.password")*
>>>>> // while the job doesn't strictly need checkpointing,
>>>>> // we'll checkpoint to avoid replaying the whole kafka log in case of
>>>>> failure
>>>>> val checkpointDir = conf.getString("checkpointDir")
>>>>> val ssc = StreamingContext.getOrCreate(
>>>>> checkpointDir,
>>>>> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>>>> *jdbcPassword*, checkpointDir) _
>>>>> )
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Chandan Prakash
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Chandan Prakash
>>>>
>>>>
>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>


-- 
Chandan Prakash


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-18 Thread chandan prakash
Yes,
I looked into the source code implementation. sparkConf is serialized and
saved during checkpointing and re-created from the checkpoint directory at
restart time. So any sparkConf parameter which you load from
application.config and set on the sparkConf object in code cannot be changed
and reflected when using checkpointing. :(

Is there any workaround for reading a changed sparkConf parameter value
while using checkpointing?
P.S. I am not adding a new parameter, I am just changing the values of some
existing sparkConf params.

This is a common case, and there must be some solution for it.

On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Checkpointing is not kafka-specific.  It encompasses metadata about the
> application.  You can't re-use a checkpoint if your application has changed.
>
> http://spark.apache.org/docs/latest/streaming-programming-
> guide.html#upgrading-application-code
>
>
> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Is it possible that i use checkpoint directory to restart streaming but
>> with modified parameter value in config file (e.g.  username/password for
>> db connection)  ?
>> Thanks in advance.
>>
>> Regards,
>> Chandan
>>
>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>> chandanbaran...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using direct kafka with checkpointing of offsets same as :
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/
>>> src/main/scala/example/IdempotentExample.scala
>>>
>>> I need to change some parameters like db connection params :
>>> username/password for db connection .
>>> I stopped streaming gracefully ,changed parameters in config file and
>>> restarted streaming.
>>> *Issue : changed parameters  username/password are not being considered.*
>>>
>>> *Question* :
>>> As per my understanding , Checkpointing should only save offsets of
>>> kafka partitions and not the credentials of the db connection.
>>> Why its picking old db connection params ?
>>>
>>> I am declaring params in main method and not in setUpSsc(0 method.
>>> My code is identical to that in the above program link  as below:
>>> val jdbcDriver = conf.getString("jdbc.driver")
>>> val jdbcUrl = conf.getString("jdbc.url")
>>> *val jdbcUser = conf.getString("jdbc.user")*
>>> * val jdbcPassword = conf.getString("jdbc.password")*
>>> // while the job doesn't strictly need checkpointing,
>>> // we'll checkpoint to avoid replaying the whole kafka log in case of
>>> failure
>>> val checkpointDir = conf.getString("checkpointDir")
>>> val ssc = StreamingContext.getOrCreate(
>>> checkpointDir,
>>> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>> *jdbcPassword*, checkpointDir) _
>>> )
>>>
>>>
>>>
>>> --
>>> Chandan Prakash
>>>
>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>


-- 
Chandan Prakash


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-18 Thread chandan prakash
Is it possible to use the checkpoint directory to restart streaming but
with a modified parameter value in the config file (e.g. username/password for
the db connection)?
Thanks in advance.

Regards,
Chandan

On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <chandanbaran...@gmail.com>
wrote:

> Hi,
> I am using direct kafka with checkpointing of offsets same as :
> https://github.com/koeninger/kafka-exactly-once/blob/
> master/src/main/scala/example/IdempotentExample.scala
>
> I need to change some parameters like db connection params :
> username/password for db connection .
> I stopped streaming gracefully ,changed parameters in config file and
> restarted streaming.
> *Issue : changed parameters  username/password are not being considered.*
>
> *Question* :
> As per my understanding , Checkpointing should only save offsets of kafka
> partitions and not the credentials of the db connection.
> Why its picking old db connection params ?
>
> I am declaring params in main method and not in setUpSsc(0 method.
> My code is identical to that in the above program link  as below:
> val jdbcDriver = conf.getString("jdbc.driver")
> val jdbcUrl = conf.getString("jdbc.url")
> *val jdbcUser = conf.getString("jdbc.user")*
> * val jdbcPassword = conf.getString("jdbc.password")*
> // while the job doesn't strictly need checkpointing,
> // we'll checkpoint to avoid replaying the whole kafka log in case of
> failure
> val checkpointDir = conf.getString("checkpointDir")
> val ssc = StreamingContext.getOrCreate(
> checkpointDir,
> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
> *jdbcPassword*, checkpointDir) _
> )
>
>
>
> --
> Chandan Prakash
>
>


-- 
Chandan Prakash


spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-18 Thread chandan prakash
Hi,
I am using direct Kafka with checkpointing of offsets, same as:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala

I need to change some parameters, such as the db connection params
(username/password for the db connection).
I stopped streaming gracefully, changed the parameters in the config file and
restarted streaming.
*Issue: the changed username/password parameters are not being picked up.*

*Question*:
As per my understanding, checkpointing should only save the offsets of the Kafka
partitions and not the credentials of the db connection.
Why is it picking up the old db connection params?

I am declaring the params in the main method and not in the setupSsc() method.
My code is identical to that in the program linked above, as below:
val jdbcDriver = conf.getString("jdbc.driver")
val jdbcUrl = conf.getString("jdbc.url")
*val jdbcUser = conf.getString("jdbc.user")*
* val jdbcPassword = conf.getString("jdbc.password")*
// while the job doesn't strictly need checkpointing,
// we'll checkpoint to avoid replaying the whole kafka log in case of
failure
val checkpointDir = conf.getString("checkpointDir")
val ssc = StreamingContext.getOrCreate(
checkpointDir,
setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
*jdbcPassword*, checkpointDir) _
)



-- 
Chandan Prakash


Re: JavaStreamingContext.stop() hangs

2016-07-01 Thread chandan prakash
http://why-not-learn-something.blogspot.in/2016/05/apache-spark-streaming-how-to-do.html

On Fri, Jul 1, 2016 at 1:42 PM, manoop <sudhan...@umalkar.com> wrote:

> I have a Spark job and I just want to stop it on some condition. Once the
> condition is met, I am calling JavaStreamingContext.stop(), but it just
> hangs. Does not move on to the next line, which is just a debug line. I
> expect it to come out.
>
> I already tried different variants of stop, that is, passing true to stop
> the spark context, etc. but nothing is working out.
>
> Here's the sample code:
>
> LOGGER.debug("Stop? {}", stop);
> if (stop) {
> jssc.stop(false, true);
> LOGGER.debug("STOPPED!");
> }
>
> I am using Spark 1.5.2. Any help / pointers would be appreciated.
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/JavaStreamingContext-stop-hangs-tp27257.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Chandan Prakash


Re: spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-24 Thread chandan prakash
Resolved.
I passed the parameters via sparkConf instead of passing them to the spark-submit
command (I still don't know why passing them to spark-submit did not work):

 sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC
-Dlog4j.configuration=file:log4j_RequestLogExecutor.properties ")




On Tue, May 24, 2016 at 10:24 PM, chandan prakash <chandanbaran...@gmail.com
> wrote:

> Any suggestion?
>
> On Mon, May 23, 2016 at 5:18 PM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Hi,
>> I am able to do logging for driver but not for executor.
>>
>> I am running spark streaming under mesos.
>> Want to do log4j logging separately for driver and executor.
>>
>> Used the below option in spark-submit command :
>>
>> --driver-java-options 
>> "-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogDriver.properties"
>>  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:
>> /usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties
>> "
>>
>> Logging for driver at path mentioned as in
>> log4j_RequestLogDriver.properties(/tmp/requestLogDriver.log) is
>> happening fine.
>> But for executor, there is no logging happening (shud be at
>> /tmp/requestLogExecutor.log as mentioned in 
>> log4j_RequestLogExecutor.properties
>> on executor machines)
>>
>> *Any suggestions how to get logging enabled for executor ?*
>>
>> TIA,
>> Chandan
>>
>> --
>> Chandan Prakash
>>
>>
>
>
> --
> Chandan Prakash
>
>


-- 
Chandan Prakash


Re: spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-24 Thread chandan prakash
Any suggestion?

On Mon, May 23, 2016 at 5:18 PM, chandan prakash <chandanbaran...@gmail.com>
wrote:

> Hi,
> I am able to do logging for driver but not for executor.
>
> I am running spark streaming under mesos.
> Want to do log4j logging separately for driver and executor.
>
> Used the below option in spark-submit command :
>
> --driver-java-options 
> "-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogDriver.properties"
>  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:
> /usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties
> "
>
> Logging for driver at path mentioned as in
> log4j_RequestLogDriver.properties(/tmp/requestLogDriver.log) is happening
> fine.
> But for executor, there is no logging happening (shud be at
> /tmp/requestLogExecutor.log as mentioned in 
> log4j_RequestLogExecutor.properties
> on executor machines)
>
> *Any suggestions how to get logging enabled for executor ?*
>
> TIA,
> Chandan
>
> --
> Chandan Prakash
>
>


-- 
Chandan Prakash


spark streaming: issue with logging with separate log4j properties files for driver and executor

2016-05-23 Thread chandan prakash
Hi,
I am able to do logging for the driver but not for the executors.

I am running Spark Streaming on Mesos and want to do log4j logging
separately for the driver and the executors.

I used the below options in the spark-submit command:
--driver-java-options
"-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogDriver.properties"
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:
/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties
"

Logging for the driver at the path mentioned in
log4j_RequestLogDriver.properties (/tmp/requestLogDriver.log) is working
fine.
But for the executors, no logging is happening (it should be at
/tmp/requestLogExecutor.log as mentioned in log4j_RequestLogExecutor.properties
on the executor machines).

*Any suggestions on how to get logging enabled for the executors?*

TIA,
Chandan

-- 
Chandan Prakash


Re: Kafka partition increased while Spark Streaming is running

2016-05-13 Thread chandan prakash
Makes sense. Thank you, Cody.

Regards,
Chandan

On Fri, May 13, 2016 at 8:10 PM, Cody Koeninger <c...@koeninger.org> wrote:

> No, I wouldn't expect it to, once the stream is defined (at least for
> the direct stream integration for kafka 0.8), the topicpartitions are
> fixed.
>
> My answer to any question about "but what if checkpoints don't let me
> do this" is always going to be "well, don't rely on checkpoints."
>
> If you want dynamic topicpartitions,
> https://issues.apache.org/jira/browse/SPARK-12177
>
>
> On Fri, May 13, 2016 at 4:24 AM, chandan prakash
> <chandanbaran...@gmail.com> wrote:
> > Follow up question :
> >
> >  If spark streaming is using checkpointing (/tmp/checkpointDir)  for
> > AtLeastOnce and  number of Topics or/and partitions has increased
> then
> >  will gracefully shutting down and restarting from checkpoint will
> consider
> > new topics or/and partitions ?
> >  If the answer is NO then how to start from the same checkpoint with new
> > partitions/topics included?
> >
> > Thanks,
> > Chandan
> >
> >
> > On Wed, Feb 24, 2016 at 9:30 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> That's correct, when you create a direct stream, you specify the
> >> topicpartitions you want to be a part of the stream (the other method
> for
> >> creating a direct stream is just a convenience wrapper).
> >>
> >> On Wed, Feb 24, 2016 at 2:15 AM, 陈宇航 <yuhang.c...@foxmail.com> wrote:
> >>>
> >>> Here I use the 'KafkaUtils.createDirectStream' to integrate Kafka with
> >>> Spark Streaming. I submitted the app, then I changed (increased)
> Kafka's
> >>> partition number after it's running for a while. Then I check the input
> >>> offset with 'rdd.asInstanceOf[HasOffsetRanges].offsetRanges', seeing
> that
> >>> only the offset of the initial partitions are returned.
> >>>
> >>> Does this mean Spark Streaming's Kafka integration can't update its
> >>> parallelism when Kafka's partition number is changed?
> >>
> >>
> >
> >
> >
> > --
> > Chandan Prakash
> >
>



-- 
Chandan Prakash


Re: Kafka partition increased while Spark Streaming is running

2016-05-13 Thread chandan prakash
Follow-up question:

If Spark Streaming is using checkpointing (/tmp/checkpointDir) for
at-least-once semantics and the number of topics and/or partitions has
increased, will gracefully shutting down and restarting from the checkpoint
pick up the new topics and/or partitions?
If the answer is no, then how do I start from the same checkpoint with the new
partitions/topics included?

Thanks,
Chandan


On Wed, Feb 24, 2016 at 9:30 PM, Cody Koeninger <c...@koeninger.org> wrote:

> That's correct, when you create a direct stream, you specify the
> topicpartitions you want to be a part of the stream (the other method for
> creating a direct stream is just a convenience wrapper).
>
> On Wed, Feb 24, 2016 at 2:15 AM, 陈宇航 <yuhang.c...@foxmail.com> wrote:
>
>> Here I use the *'KafkaUtils.createDirectStream'* to integrate Kafka with
>> Spark Streaming. I submitted the app, then I changed (increased) Kafka's
>> partition number after it's running for a while. Then I check the input
>> offset with '*rdd.asInstanceOf[HasOffsetRanges].offsetRanges*', seeing
>> that only the offset of the initial partitions are returned.
>>
>> Does this mean Spark Streaming's Kafka integration can't update its
>> parallelism when Kafka's partition number is changed?
>>
>
>


-- 
Chandan Prakash


Re: Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread chandan prakash
Hey Cody,
What kind of problems exactly?
The data rate across Kafka topics does vary significantly in my case: out of
50 topics in total (with 3 partitions each), half of the topics generate data
at a very high rate, say 1 lakh (100k) records/sec, while the other half
generate at a very low rate, say 1k/sec.
I have to process them together and insert into the same database table.
Would it be better to have 2 different Spark Streaming applications instead?
I don't have control over the Kafka topics and partitions; they are part of a
central system used by many other systems as well.

Regards,
Chandan

On Tue, May 10, 2016 at 8:01 PM, Cody Koeninger <c...@koeninger.org> wrote:

> maxRate is not used by the direct stream.
>
> Significant skew in rate across different partitions for the same
> topic is going to cause you all kinds of problems, not just with spark
> streaming.
>
> You can turn on backpressure, but you're better off addressing the
> underlying issue if you can.
>
> On Tue, May 10, 2016 at 8:08 AM, Soumitra Siddharth Johri
> <soumitra.siddha...@gmail.com> wrote:
> > Also look at back pressure enabled. Both of these can be used to limit
> the
> > rate
> >
> > Sent from my iPhone
> >
> > On May 10, 2016, at 8:02 AM, chandan prakash <chandanbaran...@gmail.com>
> > wrote:
> >
> > Hi,
> > I am using Spark Streaming with Direct kafka approach.
> > Want to limit number of event records coming in my batches.
> > Have question regarding  following 2 parameters :
> > 1. spark.streaming.receiver.maxRate
> > 2. spark.streaming.kafka.maxRatePerPartition
> >
> >
> > The documentation
> > (
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
> > ) says .
> > " spark.streaming.receiver.maxRate for receivers and
> > spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach "
> >
> > Does it mean that  spark.streaming.receiver.maxRate  is valid only for
> > Receiver based approach only ?  (not the DirectKafkaApproach as well)
> >
> > If yes, then how do we control total number of records/sec in DirectKafka
> > ?.because spark.streaming.kafka.maxRatePerPartition  only controls
> max
> > rate per partition and not whole records. There might be many
> partitions
> > some with very fast rate and some with very slow rate.
> >
> > Regards,
> > Chandan
> >
> >
> >
> > --
> > Chandan Prakash
> >
>



-- 
Chandan Prakash


Spark Streaming : is spark.streaming.receiver.maxRate valid for DirectKafkaApproach

2016-05-10 Thread chandan prakash
Hi,
I am using Spark Streaming with the direct Kafka approach and want to limit
the number of event records coming into my batches.
I have a question regarding the following 2 parameters:
1. spark.streaming.receiver.maxRate
2. spark.streaming.kafka.maxRatePerPartition


The documentation (
http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
) says:
"spark.streaming.receiver.maxRate for receivers and
spark.streaming.kafka.maxRatePerPartition for Direct Kafka approach"

*Does it mean that spark.streaming.receiver.maxRate is valid only for the
receiver-based approach (and not the direct Kafka approach as well)?*

*If yes, then how do we control the total number of records/sec with direct
Kafka? spark.streaming.kafka.maxRatePerPartition only controls the max rate
per partition and not the overall record count, and there might be many
partitions, some with a very fast rate and some with a very slow rate.*
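
For context, this is roughly how these parameters get set (values are placeholders):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  // direct Kafka: per-partition cap, so the effective batch cap is
  // maxRatePerPartition * total partitions * batch interval in seconds
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // optionally let Spark adapt the ingestion rate to batch processing times
  .set("spark.streaming.backpressure.enabled", "true")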

Regards,
Chandan



-- 
Chandan Prakash


Re: Is SPARK is the right choice for traditional OLAP query processing?

2015-11-08 Thread chandan prakash
Apache Drill is also a very good candidate for this.



On Mon, Nov 9, 2015 at 9:33 AM, Hitoshi Ozawa <ozaw...@worksap.co.jp> wrote:

> It depends on how much data needs to be processed. Data Warehouse with
> indexes is going to be faster when there is not much data. If you have big
> data, Spark Streaming and may be Spark SQL may interest you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-SPARK-is-the-right-choice-for-traditional-OLAP-query-processing-tp23921p25319.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Chandan Prakash