Re: Kubernetes security context when submitting job through k8s servers

2018-07-09 Thread trung kien
Thanks Li,

I read through the ticket; being able to pass a pod YAML file would be amazing.

Do you have any target date for this, either in a production release or a
preview? I really want to try out this feature.
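
For reference, a minimal sketch of the kind of driver pod template I have in
mind once that lands. The template mechanism itself is what SPARK-24434
proposes, so the wiring is an assumption on my part; only the securityContext
fields below are standard Kubernetes PodSpec:

# Hypothetical driver pod template; assumes the pod-template support
# proposed in SPARK-24434. The container name is a guess, but the
# securityContext fields are plain Kubernetes PodSpec.
apiVersion: v1
kind: Pod
spec:
  securityContext:
    runAsUser: 1000      # non-root UID, per our cluster policy
    runAsGroup: 1000
    fsGroup: 1000
  containers:
    - name: spark-kubernetes-driver
      securityContext:
        allowPrivilegeEscalation: false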

On Mon, Jul 9, 2018 at 4:48 PM Yinan Li  wrote:

> Spark on k8s currently doesn't support specifying a custom SecurityContext
> of the driver/executor pods. This will be supported by the solution to
> https://issues.apache.org/jira/browse/SPARK-24434.
>
> On Mon, Jul 9, 2018 at 2:06 PM trung kien  wrote:
>
>> Dear all,
>>
>> Is there any way to include a security context (
>> https://kubernetes.io/docs/tasks/configure-pod-container/security-context/)
>> when submitting a job through the k8s API server?
>>
>> I'm trying to run my first Spark jobs on Kubernetes through spark-submit:
>>
>> bin/spark-submit --master k8s://https://API_SERVERS --deploy-mode
>> cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf
>> spark.kubernetes.namespace=NAMESPACE --conf spark.executor.instances=3
>> --conf spark.kubernetes.container.image= --conf
>> spark.kubernetes.driver.pod.name=spark-pi-driver
>> local:///opt/spark/examples/jars/spark-examples_2.11-2.3.1.jar
>>
>> But the job was rejected because the pod (created by spark-submit)
>> doesn't have a security context to run as my account (our policy doesn't
>> allow us to runAsUser root).
>>
>> I checked the code under KubernetesClientApplication.scala
>> <https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala>;
>> it doesn't seem to support setting a security context via configuration.
>>
>> Is there any way to get around this issue? Is there any patch that
>> supports this?
>>
>> --
>> Thanks
>> Kien
>>
> --
Thanks
Kien


Kubernetes security context when submitting job through k8s servers

2018-07-09 Thread trung kien
Dear all,

Is there any way to include a security context (
https://kubernetes.io/docs/tasks/configure-pod-container/security-context/)
when submitting a job through the k8s API server?

I'm trying to run my first Spark jobs on Kubernetes through spark-submit:

bin/spark-submit --master k8s://https://API_SERVERS --deploy-mode cluster
--name spark-pi --class org.apache.spark.examples.SparkPi --conf
spark.kubernetes.namespace=NAMESPACE --conf spark.executor.instances=3
--conf spark.kubernetes.container.image= --conf
spark.kubernetes.driver.pod.name=spark-pi-driver
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.1.jar

But the job was rejected because the pod (created by spark-submit) doesn't
have a security context to run as my account (our policy doesn't allow us to
runAsUser root).

I checked the code under KubernetesClientApplication.scala;
it doesn't seem to support setting a security context via configuration.

Is there any way to get around this issue? Is there any patch that
supports this?

-- 
Thanks
Kien


Re: Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread trung kien
Ah right, I see.

Thank you very much.
On May 25, 2016 11:11 AM, "Cody Koeninger" <c...@koeninger.org> wrote:

> There's an overloaded createDirectStream method that takes a map from
> topicpartition to offset for the starting point of the stream.
>
> On Wed, May 25, 2016 at 9:59 AM, trung kien <kient...@gmail.com> wrote:
> > Thanks, Cody.
> >
> > I can build the mapping from time -> offset. However, how can I pass this
> > offset to the Spark Streaming job? (using the Direct Approach)
> >
> > On May 25, 2016 9:42 AM, "Cody Koeninger" <c...@koeninger.org> wrote:
> >>
> >> Kafka does not yet have meaningful time indexing, there's a kafka
> >> improvement proposal for it but it has gotten pushed back to at least
> >> 0.10.1
> >>
> >> If you want to do this kind of thing, you will need to maintain your
> >> own index from time to offset.
> >>
> >> On Wed, May 25, 2016 at 8:15 AM, trung kien <kient...@gmail.com> wrote:
> >> > Hi all,
> >> >
> >> > Is there any way to re-compute using the Spark Streaming - Kafka Direct
> >> > Approach
> >> > from a specific time?
> >> >
> >> > In some cases, I want to re-compute again from a specific time (e.g.
> >> > the beginning
> >> > of the day). Is that possible?
> >> >
> >> >
> >> >
> >> > --
> >> > Thanks
> >> > Kien
>
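
For reference, a rough sketch of the overloaded createDirectStream call Cody
is referring to, as it looked in the Spark 1.x Kafka direct API. The broker
address, topic name, and the pass-through message handler are placeholders of
mine, not anything from this thread:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class StartFromOffsets {

  // fromOffsets is where your own time -> offset index plugs in, e.g.
  // fromOffsets.put(new TopicAndPartition("transactions", 0), offsetAtStartOfDay);
  public static JavaInputDStream<String> streamFrom(
      JavaStreamingContext jssc, Map<TopicAndPartition, Long> fromOffsets) {

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "broker1:9092");  // placeholder brokers

    // Message handler: here we simply keep the message value as a String.
    Function<MessageAndMetadata<String, String>, String> messageHandler =
        (MessageAndMetadata<String, String> mmd) -> mmd.message();

    // The overload that starts the stream from explicit per-partition offsets.
    return KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        String.class,
        kafkaParams,
        fromOffsets,
        messageHandler);
  }
}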


Re: Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread trung kien
Thanks, Cody.

I can build the mapping from time -> offset. However, how can I pass this
offset to the Spark Streaming job? (using the Direct Approach)
On May 25, 2016 9:42 AM, "Cody Koeninger" <c...@koeninger.org> wrote:

> Kafka does not yet have meaningful time indexing, there's a kafka
> improvement proposal for it but it has gotten pushed back to at least
> 0.10.1
>
> If you want to do this kind of thing, you will need to maintain your
> own index from time to offset.
>
> On Wed, May 25, 2016 at 8:15 AM, trung kien <kient...@gmail.com> wrote:
> > Hi all,
> >
> > Is there any way to re-compute using the Spark Streaming - Kafka Direct
> Approach
> > from a specific time?
> >
> > In some cases, I want to re-compute again from a specific time (e.g.
> the beginning
> > of the day). Is that possible?
> >
> >
> >
> > --
> > Thanks
> > Kien
>


Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread trung kien
Hi all,

Is there any way to re-compute using the Spark Streaming - Kafka Direct
Approach from a specific time?

In some cases, I want to re-compute again from a specific time (e.g. the
beginning of the day). Is that possible?



-- 
Thanks
Kien


Re: Correct way to use spark streaming with apache zeppelin

2016-03-13 Thread trung kien
Thanks all for actively sharing your experience.

@Chris: using something like Redis is something I am trying to figure out.
I have a lot of transactions, so I can't trigger an update event for
every single transaction.
I'm looking at Spark Streaming because it provides batch processing (e.g. I
can update the cache every 5 seconds). In addition, Spark can scale pretty
well and I don't have to worry about losing data.

The cache now holds the following information:
 * Date (key)
 * BranchID (key)
 * ProductID (key)
 * TotalQty
 * TotalDollar

The fields marked (key) form the key; note that I keep historical data as well (by day).

Now I want to use Zeppelin to query against the cache (while the cache is
being updated).
I don't need Zeppelin to update automatically (I can hit the run button
myself :) ).
I'm just curious whether Parquet is the right solution for us.
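
To make the Redis idea concrete, here is a minimal sketch of how each
micro-batch of the Stats stream could be folded into Redis hashes from
foreachPartition. The Jedis client, the "date|branch|product" key layout and
the field names are assumptions of mine, not something settled in this thread:

import org.apache.spark.api.java.JavaPairRDD;
import redis.clients.jedis.Jedis;
import scala.Tuple2;

public class StatsToRedis {

  // Call this from stats.foreachRDD(...) once per micro-batch.
  // Each record: key = "date|branchId|productId", value = (qty, dollars) for this batch.
  public static void upsert(JavaPairRDD<String, Tuple2<Long, Double>> batchStats) {
    batchStats.foreachPartition(records -> {
      // One connection per partition; a pooled client would be used in practice.
      try (Jedis jedis = new Jedis("localhost", 6379)) {
        while (records.hasNext()) {
          Tuple2<String, Tuple2<Long, Double>> rec = records.next();
          String hashKey = "stats:" + rec._1();              // e.g. stats:2016-03-13|B01|P42
          jedis.hincrBy(hashKey, "totalQty", rec._2()._1());
          jedis.hincrByFloat(hashKey, "totalDollar", rec._2()._2());
        }
      }
    });
  }
}

Zeppelin (or any other client) can then read the current totals with HGETALL
on a given stats key while Spark keeps incrementing them, which avoids
re-aggregating raw transactions at query time.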



On Sun, Mar 13, 2016 at 3:25 PM, Chris Miller <cmiller11...@gmail.com>
wrote:

> Cool! Thanks for sharing.
>
>
> --
> Chris Miller
>
> On Sun, Mar 13, 2016 at 12:53 AM, Todd Nist <tsind...@gmail.com> wrote:
>
>> Below is a link to an example which Silvio Fiorito put together
>> demonstrating how to link Zeppelin with Spark Streaming for real-time charts.
>> I think the original thread was back in early November 2015, subject: Real
>> time chart in Zeppelin, if you care to try to find it.
>>
>> https://gist.github.com/granturing/a09aed4a302a7367be92
>>
>> HTH.
>>
>> -Todd
>>
>> On Sat, Mar 12, 2016 at 6:21 AM, Chris Miller <cmiller11...@gmail.com>
>> wrote:
>>
>>> I'm pretty new to all of this stuff, so bear with me.
>>>
>>> Zeppelin isn't really intended for realtime dashboards as far as I know.
>>> Its reporting features (tables, graphs, etc.) are more for displaying the
>>> results from the output of something. As far as I know, there isn't really
>>> anything to "watch" a dataset and have updates pushed to the Zeppelin UI.
>>>
>>> As for Spark, unless you're doing a lot of processing that you didn't
>>> mention here, I don't think it's a good fit just for this.
>>>
>>> If it were me (just off the top of my head), I'd just build a simple web
>>> service that uses websockets to push updates to the client which could then
>>> be used to update graphs, tables, etc. The data itself -- that is, the
>>> accumulated totals -- you could store in something like Redis. When an
>>> order comes in, just add that quantity and price to the existing value and
>>> trigger your code to push out an updated value to any clients via the
>>> websocket. You could use something like a Redis pub/sub channel to trigger
>>> the web app to notify clients of an update.
>>>
>>> There are about 5 million other ways you could design this, but I would
>>> just keep it as simple as possible. I just threw one idea out...
>>>
>>> Good luck.
>>>
>>>
>>> --
>>> Chris Miller
>>>
>>> On Sat, Mar 12, 2016 at 6:58 PM, trung kien <kient...@gmail.com> wrote:
>>>
>>>> Thanks Chris and Mich for replying.
>>>>
>>>> Sorry for not explaining my problem clearly.  Yes, I am talking about a
>>>> flexible dashboard when I mention Zeppelin.
>>>>
>>>> Here is the problem I am having:
>>>>
>>>> I am running a commercial website where we sell many products and we
>>>> have many branches in many places. We have a lot of realtime transactions
>>>> and want to analyze them in realtime.
>>>>
>>>> We don't want to have to aggregate every single transaction each time we
>>>> do analytics (each transaction has BranchID, ProductID, Qty,
>>>> Price). So, we maintain intermediate data which contains: BranchID,
>>>> ProductID, totalQty, totalDollar.
>>>>
>>>> Ideally, we have 2 tables:
>>>>    Transaction (BranchID, ProductID, Qty, Price, Timestamp)
>>>>
>>>> And an intermediate table, Stats, which is just the sum of every transaction
>>>> grouped by BranchID and ProductID (I am using Spark Streaming to compute this
>>>> table in realtime).
>>>>
>>>> My thinking is that doing statistics (a realtime dashboard) on the Stats
>>>> table is much easier; this table is also small enough to maintain.
>>>>
>>>> I'm just wondering what's the best way to store the Stats table (a database
>>>> or a Parquet file?)
>>>> What exactly are you trying to do? Zeppelin is for interactive analysis
>>>> of a dataset. What do you mean "realtime analytics" -- do you mean build a
>>>> report or dashboard that automatically updates as new data comes in?

Re: Correct way to use spark streaming with apache zeppelin

2016-03-12 Thread trung kien
Thanks Chris and Mich for replying.

Sorry for not explaining my problem clearly.  Yes, I am talking about a
flexible dashboard when I mention Zeppelin.

Here is the problem i am having:

I am running a commercial website where we sell many products and we have
many branches in many places. We have a lot of realtime transactions and
want to analyze them in realtime.

We don't want to have to aggregate every single transaction each time we do
analytics (each transaction has BranchID, ProductID, Qty, Price). So,
we maintain intermediate data which contains: BranchID, ProductID,
totalQty, totalDollar.

Ideally, we have 2 tables:
   Transaction (BranchID, ProductID, Qty, Price, Timestamp)

And an intermediate table, Stats, which is just the sum of every transaction
grouped by BranchID and ProductID (I am using Spark Streaming to compute this
table in realtime).

My thinking is that doing statistics (a realtime dashboard) on the Stats table
is much easier; this table is also small enough to maintain.

I'm just wondering what's the best way to store the Stats table (a database or
a Parquet file?)
What exactly are you trying to do? Zeppelin is for interactive analysis of
a dataset. What do you mean "realtime analytics" -- do you mean build a
report or dashboard that automatically updates as new data comes in?


--
Chris Miller

On Sat, Mar 12, 2016 at 3:13 PM, trung kien <kient...@gmail.com> wrote:

> Hi all,
>
> I've just viewed some of Zeppelin's videos. The integration between
> Zeppelin and Spark is really amazing and I want to use it for my
> application.
>
> In my app, I will have a Spark Streaming app to do some basic realtime
> aggregation (intermediate data). Then I want to use Zeppelin to do some
> realtime analytics on the intermediate data.
>
> My question is: what's the most efficient storage engine for storing realtime
> intermediate data? Is a Parquet file somewhere suitable?
>


Correct way to use spark streaming with apache zeppelin

2016-03-11 Thread trung kien
Hi all,

I've just viewed some of Zeppelin's videos. The integration between
Zeppelin and Spark is really amazing and I want to use it for my
application.

In my app, I will have a Spark Streaming app to do some basic realtime
aggregation (intermediate data). Then I want to use Zeppelin to do some
realtime analytics on the intermediate data.

My question is: what's the most efficient storage engine for storing realtime
intermediate data? Is a Parquet file somewhere suitable?


Re: RDD partition after calling mapToPair

2015-11-24 Thread trung kien
Thanks, Cody, for the very useful information.

It's much clearer to me now. I had a lot of wrong assumptions.
On Nov 23, 2015 10:19 PM, "Cody Koeninger" <c...@koeninger.org> wrote:

> Partitioner is an optional field when defining an rdd.  KafkaRDD doesn't
> define one, so you can't really assume anything about the way it's
> partitioned, because spark doesn't know anything about the way it's
> partitioned.  If you want to rely on some property of how things were
> partitioned as they were being produced into kafka, you need to do
> foreachPartition or mapPartition yourself.  Otherwise, spark will do a
> shuffle for any operation that would ordinarily require a shuffle, even if
> keys are already in the "right" place.
>
> Regarding the assignment of cores to partitions, that's not really
> accurate.  Each kafka partition will correspond to a spark partition.  If
> you do an operation that shuffles, that relationship no longer holds true.
> Even if you're doing a straight map operation without a shuffle, you will
> probably get 1 executor core working on 1 partition, but there's no
> guarantee the scheduler will do that, and no guarantee it'll be the same
> core / partition relationship for the next batch.
>
>
> On Mon, Nov 23, 2015 at 9:01 AM, Thúy Hằng Lê <thuyhang...@gmail.com>
> wrote:
>
>> Thanks Cody,
>>
>> I still have concerns about this.
>> What do you mean by saying the Spark direct stream doesn't have a default
>> partitioner? Could you please explain a bit more?
>>
>> When I assign 20 cores to 20 Kafka partitions, I expect each core
>> will work on one partition. Is that correct?
>>
>> I still couldn't figure out how the RDD will be partitioned after the mapToPair
>> function. It would be great if you could briefly explain (or send me some
>> documentation, I couldn't find it) how the shuffle works in the mapToPair function.
>>
>> Thank you very much.
>> On Nov 23, 2015 12:26 AM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Spark direct stream doesn't have a default partitioner.
>>>
>>> If you know that you want to do an operation on keys that are already
>>> partitioned by kafka, just use mapPartitions or foreachPartition to avoid a
>>> shuffle.
>>>
>>> On Sat, Nov 21, 2015 at 11:46 AM, trung kien <kient...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am having a problem understanding how the RDD will be partitioned after
>>>> calling the mapToPair function.
>>>> Could anyone give me more information about partitioning in this
>>>> function?
>>>>
>>>> I have a simple application doing following job:
>>>>
>>>> JavaPairInputDStream<String, String> messages =
>>>> KafkaUtils.createDirectStream(...)
>>>>
>>>> JavaPairDStream<String, Double> stats = messages.mapToPair(JSON_DECODE)
>>>>
>>>> .reduceByKey(SUM);
>>>>
>>>> saveToDB(stats)
>>>>
>>>> I set up 2 workers (each dedicating 20 cores) for this job.
>>>> My Kafka topic has 40 partitions (I want each core to handle a partition),
>>>> and the messages sent to the queue are partitioned by the same key as the mapToPair
>>>> function.
>>>> I'm using the default partitioner for both Kafka and Spark.
>>>>
>>>> Ideally, I shouldn't see data shuffled between cores in the mapToPair
>>>> stage, right?
>>>> However, in the Spark UI, I see that the "Locality Level" for this stage
>>>> is "ANY", which means data needs to be transferred.
>>>> Any comments on this?
>>>>
>>>> --
>>>> Thanks
>>>> Kien
>>>>
>>>
>>>
>
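
To illustrate Cody's point about avoiding the shuffle when the producer
already partitions messages by key, here is a rough sketch that aggregates
inside foreachPartition instead of calling reduceByKey. It is written against
the Java 8 lambda-friendly streaming API, and saveToDB is just a placeholder
standing in for the sink in the original pseudo-code:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public class PerPartitionAggregation {

  // Placeholder sink, standing in for saveToDB(stats) in the original mail.
  static void saveToDB(Map<String, Double> partitionTotals) {
    // e.g. a batched upsert into your store
  }

  // Assumes every occurrence of a key lands in a single Kafka partition, so
  // summing inside each partition already yields final per-key totals and no
  // shuffle is needed.
  public static void aggregateWithoutShuffle(JavaPairDStream<String, Double> decoded) {
    decoded.foreachRDD(rdd ->
        rdd.foreachPartition((Iterator<Tuple2<String, Double>> it) -> {
          Map<String, Double> sums = new HashMap<>();
          while (it.hasNext()) {
            Tuple2<String, Double> t = it.next();
            sums.merge(t._1(), t._2(), Double::sum);
          }
          saveToDB(sums);
        }));
  }
}

Note this only holds as long as keys never span Kafka partitions; otherwise
the per-partition sums would still need a final merge.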


Re: Spark Streaming data checkpoint performance

2015-11-07 Thread trung kien
Hmm,

That seems like just a workaround.
Using this method, it's very hard to recover from failure, since we don't
know which batches have been done.

I really want to maintain the whole running stats in memory to achieve full
fault tolerance.

I just wonder whether the performance of data checkpointing is really that bad,
or whether I'm missing something in my setup.

30 seconds to checkpoint 1M keys is too much for me.
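
To make Aniket's insert-or-update suggestion (quoted below) concrete, here is
a minimal JDBC sketch applied per partition of each micro-batch. The
word_counts table, its columns, and the plain update-then-insert (rather than
the store's native, atomic upsert) are assumptions of mine:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public class CountUpsert {

  // counts = the reduceByKey output for one batch interval: (key, batchCount)
  public static void saveCounts(JavaPairDStream<String, Long> counts, String jdbcUrl) {
    counts.foreachRDD(rdd ->
        rdd.foreachPartition((Iterator<Tuple2<String, Long>> it) -> {
          try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            PreparedStatement update = conn.prepareStatement(
                "UPDATE word_counts SET cnt = cnt + ? WHERE word = ?");
            PreparedStatement insert = conn.prepareStatement(
                "INSERT INTO word_counts(word, cnt) VALUES (?, ?)");
            while (it.hasNext()) {
              Tuple2<String, Long> kv = it.next();
              update.setLong(1, kv._2());
              update.setString(2, kv._1());
              if (update.executeUpdate() == 0) {   // key not seen before: insert it
                insert.setString(1, kv._1());
                insert.setLong(2, kv._2());
                insert.executeUpdate();
              }
            }
          }
        }));
  }
}

The running total then lives in the database rather than in checkpointed
Spark state, so there is no large state RDD for Spark to checkpoint every few
batches.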


On Sat, Nov 7, 2015 at 1:25 PM, Aniket Bhatnagar  wrote:

> It depends on the stats you are collecting. For example, if you are just
> collecting counts, you can do away with updateStateByKey completely by
> doing an insert-or-update operation on the data store after the reduce. I.e.
>
> For each (key, batchCount)
>   if (key exists in dataStore)
>     update count = count + batchCount for the key
>   else
>     insert (key, batchCount)
>
> Thanks,
> Aniket
>
> On Sat, Nov 7, 2015 at 11:38 AM Thúy Hằng Lê 
> wrote:
>
>> Thanks Aniket,
>>
>> I want to store the state in external storage, but that should be a
>> later step I think.
>> Basically, I have to use the updateStateByKey function to maintain the
>> running state (which requires checkpointing), and my bottleneck is now in the
>> data checkpoint.
>>
>> My pseudo code is like below:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(
>> sparkConf,Durations.seconds(2));
>> jssc.checkpoint("spark-data/checkpoint");
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(...);
>> JavaPairDStream stats =
>> messages.mapToPair(parseJson)
>> .reduceByKey(REDUCE_STATS)
>> .updateStateByKey(RUNNING_STATS);
>>
>>JavaPairDStream newData =
>> stats.filter(NEW_STATS);
>>
>>newData.foreachRDD{
>>  rdd.forEachPartition{
>>//Store to external storage.
>>  }
>>   }
>>
>>   Without using updateStateByKey, I only have the stats of the last
>> micro-batch.
>>
>> Any advice on this?
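
For illustration, a RUNNING_STATS function for a simple running count could
look roughly like the sketch below; the Long value type is an assumption (the
original code doesn't show it), and in the Spark 1.x Java API the Optional
involved is Guava's rather than Spark's own:

import java.util.List;

import com.google.common.base.Optional;

import org.apache.spark.api.java.function.Function2;

public class RunningStats {

  // Called once per key per batch: newValues holds this batch's values for the
  // key, state is the running total so far (absent the first time the key appears).
  public static final Function2<List<Long>, Optional<Long>, Optional<Long>> RUNNING_STATS =
      (newValues, state) -> {
        long total = state.isPresent() ? state.get() : 0L;
        for (Long v : newValues) {
          total += v;
        }
        return Optional.of(total);
      };
}

Because updateStateByKey touches every key in the state on every batch, the
checkpointed state grows with the total number of keys, which fits the
slowdown described in this thread.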
>>
>>
>> 2015-11-07 11:35 GMT+07:00 Aniket Bhatnagar :
>>
>>> Can you try storing the state (word count) in an external key value
>>> store?
>>>
>>> On Sat, Nov 7, 2015, 8:40 AM Thúy Hằng Lê  wrote:
>>>
 Hi all,

 Could anyone help me with this? It's a bit urgent for me.
 I'm very confused and curious about Spark data checkpoint performance.
 Is there any detailed description of the checkpoint implementation I can look into?
 Spark Streaming takes only sub-second to process 20K messages/sec;
 however, it takes 25 seconds for the checkpoint. Now my application has an average
 30-second latency, and it keeps increasing.


 2015-11-06 11:11 GMT+07:00 Thúy Hằng Lê :

> Thanks all, it would be great to have this feature soon.
> Do you know what the release plan for 1.6 is?
>
> In addition to this, I still have a checkpoint performance problem.
>
> My code is simply like this:
> JavaStreamingContext jssc = new
> JavaStreamingContext(sparkConf,Durations.seconds(2));
> jssc.checkpoint("spark-data/checkpoint");
> JavaPairInputDStream messages =
> KafkaUtils.createDirectStream(...);
> JavaPairDStream stats =
> messages.mapToPair(parseJson)
> .reduceByKey(REDUCE_STATS)
> .updateStateByKey(RUNNING_STATS);
>
> stats.print()
>
>   Now I need to maintain about 800k keys; the stats here only count the
> number of occurrences per key.
>   While running, the cache dir is very small (about 50M). My questions
> are:
>
>   1/ A regular micro-batch takes about 800ms to finish, but every
> 10 seconds, when the data checkpoint is running,
>   it takes 5 seconds to finish the same-size micro-batch. Why is it so
> high? What kind of work does the checkpoint do?
>   Why does it keep increasing?
>
>   2/ When I change the data checkpoint interval, e.g. using:
>   stats.checkpoint(Durations.seconds(100)); // change to 100,
> the default is 10
>
>   the checkpoint time keeps increasing significantly: the first checkpoint is
> 10s, the second is 30s, the third is 70s ... and it keeps increasing :)
>   Why is it so high when I increase the checkpoint interval?
>
> It seems that the default interval is more stable.
>
> On Nov 4, 2015 9:08 PM, "Adrian Tanase"  wrote:
>
>> Nice! Thanks for sharing, I wasn’t aware of the new API.
>>
>> Left some comments on the JIRA and design doc.
>>
>> -adrian
>>
>> From: Shixiong Zhu
>> Date: Tuesday, November 3, 2015 at 3:32 AM
>> To: Thúy Hằng Lê
>> Cc: Adrian Tanase, "user@spark.apache.org"
>> Subject: Re: Spark Streaming data checkpoint performance
>>
>> "trackStateByKey" is about to be added in 1.6