Re: statefulStreaming checkpointing too often

2017-06-02 Thread Tathagata Das
There are two kinds of checkpointing going on here: metadata and data. The
100-second interval that you have configured is the data checkpointing
(expensive, large data), where the RDD data is written to HDFS. The 10-second
one is the metadata checkpointing (cheap, small data), where the metadata of
the query (Kafka offsets, etc.) is saved before and after every 10-second
batch. Hope this clarifies.
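
For concreteness, a minimal runnable sketch of where the two intervals live. This is only a sketch under stated assumptions (a socket source standing in for the Kafka direct stream, a hypothetical checkpoint directory, and a trivial counting StateSpec), and whether an explicit checkpoint() on the mapWithState stream overrides the internal default is worth verifying on your Spark version:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object CheckpointIntervalsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-intervals-sketch")
    val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batches

    // Metadata checkpointing (offsets, DStream graph) is written here around
    // every batch; it is cheap and not governed by the data-checkpoint interval.
    ssc.checkpoint("hdfs:///tmp/checkpoints-sketch") // hypothetical directory

    // Socket source standing in for the Kafka direct stream of the original job.
    val events = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

    // Trivial running-count state function.
    val spec = StateSpec.function { (key: String, value: Option[Int], state: State[Int]) =>
      val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    val stateStream = events.mapWithState(spec)

    // Data checkpointing of the state RDDs: it defaults to 10 x the batch
    // interval (100 seconds here); setting it explicitly documents the intent.
    stateStream.checkpoint(Seconds(100))

    stateStream.foreachRDD(rdd => println(s"updates this batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}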

On Thu, Jun 1, 2017 at 2:54 PM, David Rosenstrauch 
wrote:

> I'm running into a weird issue with a stateful streaming job I'm running.
> (Spark 2.1.0 reading from kafka 0-10 input stream.)
>
> From what I understand from the docs, by default the checkpoint interval
> for stateful streaming is 10 * batchInterval.  Since I'm running a batch
> interval of 10 seconds, I would expect that checkpointing should only get
> done every 100 seconds.  But what I'm seeing is that Spark is not only
> checkpointing every 10 seconds, it's checkpointing twice every 10 seconds!
>
> My code looks approximately as follows:
>
> val eventStream = kafkaStream.
>   transform(
>     ...
>   ).
>   map(
>     ...
>   ).
>   transform(
>     ...
>   )
>
> val stateStream = eventStream.mapWithState(
>   ...
> )
>
> stateUpdatesStream.foreachRDD(
>   ...
> )
>
>
> When the app initializes, the checkpointing configuration looks like so:
>
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Duration for remembering RDDs set to 200000 ms for org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4a85a52c
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4a85a52c
> 17/06/01 21:19:05 INFO TransformedDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x Replicated
> 17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@201d4bfb
> 17/06/01 21:19:05 INFO MappedDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO MappedDStream: Storage level = Serialized 1x Replicated
> 17/06/01 21:19:05 INFO MappedDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO MappedDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@1208bde7
> 17/06/01 21:19:05 INFO TransformedDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x Replicated
> 17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated org.apache.spark.streaming.dstream.TransformedDStream@370b0505
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Storage level = Memory Deserialized 1x Replicated
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval = 100000 ms
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Initialized and validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@746c7658
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Storage level = Serialized 1x Replicated
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Checkpoint interval = null
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Remember interval = 10000 ms
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Initialized and validated org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@75d7326b
> 17/06/01 21:19:05 INFO ForEachDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
> 17/06/01 21:19:05 INFO ForEachDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO ForEachDStream: Remember interval = 10000 ms
> 17/06/01 21:19:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@2b3b2628
>
>
> Note that there's one line that's correctly showing the 100-second
> checkpointing interval:
>
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval = 100000 ms
>
>
> And yet the app is still performing checkpointing every 10 seconds ...
> twice every 10 seconds, in fact!
>
>

Re: Spark 2.1 - Inferring schema of dataframe after reading json files not during

2017-06-02 Thread vaquar khan
You can add a filter, or replace the null with a value such as 0 or a string.

df.na.fill(0, Seq("y"))
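
For the case below, where the bad value is the literal string "null" in column "a", a sketch of the replace route (the DataFrame name df is hypothetical) would swap it for an empty JSON object before any re-parsing:

val cleaned = df.na.replace("a", Map("null" -> "{}"))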

Regards,
Vaquar khan

On Jun 2, 2017 11:25 AM, "Alonso Isidoro Roman"  wrote:

Not sure if this can help you, but you can infer the schema programmatically by
providing a JSON schema file:

import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.spark.sql.types.{DataType, StructType}

val path: Path = new Path(schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)

val inputStream: FSDataInputStream = fileSystem.open(path)

val schema_json = Stream.cons(inputStream.readLine(),
  Stream.continually(inputStream.readLine))

logger.debug("schema_json looks like " + schema_json.head)

val mySchemaStructType =
  DataType.fromJson(schema_json.head).asInstanceOf[StructType]

logger.debug("mySchemaStructType is " + mySchemaStructType)


where schema_parquet can be something like this:

{"type" : "struct","fields" : [ {"name" : "column0","type" :
"string","nullable" : false},{"name":"column1", "type":"string",
"nullable":false},{"name":"column2", "type":"string",
"nullable":true}, {"name":"column3", "type":"string",
"nullable":false}]}



Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-06-02 16:11 GMT+02:00 Aseem Bansal :

> When we read files in spark it infers the schema. We have the option to
> not infer the schema. Is there a way to ask spark to infer the schema again
> just like when reading json?
>
> The reason we want to get this done is because we have a problem in our
> data files. We have a json file containing this
>
> {"a": NESTED_JSON_VALUE}
> {"a":"null"}
>
> It should have been empty json but due to a bug it became "null" instead.
> Now, when we read the file the column "a" is considered as a String.
> Instead what we want to do is ask spark to read the file considering "a" as
> a String, filter the "null" out/replace with empty json and then ask spark
> to infer schema of "a" after the fix so we can access the nested json
> properly.
>


Re: An Architecture question on the use of virtualised clusters

2017-06-02 Thread Gene Pang
As Vincent mentioned earlier, I think Alluxio can work for this. You can mount
your (potentially remote) storage systems to Alluxio and deploy Alluxio
co-located with the compute cluster. The computation framework will still
achieve data locality since the Alluxio workers are co-located, even though the
existing storage systems may be remote. You can also use tiered storage to
deploy using only memory and/or other physical media.
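
A minimal Spark-side sketch, under stated assumptions: the Alluxio client jar is on the Spark classpath, the Alluxio master runs at alluxio-master on the default port 19998, and the remote store has already been mounted into the Alluxio namespace at /mnt/remote (the host, mount point and dataset path are all hypothetical):

import org.apache.spark.sql.SparkSession

object AlluxioReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("alluxio-read-sketch").getOrCreate()

    // The alluxio:// path is served by the co-located Alluxio workers, which
    // cache blocks locally, so repeated scans avoid the remote storage round trip.
    val df = spark.read.parquet("alluxio://alluxio-master:19998/mnt/remote/events")
    println(df.count())

    spark.stop()
  }
}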

Here are some blogs (Alluxio with Minio, Alluxio with HDFS, Alluxio with S3)
which use a similar architecture.

Hope that helps,
Gene

On Thu, Jun 1, 2017 at 1:45 AM, Mich Talebzadeh 
wrote:

> As a matter of interest what is the best way of creating virtualised
> clusters all pointing to the same physical data?
>
> thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 June 2017 at 09:27, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> If mandatory, you can use a local cache like Alluxio
>>
>> Le 1 juin 2017 10:23 AM, "Mich Talebzadeh"  a
>> écrit :
>>
>>> Thanks Vincent. I assume by physical data locality you mean you are
>>> going through Isilon and HCFS and not through direct HDFS.
>>>
>>> Also I agree with you that the shared network could be an issue as well.
>>> However, it allows you to reduce data redundancy (you do not need R3 in
>>> HDFS anymore) and you can also build virtual clusters on the same data. One
>>> cluster for reads/writes and another for reads? That is what has been
>>> suggested!
>>>
>>> regards
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn:
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 1 June 2017 at 08:55, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
 I don't recommend this kind of design because you lose physical data
 locality and you will be affected by "bad neighbours" that are also using
 the network storage... We have one similar design, but restricted to small
 clusters (more for experiments than production).

 2017-06-01 9:47 GMT+02:00 Mich Talebzadeh :

> Thanks Jorn,
>
> This was a proposal made by someone, as the firm is already using this
> tool on other SAN-based storage and wants to extend it to Big Data.
>
> On paper it seems like a good idea; in practice it may be a Wandisco
> scenario again. Of course, as ever, one needs to ask EMC for reference calls
> and whether anyone is using this product in anger.
>
>
>
> At the end of the day it's not HDFS; it is OneFS with an HCFS API.
> However, that may suit our needs, but we would need to PoC it and test it
> thoroughly!
>
>
> Cheers
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
> On 1 June 2017 at 08:21, Jörn Franke  wrote:
>
>> Hi,
>>
>> I have done this (not Isilon, but another

Re: Spark 2.1 - Inferring schema of dataframe after reading json files not during

2017-06-02 Thread Alonso Isidoro Roman
Not sure if this can help you, but you can infer the schema programmatically by
providing a JSON schema file:

import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.spark.sql.types.{DataType, StructType}

val path: Path = new Path(schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)

val inputStream: FSDataInputStream = fileSystem.open(path)

val schema_json = Stream.cons(inputStream.readLine(),
  Stream.continually(inputStream.readLine))

logger.debug("schema_json looks like " + schema_json.head)

val mySchemaStructType =
  DataType.fromJson(schema_json.head).asInstanceOf[StructType]

logger.debug("mySchemaStructType is " + mySchemaStructType)


where schema_parquet can be something like this:

{"type" : "struct","fields" : [ {"name" : "column0","type" :
"string","nullable" : false},{"name":"column1", "type":"string",
"nullable":false},{"name":"column2", "type":"string",
"nullable":true}, {"name":"column3", "type":"string",
"nullable":false}]}



Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-06-02 16:11 GMT+02:00 Aseem Bansal :

> When we read files in spark it infers the schema. We have the option to
> not infer the schema. Is there a way to ask spark to infer the schema again
> just like when reading json?
>
> The reason we want to get this done is because we have a problem in our
> data files. We have a json file containing this
>
> {"a": NESTED_JSON_VALUE}
> {"a":"null"}
>
> It should have been empty json but due to a bug it became "null" instead.
> Now, when we read the file the column "a" is considered as a String.
> Instead what we want to do is ask spark to read the file considering "a" as
> a String, filter the "null" out/replace with empty json and then ask spark
> to infer schema of "a" after the fix so we can access the nested json
> properly.
>


Spark SQL, formatting timezone in UTC

2017-06-02 Thread yohann jardin
Hello everyone,


I'm having a hard time with time zones.

I have a Long representing a timestamp: 1496361600000 (milliseconds), and I want the output to be
2017-06-02 00:00:00.


Based on 
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html

The only function that helps with formatting a timestamp is from_unixtime, but it
formats the output in the system time zone, and neither my time zone nor the time
zone of the server I'm working on is UTC.

I couldn't find any help among the dozens of JIRA tickets, Stack Overflow questions
and blog articles I found on Google.


In the end I decided to write a udf:

def formatMs(ms: Long): java.lang.String = {
  val formatter = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  formatter.setTimeZone(java.util.TimeZone.getTimeZone("GMT"))
  formatter.format(new java.util.Date(ms))
}

spark.udf.register("formatMs", formatMs _)
spark.sql("SELECT formatMs(1496361600000)").show


But if I really go for that, it will decrease the performance of my 
application, right?

For instance, I need to aggregate some data based on that column. Since my function
is a black box, Spark will apply it first and aggregate on the String output of the
UDF, even though aggregating on the initial Long value and then applying the UDF
would give the same result.
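
A sketch of that ordering, assuming a hypothetical DataFrame df with a Long millisecond column ts_ms and reusing the function above through org.apache.spark.sql.functions.udf:

import org.apache.spark.sql.functions.{col, count, lit, udf}

val formatMsUdf = udf(formatMs _)

// Aggregate on the raw Long first; format only the (much smaller) grouped
// result, so the UDF runs once per group instead of once per input row.
val perTimestamp = df
  .groupBy(col("ts_ms"))
  .agg(count(lit(1)).as("n"))
  .withColumn("ts_utc", formatMsUdf(col("ts_ms")))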


I know I can also drop the UDF from the SQL query and instead apply withColumn()
and withColumnRenamed() on the resulting dataframe, but that is something to
benchmark.


Did I miss a way to do this within a Spark SQL query using standard functions, or
something much more performant than what I can think of?


Regards,

Yohann


Re: Number Of Partitions in RDD

2017-06-02 Thread neil90
Cluster mode with HDFS? Or local mode?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Number-Of-Partitions-in-RDD-tp28730p28737.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark 2.1 - Inferring schema of dataframe after reading json files not during

2017-06-02 Thread Aseem Bansal
When we read files in spark it infers the schema. We have the option to not
infer the schema. Is there a way to ask spark to infer the schema again
just like when reading json?

The reason we want to get this done is because we have a problem in our
data files. We have a json file containing this

{"a": NESTED_JSON_VALUE}
{"a":"null"}

It should have been empty json but due to a bug it became "null" instead.
Now, when we read the file the column "a" is considered as a String.
Instead what we want to do is ask spark to read the file considering "a" as
a String, filter the "null" out/replace with empty json and then ask spark
to infer schema of "a" after the fix so we can access the nested json
properly.
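
A sketch of that two-pass read, under stated assumptions: Spark 2.1, a hypothetical input path, and relying on the JSON reader handing back the nested object as its raw JSON text when the column is forced to StringType (worth verifying on your data):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ReinferSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("reinfer-schema-sketch").getOrCreate()
    import spark.implicits._

    // First pass: force "a" to be read as a plain string so nothing is inferred yet.
    val firstPassSchema = StructType(Seq(StructField("a", StringType, nullable = true)))
    val raw = spark.read.schema(firstPassSchema).json("/path/to/data.json") // hypothetical path

    // Drop the bad rows (the literal string "null"), then let Spark infer the
    // schema of the remaining JSON strings in a second pass.
    val goodJson = raw.filter($"a" =!= "null").select($"a").as[String]
    val reinferred = spark.read.json(goodJson.rdd) // Spark 2.1: the json() overload takes RDD[String]

    reinferred.printSchema()
    spark.stop()
  }
}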