Jupyter Notebook (Scala, kernel - Apache Toree) with Vegas, Graph not showing data

2019-02-01 Thread karan alang
Has anybody used Vegas-viz for charting/graphics in a Jupyter Spark/Scala
notebook?

I'm looking for input on the following issue -

https://stackoverflow.com/questions/54473744/jupyter-notebook-scala-kernel-apache-toree-with-vegas-graph-not-showing-da

Please let me know.

thanks!
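
For reference, a minimal Vegas snippet of the kind that typically works under
Toree (a sketch, assuming Vegas 0.3.x and that the Toree `kernel` object is
available in the notebook; the implicit displayer wiring is often the missing
piece when the chart renders blank, and the data here is purely illustrative):

import vegas._

// Route Vegas' HTML output through Toree's display mechanism instead of stdout.
// (Assumption: `kernel` is the Toree-provided kernel handle in the notebook.)
implicit val displayer: String => Unit = { html =>
  kernel.display.content("text/html", html)
}

Vegas("Country Pop")
  .withData(Seq(
    Map("country" -> "USA", "population" -> 314),
    Map("country" -> "UK", "population" -> 64)
  ))
  .encodeX("country", Nom)
  .encodeY("population", Quant)
  .mark(Bar)
  .show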


Re: Structured streaming from Kafka by timestamp

2019-02-01 Thread Tomas Bartalos
Hello,

sorry for my late answer.
You're right, what I'm doing is a one-time query, not Structured Streaming.
It's probably best to describe my use case:
I'd like to expose live data residing in Kafka (via JDBC/ODBC) with the power
of Spark's distributed SQL engine. As the JDBC server I use the Spark Thrift
Server.
Since timestamp pushdown is not possible :-(, this is a very cumbersome
task.
Let's say I want to inspect the last 5 minutes of Kafka. First I have to find
the starting offset (offsetFrom) for each partition that corresponds to now() - 5 minutes.
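
That lookup step can be done with the plain Kafka consumer API; a rough sketch
(the broker list and topic are the same placeholders as in the table
definition below):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
props.put("bootstrap.servers", "server1,server2")
props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
try {
  val partitions = consumer.partitionsFor("my_topic").asScala
    .map(p => new TopicPartition(p.topic, p.partition))

  val fromTs = System.currentTimeMillis() - 5 * 60 * 1000L
  val query = partitions.map(tp => tp -> java.lang.Long.valueOf(fromTs)).toMap.asJava

  // For each partition: the earliest offset whose record timestamp is >= fromTs
  // (null when the partition has no such record).
  val offsets = consumer.offsetsForTimes(query).asScala
    .collect { case (tp, oat) if oat != null => tp.partition -> oat.offset }

  // These offsets are what get serialized into the startingOffsets JSON below.
  println(offsets)
} finally {
  consumer.close()
}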
Then I can register a kafka table:

CREATE TABLE ticket_kafka_x USING kafka OPTIONS (
  kafka.bootstrap.servers 'server1,server2,...',
  subscribe 'my_topic',
  startingOffsets '{"my_topic" : {"0" : 48532124, "1" : 49029703, "2" : 49456213, "3" : 48400521}}'
);

Then I can issue queries against this table (data in Kafka is stored in
Avro format, but I've created a custom GenericUDF to deserialize it).

select event.id as id, explode(event.picks) as picks from (
  select from_avro(value) as event from ticket_kafka_x
  where timestamp > from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
) limit 100;
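
For reference, the same bounded read can also be expressed with the DataFrame
API instead of a registered table (a sketch; the Avro decoding is omitted
since it relies on the custom UDF mentioned above, and the servers/offsets are
placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// A batch (non-streaming) read, bounded by explicit per-partition offsets.
val raw = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1,server2")
  .option("subscribe", "my_topic")
  .option("startingOffsets",
    """{"my_topic":{"0":48532124,"1":49029703,"2":49456213,"3":48400521}}""")
  .option("endingOffsets", "latest")
  .load()

// Register an equivalent temp view for SQL access.
raw.createOrReplaceTempView("ticket_kafka_x")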


What's even more irritating, after a few minutes I have to re-create this table
to reflect the latest 5-minute interval; otherwise query performance suffers
from the growing amount of data to filter.

A colleague of mine was able to make direct queries with timestamp pushdown
in the latest Hive.
How difficult would it be to implement this feature in Spark? Could you point
me to the code where I could have a look?

Thank you,


On Fri, 25 Jan 2019 at 0:32, Shixiong(Ryan) Zhu 
wrote:

> Hey Tomas,
>
> From your description, you just ran a batch query rather than a Structured
> Streaming query. The Kafka data source doesn't support filter push down
> right now. But that's definitely doable. One workaround here is setting
> proper  "startingOffsets" and "endingOffsets" options when loading from
> Kafka.
>
> Best Regards,
> Ryan
>
>
> On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi 
> wrote:
>
>> Hi Tomas,
>>
>> As a general note, I don't fully understand your use-case. You've mentioned
>> Structured Streaming, but your query is more like a one-time SQL statement.
>> Kafka doesn't support predicate pushdown in the way it's integrated with
>> Spark. What can be done from the Spark side is to look up the offset for a
>> given lowest timestamp and start reading from there.
>>
>> BR,
>> G
>>
>>
>> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to read Kafka via Spark Structured Streaming, within a specific
>>> time range:
>>>
>>> select count(*) from kafka_table where timestamp > cast('2019-01-23
>>> 1:00' as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP)
>>> ;
>>>
>>>
>>> The problem is that the timestamp filter is not pushed down to Kafka, so
>>> Spark tries to read the whole topic from the beginning.
>>>
>>>
>>> explain query:
>>>
>>> 
>>>
>>>  +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 >
>>> 15351480)) && (timestamp#57 < 15352344))
>>>
>>>
>>> Scan
>>> KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
>>> start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
>>> [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
>>> *PushedFilters: []*, ReadSchema:
>>> struct>>
>>>
>>> Obviously the query takes forever to complete. Is there a solution to
>>> this?
>>>
>>> I'm using kafka and kafka-client version 1.1.1
>>>
>>>
>>> BR,
>>>
>>> Tomas
>>>
>>


Re: Aws

2019-02-01 Thread Pedro Tuero
Hi Hiroyuki, thanks for the answer.

I found a solution for the cores-per-executor configuration:
I set maximizeResourceAllocation to true:
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
It was probably true by default in release 5.16, but I couldn't find when
that changed.
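
For anyone else hitting this, that setting is applied through an EMR
configuration classification at cluster creation, roughly like the following
(a sketch based on the page above):

[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]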
The same page says that dynamic allocation is true by default. I thought that
would do the trick, but reading it again I think it relates to the number of
executors rather than the number of cores per executor.

But the jobs are still taking longer than before.
Watching the application history, I see these differences for the same job,
the same instance types, and the default (AWS-managed) configuration for
executors, cores, and memory:
Instances:
6 r5.xlarge: 4 vCPUs, 32 GB of memory each (so there are 24 cores in total: 6
instances * 4 cores).

With 5.16:
- 24 executors (4 on each instance, including the one that also hosted the
driver).
- 4 cores each.
- 2.7 * 2 (storage + on-heap storage) memory each.
- 1 executor per core, but at the same time 4 cores per executor (?).
- Total executor memory per instance: 21.6 (2.7 * 2 * 4)
- Total elapsed time: 6 minutes
With 5.20:
- 5 executors (1 on each instance, 0 on the instance with the driver).
- 4 cores each.
- 11.9 * 2 (storage + on-heap storage) memory each.
- Total executor memory per instance: 23.8 (11.9 * 2 * 1)
- Total elapsed time: 8 minutes


I don't understand the 5.16 configuration, but it works better.
It seems that in 5.20 a full instance is wasted on the driver alone, when it
could also host an executor.
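
Alternatively, the per-executor sizing can be pinned explicitly instead of
relying on the EMR defaults, e.g. via the spark-defaults classification (a
sketch; the values are only illustrative for an r5.xlarge):

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executor.cores": "4",
      "spark.executor.memory": "18g",
      "spark.dynamicAllocation.enabled": "true"
    }
  }
]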


Regards,
Pedro.



On Thu, 31 Jan 2019 at 20:16, Hiroyuki Nagata 
wrote:

> Hi, Pedro
>
>
> I've also started using AWS EMR, with Spark 2.4.0, and I'm looking for
> performance-tuning methods.
>
> Do you configure dynamic allocation?
>
> FYI:
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> I've not tested it yet. I guess spark-submit needs to specify the number of
> executors.
>
> Regards,
> Hiroyuki
>
> On Fri, 1 Feb 2019 at 5:23, Pedro Tuero (tuerope...@gmail.com) wrote:
>
>> Hi guys,
>> I usually run Spark jobs on AWS EMR.
>> Recently I switched from AWS EMR release label 5.16 to 5.20 (which uses
>> Spark 2.4.0).
>> I've noticed that a lot of steps are taking longer than before.
>> I think it is related to the automatic configuration of cores per executor.
>> In version 5.16, some executors took more cores if the instance allowed it.
>> Let's say an instance had 8 cores and 40 GB of RAM, and the RAM configured
>> per executor was 10 GB; then AWS EMR automatically assigned 2 cores per
>> executor.
>> Now with label 5.20, unless I configure the number of cores manually, only
>> one core is assigned per executor.
>>
>> I don't know if it is related to Spark 2.4.0 or if it is something
>> managed by AWS...
>> Does anyone know if there is a way to automatically use more cores when
>> it is physically possible?
>>
>> Thanks,
>> Peter.
>>
>