Re: Process large JSON file without causing OOM

2017-11-13 Thread Sonal Goyal
If you are running Spark with local[*] as master, there will be a single
process whose memory is controlled by the --driver-memory command line
option to spark-submit. Check

http://spark.apache.org/docs/latest/configuration.html

spark.driver.memory (default: 1g) - Amount of memory to use for the driver
process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
*Note:* In client mode, this config must not be set through the SparkConf
directly in your application, because the driver JVM has already started at
that point. Instead, please set this through the --driver-memory command line
option or in your default properties file.
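
A quick way to check whether the setting actually took effect is to compare it
with the JVM heap at runtime. A minimal sketch, assuming a running SparkSession
called spark; in local[*] mode the driver is your service's JVM, so
Runtime.maxMemory is the number that matters:

// Compare the configured driver memory with the real heap of the single local process.
println("spark.driver.memory = " + spark.conf.get("spark.driver.memory", "not set"))
println("JVM max heap (MB)   = " + Runtime.getRuntime.maxMemory / (1024 * 1024))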

Thanks,
Sonal
Nube Technologies 





On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan  wrote:

> Hi Joel,
>
> Here are the relevant snippets of my code and an OOM error thrown
> in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
> even though I am running with -Xmx10G and 4G executor and driver memory as
> shown below.
>
> SparkConf sparkConf = new SparkConf()
> .setAppName("My Service")
> .setMaster("local[*]")
> .set("spark.ui.enabled", "true")
> .set("spark.executor.memory", "4G")
> .set("spark.driver.memory", "4G");
>
> sparkSessionBuilder = SparkSession.builder().config(
> sparkConf).enableHiveSupport();
>
> Dataset events = sparkSession.read()
> .format("json")
> .schema(inputConfig.getSchema())
> .load(inputFile.getPath());
>
> DataFrameWriter frameWriter = events.selectExpr(
> JavaConversions.asScalaBuffer(outputSchema.getColumns())) //
> select "data.customer AS `customer`", ...
> .write()
> .options(outputConfig.getProperties()) // compression=zlib
> .format("orc")
> 
> .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions()))
> // partition by "customer"
> .save(outputUri.getPath());
>
>
> Here is the error log I get at runtime:
>
> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to java_pid3790.hprof ...
> Heap dump file created [62653841 bytes in 2.212 secs]
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing "kill -9 3790"...
>
>
> And here is the thread from the thread dump that caused OOM:
>
> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
> at java.lang.OutOfMemoryError.(OutOfMemoryError.java:48)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.
> getCompressedData(BlockDecompressorStream.java:123)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(
> BlockDecompressorStream.java:98)
> at org.apache.hadoop.io.compress.DecompressorStream.read(
> DecompressorStream.java:85)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>Local Variable: byte[]#3957
>Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
>Local Variable: org.apache.hadoop.io.Text#5
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
> skipUtfByteOrderMark(LineRecordReader.java:144)
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
> nextKeyValue(LineRecordReader.java:184)
>Local Variable: org.apache.hadoop.mapreduce.
> lib.input.LineRecordReader#1
> at org.apache.spark.sql.execution.datasources.
> RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>Local Variable: org.apache.spark.sql.execution.datasources.
> RecordReaderIterator#1
> at org.apache.spark.sql.execution.datasources.
> HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>Local Variable: org.apache.spark.sql.execution.datasources.
> HadoopFileLinesReader#1
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>Local Variable: scala.collection.Iterator$$anon$12#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:177)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
>Local Variable: org.apache.spark.sql.execution.datasources.
> FileScanRDD$$anon$1#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 

Re: Use of Accumulators

2017-11-13 Thread Holden Karau
So you want to set an accumulator to 1 after a transformation has fully
completed? Or what exactly do you want to do?

On Mon, Nov 13, 2017 at 9:47 PM vaquar khan  wrote:

> Confirmed, you can use Accumulators :)
>
> Regards,
> Vaquar khan
>
> On Mon, Nov 13, 2017 at 10:58 AM, Kedarnath Dixit <
> kedarnath_di...@persistent.com> wrote:
>
>> Hi,
>>
>>
>> We need some way to toggle the flag of a variable in a transformation.
>>
>>
>> We are thinking to make use of spark  Accumulators for this purpose.
>>
>>
>> Can we use these as below:
>>
>>
>> Variables  -> Initial Value
>>
>>  Variable1 -> 0
>>
>>  Variable2 -> 0
>>
>>
>> In one of the transformations, we need to set Variable2's value to 1.
>> Can we achieve this using Accumulators? Please confirm.
>>
>>
>> Thanks!
>>
>>
>> With Regards,
>>
>> *~Kedar Dixit*
>>
>> kedarnath_di...@persistent.com | @kedarsdixit | M +91 90499 15588 | T +91 (20) 6703 4783
>>
>> *Persistent Systems | **Partners In Innovation** | www.persistent.com
>> *
>> DISCLAIMER
>> ==
>> This e-mail may contain privileged and confidential information which is
>> the property of Persistent Systems Ltd. It is intended only for the use of
>> the individual or entity to which it is addressed. If you are not the
>> intended recipient, you are not authorized to read, retain, copy, print,
>> distribute or use this message. If you have received this communication in
>> error, please notify the sender and delete all copies of this message.
>> Persistent Systems Ltd. does not accept any liability for virus infected
>> mails.
>>
>
>
>
> --
> Regards,
> Vaquar Khan
> +1 -224-436-0783
> Greater Chicago
>
-- 
Twitter: https://twitter.com/holdenkarau


Re: Process large JSON file without causing OOM

2017-11-13 Thread vaquar khan
https://stackoverflow.com/questions/26562033/how-to-set-apache-spark-executor-memory

Regards,
Vaquar khan

On Mon, Nov 13, 2017 at 6:22 PM, Alec Swan  wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer").save(filePath), or capping
> memory usage by setting spark.executor.memory=1G, but to no avail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in its entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>



-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago


Re: Use of Accumulators

2017-11-13 Thread vaquar khan
Confirmed, you can use Accumulators :)
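
A minimal sketch of the pattern (names are illustrative, and an existing
SparkSession called spark is assumed). Note that accumulator updates made
inside transformations only become dependable after an action completes, and
can be double-counted if tasks are retried, so treat the value as a
driver-side signal rather than exact bookkeeping:

// Register a named accumulator that starts at 0 (your "Variable2").
val variable2 = spark.sparkContext.longAccumulator("Variable2")

val data = spark.sparkContext.parallelize(1 to 1000)
val processed = data.map { x =>
  if (x % 42 == 0) variable2.add(1)   // "toggle" the flag from inside the transformation
  x
}

processed.count()                      // run an action so the updates are actually applied
println(s"Variable2 = ${variable2.value}, flag set = ${variable2.value > 0}")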

Regards,
Vaquar khan

On Mon, Nov 13, 2017 at 10:58 AM, Kedarnath Dixit <
kedarnath_di...@persistent.com> wrote:

> Hi,
>
>
> We need some way to toggle the flag of a variable in a transformation.
>
>
> We are thinking to make use of spark  Accumulators for this purpose.
>
>
> Can we use these as below:
>
>
> Variables  -> Initial Value
>
>  Variable1 -> 0
>
>  Variable2 -> 0
>
>
> In one of the transformations, we need to set Variable2's value to 1.
> Can we achieve this using Accumulators? Please confirm.
>
>
> Thanks!
>
>
> With Regards,
>
> *~Kedar Dixit*
>
> kedarnath_di...@persistent.com | @kedarsdixit | M +91 90499 15588 | T +91 (20) 6703 4783
>
> *Persistent Systems | **Partners In Innovation** | www.persistent.com
> *
> DISCLAIMER
> ==
> This e-mail may contain privileged and confidential information which is
> the property of Persistent Systems Ltd. It is intended only for the use of
> the individual or entity to which it is addressed. If you are not the
> intended recipient, you are not authorized to read, retain, copy, print,
> distribute or use this message. If you have received this communication in
> error, please notify the sender and delete all copies of this message.
> Persistent Systems Ltd. does not accept any liability for virus infected
> mails.
>



-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago


Re: Spark based Data Warehouse

2017-11-13 Thread lucas.g...@gmail.com
Hi Ashish, bear in mind that EMR has some additional tooling available that
smoothes out some S3 problems that you may / almost certainly will
encounter.

We are using Spark / S3 (not on EMR) and have encountered issues with file
consistency. You can deal with it, but be aware it's additional technical
debt that you'll need to own. We didn't want to own an HDFS cluster, so we
consider it worthwhile.

Here are some additional resources:  The video is Steve Loughran talking
about S3.
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
https://www.youtube.com/watch?v=ND4L_zSDqF0

For the record we use S3 heavily but tend to drop our processed data into
databases so they can be more easily consumed by visualization tools.
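
For what it's worth, the write side of that pattern is short. A sketch only;
processedDf stands for whatever DataFrame you have computed, and the
connection string, table name and credentials are placeholders:

// Persist a processed DataFrame to a relational store for BI tools to query.
processedDf.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/analytics")   // placeholder connection string
  .option("dbtable", "daily_metrics")                          // placeholder table name
  .option("user", "etl_user")
  .option("password", sys.env.getOrElse("DB_PASSWORD", ""))
  .mode("append")
  .save()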

Good luck!

Gary Lucas

On 13 November 2017 at 20:04, Affan Syed  wrote:

> Another option that we are trying internally is to use Mesos for
> isolating different jobs or groups. Within a single group, using Livy to
> create different spark contexts also works.
>
> - Affan
>
> On Tue, Nov 14, 2017 at 8:43 AM, ashish rawat  wrote:
>
>> Thanks Sky Yin. This really helps.
>>
>> On Nov 14, 2017 12:11 AM, "Sky Yin"  wrote:
>>
>> We are running Spark in AWS EMR as data warehouse. All data are in S3 and
>> metadata in Hive metastore.
>>
>> We have internal tools to create Jupyter notebooks on the dev cluster. I
>> guess you can use zeppelin instead, or Livy?
>>
>> We run genie as a job server for the prod cluster, so users have to
>> submit their queries through the genie. For better resource utilization, we
>> rely on Yarn dynamic allocation to balance the load of multiple
>> jobs/queries in Spark.
>>
>> Hope this helps.
>>
>> On Sat, Nov 11, 2017 at 11:21 PM ashish rawat 
>> wrote:
>>
>>> Hello Everyone,
>>>
>>> I was trying to understand if anyone here has tried a data warehouse
>>> solution using S3 and Spark SQL. Out of multiple possible options
>>> (redshift, presto, hive etc), we were planning to go with Spark SQL, for
>>> our aggregates and processing requirements.
>>>
>>> If anyone has tried it out, would like to understand the following:
>>>
>>>1. Is Spark SQL and UDF, able to handle all the workloads?
>>>2. What user interface did you provide for data scientist, data
>>>engineers and analysts
>>>3. What are the challenges in running concurrent queries, by many
>>>users, over Spark SQL? Considering Spark still does not provide spill to
>>>disk, in many scenarios, are there frequent query failures when executing
>>>concurrent queries
>>>4. Are there any open source implementations, which provide
>>>something similar?
>>>
>>>
>>> Regards,
>>> Ashish
>>>
>>
>>
>


Re: Process large JSON file without causing OOM

2017-11-13 Thread Alec Swan
Hi Joel,

Here are the relevant snippets of my code and an OOM error thrown
in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
even though I am running with -Xmx10G and 4G executor and driver memory as
shown below.

SparkConf sparkConf = new SparkConf()
.setAppName("My Service")
.setMaster("local[*]")
.set("spark.ui.enabled", "true")
.set("spark.executor.memory", "4G")
.set("spark.driver.memory", "4G");

sparkSessionBuilder =
SparkSession.builder().config(sparkConf).enableHiveSupport();

Dataset events = sparkSession.read()
.format("json")
.schema(inputConfig.getSchema())
.load(inputFile.getPath());

DataFrameWriter frameWriter =
events.selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns()))
// select "data.customer AS `customer`", ...
.write()
.options(outputConfig.getProperties()) // compression=zlib
.format("orc")

.partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions()))
// partition by "customer"
.save(outputUri.getPath());


Here is the error log I get at runtime:

17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid3790.hprof ...
Heap dump file created [62653841 bytes in 2.212 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing "kill -9 3790"...


And here is the thread from the thread dump that caused OOM:

"Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
at java.lang.OutOfMemoryError.(OutOfMemoryError.java:48)
at
org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
at
org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
at
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
   Local Variable: byte[]#3957
   Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
   Local Variable: org.apache.hadoop.io.Text#5
at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
at
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
   Local Variable:
org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
at
org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
   Local Variable:
org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
   Local Variable: scala.collection.Iterator$$anon$12#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
   Local Variable:
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
   Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
   Local Variable: org.apache.spark.executor.TaskMetrics#2
at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
   Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   Local Variable: scala.collection.Iterator$$anon$11#2
   Local Variable:
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
   Local Variable: java.lang.Integer#1
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)

Re: Spark based Data Warehouse

2017-11-13 Thread Affan Syed
Another option that we are trying internally is to use Mesos for isolating
different jobs or groups. Within a single group, using Livy to create
different spark contexts also works.

- Affan

On Tue, Nov 14, 2017 at 8:43 AM, ashish rawat  wrote:

> Thanks Sky Yin. This really helps.
>
> On Nov 14, 2017 12:11 AM, "Sky Yin"  wrote:
>
> We are running Spark in AWS EMR as data warehouse. All data are in S3 and
> metadata in Hive metastore.
>
> We have internal tools to create Jupyter notebooks on the dev cluster. I
> guess you can use zeppelin instead, or Livy?
>
> We run genie as a job server for the prod cluster, so users have to submit
> their queries through the genie. For better resource utilization, we rely
> on Yarn dynamic allocation to balance the load of multiple jobs/queries in
> Spark.
>
> Hope this helps.
>
> On Sat, Nov 11, 2017 at 11:21 PM ashish rawat  wrote:
>
>> Hello Everyone,
>>
>> I was trying to understand if anyone here has tried a data warehouse
>> solution using S3 and Spark SQL. Out of multiple possible options
>> (redshift, presto, hive etc), we were planning to go with Spark SQL, for
>> our aggregates and processing requirements.
>>
>> If anyone has tried it out, would like to understand the following:
>>
>>1. Is Spark SQL and UDF, able to handle all the workloads?
>>2. What user interface did you provide for data scientist, data
>>engineers and analysts
>>3. What are the challenges in running concurrent queries, by many
>>users, over Spark SQL? Considering Spark still does not provide spill to
>>disk, in many scenarios, are there frequent query failures when executing
>>concurrent queries
>>4. Are there any open source implementations, which provide something
>>similar?
>>
>>
>> Regards,
>> Ashish
>>
>
>


Re: Spark based Data Warehouse

2017-11-13 Thread ashish rawat
Thanks Sky Yin. This really helps.

On Nov 14, 2017 12:11 AM, "Sky Yin"  wrote:

We are running Spark in AWS EMR as data warehouse. All data are in S3 and
metadata in Hive metastore.

We have internal tools to create Jupyter notebooks on the dev cluster. I
guess you can use zeppelin instead, or Livy?

We run genie as a job server for the prod cluster, so users have to submit
their queries through the genie. For better resource utilization, we rely
on Yarn dynamic allocation to balance the load of multiple jobs/queries in
Spark.

Hope this helps.

On Sat, Nov 11, 2017 at 11:21 PM ashish rawat  wrote:

> Hello Everyone,
>
> I was trying to understand if anyone here has tried a data warehouse
> solution using S3 and Spark SQL. Out of multiple possible options
> (redshift, presto, hive etc), we were planning to go with Spark SQL, for
> our aggregates and processing requirements.
>
> If anyone has tried it out, would like to understand the following:
>
>1. Is Spark SQL and UDF, able to handle all the workloads?
>2. What user interface did you provide for data scientist, data
>engineers and analysts
>3. What are the challenges in running concurrent queries, by many
>users, over Spark SQL? Considering Spark still does not provide spill to
>disk, in many scenarios, are there frequent query failures when executing
>concurrent queries
>4. Are there any open source implementations, which provide something
>similar?
>
>
> Regards,
> Ashish
>


Re: Process large JSON file without causing OOM

2017-11-13 Thread Joel D
Have you tried increasing driver, exec mem (gc overhead too if required)?

your code snippet and stack trace will be helpful.
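
One more thing worth checking, as a hedged diagnostic rather than a fix (the
path below is illustrative): how many partitions the JSON read produces. A
single whole-file .snappy input is typically not splittable, so one task may
have to decompress and scan the entire file, which concentrates the memory
pressure in that task:

// Diagnostic sketch: count the input partitions for the JSON/Snappy read.
val events = spark.read
  .format("json")
  .load("/path/to/input.json.snappy")   // illustrative path

println(s"input partitions: ${events.rdd.getNumPartitions}")
// If this prints 1 for a ~1GB file, the whole file is handled by a single task.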

On Mon, Nov 13, 2017 at 7:23 PM Alec Swan  wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer").save(filePath), or capping memory
> usage by setting spark.executor.memory=1G, but to no avail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in its entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>
>
>
>
>


Process large JSON file without causing OOM

2017-11-13 Thread Alec Swan
Hello,

I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
format. Effectively, my Java service starts up an embedded Spark cluster
(master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
keep getting OOM errors with large (~1GB) files.

I've tried different ways to reduce memory usage, e.g. by partitioning data
with dataSet.partitionBy("customer").save(filePath), or capping memory usage
by setting spark.executor.memory=1G, but to no avail.

I am wondering if there is a way to avoid OOM besides splitting the source
JSON file into multiple smaller ones and processing the small ones
individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
in its entirety before converting it to ORC (columnar)? If so, would it
make sense to create a custom receiver that reads the Snappy file and use
Spark streaming for ORC conversion?

Thanks,

Alec


Re: Databricks Serverless

2017-11-13 Thread Mark Hamstra
This is not a Databricks forum.

On Mon, Nov 13, 2017 at 3:18 PM, Benjamin Kim  wrote:

> I have a question about this. The documentation compares the concept to
> BigQuery. Does this mean that we will no longer need to deal
> with instances and just pay for execution duration and amount of data
> processed? I’m just curious about how this will be priced.
>
> Also, when will it be ready for production?
>
> Cheers.
>
>


Re: Spark 2.2 Structured Streaming + Kinesis

2017-11-13 Thread Benjamin Kim
To add, we have a CDH 5.12 cluster with Spark 2.2 in our data center.

On Mon, Nov 13, 2017 at 3:15 PM Benjamin Kim  wrote:

> Does anyone know if there is a connector for AWS Kinesis that can be used
> as a source for Structured Streaming?
>
> Thanks.
>
>


Re: Spark 2.2 Structured Streaming + Kinesis

2017-11-13 Thread Jules Damji
You can use Databricks to connect to Kinesis:

https://databricks.com/blog/2017/08/09/apache-sparks-structured-streaming-with-amazon-kinesis-on-databricks.html

Cheers 
Jules 

Sent from my iPhone
Pardon the dumb thumb typos :)

> On Nov 13, 2017, at 3:15 PM, Benjamin Kim  wrote:
> 
> Does anyone know if there is a connector for AWS Kinesis that can be used as 
> a source for Structured Streaming?
> 
> Thanks.
> 


Databricks Serverless

2017-11-13 Thread Benjamin Kim
I have a question about this. The documentation compares the concept to
BigQuery. Does this mean that we will no longer need to deal
with instances and just pay for execution duration and amount of data
processed? I’m just curious about how this will be priced.

Also, when will it be ready for production?

Cheers.


Spark 2.2 Structured Streaming + Kinesis

2017-11-13 Thread Benjamin Kim
Does anyone know if there is a connector for AWS Kinesis that can be used
as a source for Structured Streaming?

Thanks.


Re: Reload some static data during struct streaming

2017-11-13 Thread spark receiver
I need it cached to improve throughput; I only hope it can be refreshed once a
day, not every batch.
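
A hedged sketch of one way to do that without restarting the query, assuming
df_meta is the cached JDBC DataFrame from the snippet quoted below. Whether a
micro-batch really picks up the fresh rows depends on how the streaming plan
references the table, so this is worth verifying on a dev cluster first:

import java.util.concurrent.{Executors, TimeUnit}

// Once a day: drop the cached copy so the next read goes back to MySQL, then cache again.
val refresher = Executors.newSingleThreadScheduledExecutor()
refresher.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    df_meta.unpersist(blocking = true)
    df_meta.cache()
    df_meta.count()   // materialize the fresh cache eagerly instead of during the next batch
  }
}, 24, 24, TimeUnit.HOURS)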


> On Nov 13, 2017, at 4:49 PM, Burak Yavuz  wrote:
> 
> I think if you don't cache the jdbc table, then it should auto-refresh.
> 
> On Mon, Nov 13, 2017 at 1:21 PM, spark receiver  > wrote:
> Hi 
> 
> I’m using structured streaming (Spark 2.2) to receive Kafka messages, and it works great.
> The thing is I need to join the Kafka message with a relatively static table
> stored in a MySQL database (let’s call it metadata here).
> 
> So is it possible to reload the metadata table after some time interval (like
> daily) without restarting the running structured streaming query?
> 
> Snippet code as following :
> // df_meta contains important information to join with the dataframe read 
> from kafka
> val df_meta = spark.read.format("jdbc").option("url", 
> mysql_url).option("dbtable", "v_entity_ap_rel").load()
> df_meta.cache()
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> "x.x.x.x:9092").option("fetch.message.max.bytes", 
> "5000").option("kafka.max.partition.fetch.bytes", "5000")
>   .option("subscribe", "rawdb.raw_data")
>   .option("failOnDataLoss", true)
>   .option("startingOffsets", "latest")
>   .load()
>   .select($"value".as[Array[Byte]])
>   .map(avroDeserialize(_))
>   .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
>   .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
> 
> df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi 
> then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", 
> "substring(stime,1,13) STIME_HOUR")
>   .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
>   .option("checkpointLocation", 
> "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
>   .start("T_CF_TABLE")
>   .awaitTermination()
> 
> Mason
> 



Re: Reload some static data during struct streaming

2017-11-13 Thread Burak Yavuz
I think if you don't cache the jdbc table, then it should auto-refresh.

On Mon, Nov 13, 2017 at 1:21 PM, spark receiver 
wrote:

> Hi
>
> I’m using structured streaming (Spark 2.2) to receive Kafka messages, and it works
> great. The thing is I need to join the Kafka message with a relatively static
> table stored in a MySQL database (let’s call it metadata here).
>
> So is it possible to reload the metadata table after some time
> interval (like daily) without restarting the running structured streaming query?
>
> Snippet code as following :
>
> // df_meta contains important information to join with the dataframe read 
> from kafka
>
> val df_meta = spark.read.format("jdbc").option("url", mysql_url).option(
> "dbtable", "v_entity_ap_rel").load()
>
> df_meta.cache()
>
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> "x.x.x.x:9092").option("fetch.message.max.bytes", 
> "5000").option("kafka.max.partition.fetch.bytes", "5000")
>   .option("subscribe", "rawdb.raw_data")
>   .option("failOnDataLoss", true)
>   .option("startingOffsets", "latest")
>   .load()
>   .select($"value".as[Array[Byte]])
>   .map(avroDeserialize(_))
>   .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
>   .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")
>
>
> df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi 
> then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", 
> "substring(stime,1,13) STIME_HOUR")
>   .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
>   .option("checkpointLocation", 
> "/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
>   .start("T_CF_TABLE")
>   .awaitTermination()
>
>
> Mason
>


Reload some static data during struct streaming

2017-11-13 Thread spark receiver
Hi 

I’m using structured streaming (Spark 2.2) to receive Kafka messages, and it works great.
The thing is I need to join the Kafka message with a relatively static table
stored in a MySQL database (let’s call it metadata here).

So is it possible to reload the metadata table after some time interval (like
daily) without restarting the running structured streaming query?

Snippet code as following :
// df_meta contains important information to join with the dataframe read from 
kafka
val df_meta = spark.read.format("jdbc").option("url", 
mysql_url).option("dbtable", "v_entity_ap_rel").load()
df_meta.cache()
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", 
"x.x.x.x:9092").option("fetch.message.max.bytes", 
"5000").option("kafka.max.partition.fetch.bytes", "5000")
  .option("subscribe", "rawdb.raw_data")
  .option("failOnDataLoss", true)
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .as[ApRawData].select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
  .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")

df.selectExpr("ENTITYID", "CLIENTMAC", "STIME", "case when a.rrssi>=b.rssi then 
'1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG", 
"substring(stime,1,13) STIME_HOUR")
  .distinct().writeStream.format("parquet").partitionBy("STIME_HOUR")
  .option("checkpointLocation", 
"/user/root/t_cf_table_chpt").trigger(ProcessingTime("5 minutes"))
  .start("T_CF_TABLE")
  .awaitTermination()

Mason

Re: Spark based Data Warehouse

2017-11-13 Thread Sky Yin
We are running Spark in AWS EMR as data warehouse. All data are in S3 and
metadata in Hive metastore.

We have internal tools to create Jupyter notebooks on the dev cluster. I
guess you can use zeppelin instead, or Livy?

We run genie as a job server for the prod cluster, so users have to submit
their queries through the genie. For better resource utilization, we rely
on Yarn dynamic allocation to balance the load of multiple jobs/queries in
Spark.

Hope this helps.
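
For reference, dynamic allocation is driven by a handful of settings; a minimal
sketch of the kind of configuration involved (the values are illustrative, not
recommendations for your cluster):

import org.apache.spark.SparkConf

// Illustrative dynamic-allocation settings for a shared YARN cluster.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")            // external shuffle service is required on YARN
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")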

On Sat, Nov 11, 2017 at 11:21 PM ashish rawat  wrote:

> Hello Everyone,
>
> I was trying to understand if anyone here has tried a data warehouse
> solution using S3 and Spark SQL. Out of multiple possible options
> (redshift, presto, hive etc), we were planning to go with Spark SQL, for
> our aggregates and processing requirements.
>
> If anyone has tried it out, would like to understand the following:
>
>1. Is Spark SQL and UDF, able to handle all the workloads?
>2. What user interface did you provide for data scientist, data
>engineers and analysts
>3. What are the challenges in running concurrent queries, by many
>users, over Spark SQL? Considering Spark still does not provide spill to
>disk, in many scenarios, are there frequent query failures when executing
>concurrent queries
>4. Are there any open source implementations, which provide something
>similar?
>
>
> Regards,
> Ashish
>


Re: Spark based Data Warehouse

2017-11-13 Thread Deepak Sharma
If you have only 1 user, it's still possible to execute non-blocking,
long-running queries.
The best way is to have different users, with pre-assigned resources, run
their queries.
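
One concrete mechanism for this within a single application is the FAIR
scheduler with per-user pools. A sketch only; the pool and table names are
illustrative, and production setups usually define pool weights in a
fairscheduler.xml file:

import org.apache.spark.sql.SparkSession

// Enable FAIR scheduling so one user's large query does not starve the others.
val spark = SparkSession.builder()
  .appName("SharedSqlEndpoint")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

// Assign the current thread's jobs to a per-user pool before running a query.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "analyst_pool")
spark.sql("SELECT count(*) FROM some_table").show()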

HTH

Thanks
Deepak

On Nov 13, 2017 23:56, "ashish rawat"  wrote:

> Thanks Everyone. I am still not clear on what the right way is to support
> multiple users running concurrent queries with Spark. Is it
> through multiple spark contexts or through Livy (which creates a single
> spark context only).
>
> Also, what kind of isolation is possible with Spark SQL? If one user fires
> a big query, then would that choke all other queries in the cluster?
>
> Regards,
> Ashish
>
> On Mon, Nov 13, 2017 at 3:10 AM, Patrick Alwell 
> wrote:
>
>> Alcon,
>>
>>
>>
>> You can most certainly do this. I’ve done benchmarking with Spark SQL and
>> the TPCDS queries using S3 as the filesystem.
>>
>>
>>
>> Zeppelin and Livy server work well for the dashboarding and concurrent
>> query issues:  https://hortonworks.com/blog/
>> livy-a-rest-interface-for-apache-spark/
>>
>>
>>
>> Livy Server will allow you to create multiple spark contexts via REST:
>> https://livy.incubator.apache.org/
>>
>>
>>
>> If you are looking for broad SQL functionality I’d recommend
>> instantiating a Hive context. And Spark is able to spill to disk; see
>> https://spark.apache.org/faq.html
>>
>>
>>
>> There are multiple companies running spark within their data warehouse
>> solutions: https://ibmdatawarehousing.wordpress.com/2016/10/12/steinbac
>> h_dashdb_local_spark/
>>
>>
>>
>> Edmunds used Spark to allow business analysts to point Spark to files in
>> S3 and infer schema: https://www.youtube.com/watch?v=gsR1ljgZLq0
>>
>>
>>
>> Recommend running some benchmarks and testing query scenarios for your
>> end users; but it sounds like you’ll be using it for exploratory analysis.
>> Spark is great for this ☺
>>
>>
>>
>> -Pat
>>
>>
>>
>>
>>
>> *From: *Vadim Semenov 
>> *Date: *Sunday, November 12, 2017 at 1:06 PM
>> *To: *Gourav Sengupta 
>> *Cc: *Phillip Henry , ashish rawat <
>> dceash...@gmail.com>, Jörn Franke , Deepak Sharma <
>> deepakmc...@gmail.com>, spark users 
>> *Subject: *Re: Spark based Data Warehouse
>>
>>
>>
>> It's actually quite simple to answer
>>
>>
>>
>> > 1. Is Spark SQL and UDF, able to handle all the workloads?
>>
>> Yes
>>
>>
>>
>> > 2. What user interface did you provide for data scientist, data
>> engineers and analysts
>>
>> Home-grown platform, EMR, Zeppelin
>>
>>
>>
>> > What are the challenges in running concurrent queries, by many users,
>> over Spark SQL? Considering Spark still does not provide spill to disk, in
>> many scenarios, are there frequent query failures when executing concurrent
>> queries
>>
>> You can run separate Spark Contexts, so jobs will be isolated
>>
>>
>>
>> > Are there any open source implementations, which provide something
>> similar?
>>
>> Yes, many.
>>
>>
>>
>>
>>
>> On Sun, Nov 12, 2017 at 1:47 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>> Dear Ashish,
>>
>> what you are asking for involves at least a few weeks of dedicated
>> understanding of your use case, and then it takes at least 3 to 4 months to
>> even propose a solution. You can even build a fantastic data warehouse just
>> using C++. The matter depends on lots of conditions. I just think that your
>> approach and question needs a lot of modification.
>>
>>
>>
>> Regards,
>>
>> Gourav
>>
>>
>>
>> On Sun, Nov 12, 2017 at 6:19 PM, Phillip Henry 
>> wrote:
>>
>> Hi, Ashish.
>>
>> You are correct in saying that not *all* functionality of Spark is
>> spill-to-disk but I am not sure how this pertains to a "concurrent user
>> scenario". Each executor will run in its own JVM and is therefore isolated
>> from others. That is, if the JVM of one user dies, this should not affect
>> another user who is running their own jobs in their own JVMs. The amount of
>> resources used by a user can be controlled by the resource manager.
>>
>> AFAIK, you configure something like YARN to limit the number of cores and
>> the amount of memory in the cluster a certain user or group is allowed to
>> use for their job. This is obviously quite a coarse-grained approach as (to
>> my knowledge) IO is not throttled. I believe people generally use something
>> like Apache Ambari to keep an eye on network and disk usage to mitigate
>> problems in a shared cluster.
>>
>> If the user has badly designed their query, it may very well fail with
>> OOMEs but this can happen irrespective of whether one user or many is using
>> the cluster at a given moment in time.
>>
>>
>>
>> Does this help?
>>
>> Regards,
>>
>> Phillip
>>
>>
>>
>> On Sun, Nov 12, 2017 at 5:50 PM, ashish rawat 
>> wrote:
>>
>> Thanks Jorn and Phillip. My question was specifically to anyone who 

Re: Spark based Data Warehouse

2017-11-13 Thread ashish rawat
Thanks Everyone. I am still not clear on what the right way is to support
multiple users running concurrent queries with Spark. Is it
through multiple spark contexts or through Livy (which creates a single
spark context only).

Also, what kind of isolation is possible with Spark SQL? If one user fires
a big query, then would that choke all other queries in the cluster?

Regards,
Ashish

On Mon, Nov 13, 2017 at 3:10 AM, Patrick Alwell 
wrote:

> Alcon,
>
>
>
> You can most certainly do this. I’ve done benchmarking with Spark SQL and
> the TPCDS queries using S3 as the filesystem.
>
>
>
> Zeppelin and Livy server work well for the dashboarding and concurrent
> query issues:  https://hortonworks.com/blog/livy-a-rest-interface-for-
> apache-spark/
>
>
>
> Livy Server will allow you to create multiple spark contexts via REST:
> https://livy.incubator.apache.org/
>
>
>
> If you are looking for broad SQL functionality I’d recommend instantiating
> a Hive context. And Spark is able to spill to disk; see
> https://spark.apache.org/faq.html
>
>
>
> There are multiple companies running spark within their data warehouse
> solutions: https://ibmdatawarehousing.wordpress.com/2016/10/12/
> steinbach_dashdb_local_spark/
>
>
>
> Edmunds used Spark to allow business analysts to point Spark to files in
> S3 and infer schema: https://www.youtube.com/watch?v=gsR1ljgZLq0
>
>
>
> Recommend running some benchmarks and testing query scenarios for your end
> users; but it sounds like you’ll be using it for exploratory analysis.
> Spark is great for this ☺
>
>
>
> -Pat
>
>
>
>
>
> *From: *Vadim Semenov 
> *Date: *Sunday, November 12, 2017 at 1:06 PM
> *To: *Gourav Sengupta 
> *Cc: *Phillip Henry , ashish rawat <
> dceash...@gmail.com>, Jörn Franke , Deepak Sharma <
> deepakmc...@gmail.com>, spark users 
> *Subject: *Re: Spark based Data Warehouse
>
>
>
> It's actually quite simple to answer
>
>
>
> > 1. Is Spark SQL and UDF, able to handle all the workloads?
>
> Yes
>
>
>
> > 2. What user interface did you provide for data scientist, data
> engineers and analysts
>
> Home-grown platform, EMR, Zeppelin
>
>
>
> > What are the challenges in running concurrent queries, by many users,
> over Spark SQL? Considering Spark still does not provide spill to disk, in
> many scenarios, are there frequent query failures when executing concurrent
> queries
>
> You can run separate Spark Contexts, so jobs will be isolated
>
>
>
> > Are there any open source implementations, which provide something
> similar?
>
> Yes, many.
>
>
>
>
>
> On Sun, Nov 12, 2017 at 1:47 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> Dear Ashish,
>
> what you are asking for involves at least a few weeks of dedicated
> understanding of your use case, and then it takes at least 3 to 4 months to
> even propose a solution. You can even build a fantastic data warehouse just
> using C++. The matter depends on lots of conditions. I just think that your
> approach and question needs a lot of modification.
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Sun, Nov 12, 2017 at 6:19 PM, Phillip Henry 
> wrote:
>
> Hi, Ashish.
>
> You are correct in saying that not *all* functionality of Spark is
> spill-to-disk but I am not sure how this pertains to a "concurrent user
> scenario". Each executor will run in its own JVM and is therefore isolated
> from others. That is, if the JVM of one user dies, this should not affect
> another user who is running their own jobs in their own JVMs. The amount of
> resources used by a user can be controlled by the resource manager.
>
> AFAIK, you configure something like YARN to limit the number of cores and
> the amount of memory in the cluster a certain user or group is allowed to
> use for their job. This is obviously quite a coarse-grained approach as (to
> my knowledge) IO is not throttled. I believe people generally use something
> like Apache Ambari to keep an eye on network and disk usage to mitigate
> problems in a shared cluster.
>
> If the user has badly designed their query, it may very well fail with
> OOMEs but this can happen irrespective of whether one user or many is using
> the cluster at a given moment in time.
>
>
>
> Does this help?
>
> Regards,
>
> Phillip
>
>
>
> On Sun, Nov 12, 2017 at 5:50 PM, ashish rawat  wrote:
>
> Thanks Jorn and Phillip. My question was specifically to anyone who have
> tried creating a system using spark SQL, as Data Warehouse. I was trying to
> check, if someone has tried it and they can help with the kind of workloads
> which worked and the ones, which have problems.
>
>
>
> Regarding spill to disk, I might be wrong but not all functionality of
> spark is spill to disk. So it still doesn't provide DB like reliability in
> execution. In case of DBs, queries get slow but they don't fail or go out
> 

Use of Accumulators

2017-11-13 Thread Kedarnath Dixit
Hi,


We need some way to toggle the flag of a variable in a transformation.


We are thinking to make use of spark  Accumulators for this purpose.


Can we use these as below:


Variables  -> Initial Value

 Variable1 -> 0

 Variable2 -> 0


In one of the transformations, we need to set Variable2's value to 1. Can we
achieve this using Accumulators? Please confirm.


Thanks!


With Regards,

~Kedar Dixit


kedarnath_di...@persistent.com | @kedarsdixit | M +91 90499 15588 | T +91 (20) 
6703 4783

Persistent Systems | Partners In Innovation | www.persistent.com
DISCLAIMER
==
This e-mail may contain privileged and confidential information which is the 
property of Persistent Systems Ltd. It is intended only for the use of the 
individual or entity to which it is addressed. If you are not the intended 
recipient, you are not authorized to read, retain, copy, print, distribute or 
use this message. If you have received this communication in error, please 
notify the sender and delete all copies of this message. Persistent Systems 
Ltd. does not accept any liability for virus infected mails.


Re: Spark SQL - Truncate Day / Hour

2017-11-13 Thread Eike von Seggern
Hi,

you can truncate datetimes like this (in pyspark), e.g. to 5 minutes:

import pyspark.sql.functions as F
df.select(
    (F.floor(F.col('myDateColumn').cast('long') / 300) * 300).cast('timestamp')
)
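
For the day/hour case specifically, the same trick works with 3600- or
86400-second buckets. The same sketch in Scala (the DataFrame and column name
are illustrative, and flooring epoch seconds gives UTC-based boundaries):

import org.apache.spark.sql.functions._

// Truncate a timestamp to the hour by flooring its epoch seconds to 3600-second buckets.
val hourly = df.select(
  (floor(col("myDateColumn").cast("long") / 3600) * 3600).cast("timestamp").as("hour")
)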

Best,
Eike

David Hodefi  schrieb am Mo., 13. Nov. 2017 um
12:27 Uhr:

> I am familiar with those functions, but none of them actually truncates a
> date. We can use those methods to help implement a truncate method. I think
> truncating to a day/hour should be as simple as truncate(..., "DD") or
> truncate(..., "HH").
>
> On Thu, Nov 9, 2017 at 8:23 PM, Gaspar Muñoz  wrote:
>
>> There are functions for day (called dayOfMonth and dayOfYear) and hour
>> (called hour). You can view them here:
>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions
>>
>> Example:
>>
>> import org.apache.spark.sql.functions._
>> val df = df.select(hour($"myDateColumn"), dayOfMonth($"myDateColumn"),
>> dayOfYear($"myDateColumn"))
>>
>> 2017-11-09 12:05 GMT+01:00 David Hodefi :
>>
>>> I would like to truncate a date to its day or hour. Currently it is only
>>> possible to truncate to MONTH or YEAR.
>>> 1. How can I achieve that?
>>> 2.Is there any pull request about this issue?
>>> 3.If there is not any open pull request about this issue, what are the
>>> implications that I should be aware of when coding /contributing it as a
>>> pull request?
>>>
>>> Last question: looking at the DateTimeUtils class code, it seems like the
>>> implementation is not using any open library for handling dates, i.e.
>>> apache-commons. Why implement it instead of reusing open source?
>>>
>>> Thanks David
>>>
>>
>>
>>
>> --
>> Gaspar Muñoz Soria
>>
>> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
>> 28224 Pozuelo de Alarcón, Madrid
>> Tel: +34 91 828 6473
>>
>
>


Re: Spark SQL - Truncate Day / Hour

2017-11-13 Thread David Hodefi
I am familiar with those functions, but none of them actually truncates a
date. We can use those methods to help implement a truncate method. I think
truncating to a day/hour should be as simple as truncate(..., "DD") or
truncate(..., "HH").

On Thu, Nov 9, 2017 at 8:23 PM, Gaspar Muñoz  wrote:

> There are functions for day (called dayOfMonth and dayOfYear) and hour
> (called hour). You can view them here: https://spark.apache.
> org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions
>
> Example:
>
> import org.apache.spark.sql.functions._
> val df = df.select(hour($"myDateColumn"), dayOfMonth($"myDateColumn"),
> dayOfYear($"myDateColumn"))
>
> 2017-11-09 12:05 GMT+01:00 David Hodefi :
>
>> I would like to truncate a date to its day or hour. Currently it is only
>> possible to truncate to MONTH or YEAR.
>> 1. How can I achieve that?
>> 2.Is there any pull request about this issue?
>> 3.If there is not any open pull request about this issue, what are the
>> implications that I should be aware of when coding /contributing it as a
>> pull request?
>>
>> Last question: looking at the DateTimeUtils class code, it seems like the
>> implementation is not using any open library for handling dates, i.e.
>> apache-commons. Why implement it instead of reusing open source?
>>
>> Thanks David
>>
>
>
>
> --
> Gaspar Muñoz Soria
>
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473
>