Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-27 Thread Gourav Sengupta
Hi,


As per documentation in:
https://spark.apache.org/docs/latest/configuration.html


spark.local.dir (default: /tmp): Directory to use for "scratch" space in Spark,
including map output files and RDDs that get stored on disk. This should be
on a fast, local disk in your system. It can also be a comma-separated list
of multiple directories on different disks. NOTE: In Spark 1.0 and later
this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or
LOCAL_DIRS (YARN) environment variables set by the cluster manager.
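
For a non-YARN deployment, a minimal Scala sketch of moving the scratch space off /tmp
(the directories below are placeholders; on YARN the cluster manager's LOCAL_DIRS, i.e.
yarn.nodemanager.local-dirs, takes precedence as noted above):

```
import org.apache.spark.sql.SparkSession

// Sketch only: point Spark's scratch space at dedicated disks instead of /tmp.
// The paths are placeholders; on YARN this value is overridden by LOCAL_DIRS.
val spark = SparkSession.builder()
  .appName("local-dir-example")
  .config("spark.local.dir", "/data1/spark-scratch,/data2/spark-scratch")
  .getOrCreate()
```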

Regards,
Gourav Sengupta





On Mon, Mar 26, 2018 at 8:28 PM, Michael Shtelma  wrote:

> Hi Keith,
>
> Thanks  for the suggestion!
> I have solved this already.
> The problem was that the YARN process was not responding to
> start/stop commands and had not applied my configuration changes.
> I killed it and restarted my cluster, and after that YARN
> started using the yarn.nodemanager.local-dirs parameter defined in
> yarn-site.xml.
> After this change, -Djava.io.tmpdir for the Spark executor was set
> correctly, according to the yarn.nodemanager.local-dirs parameter.
>
> Best,
> Michael
>
>
> On Mon, Mar 26, 2018 at 9:15 PM, Keith Chapman 
> wrote:
> > Hi Michael,
> >
> > sorry for the late reply. I guess you may have to set it through the hdfs
> > core-site.xml file. The property you need to set is "hadoop.tmp.dir"
> which
> > defaults to "/tmp/hadoop-${user.name}"
> >
> > Regards,
> > Keith.
> >
> > http://keith-chapman.com
> >
> > On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma 
> wrote:
> >>
> >> Hi Keith,
> >>
> >> Thank you for the idea!
> >> I have tried it, so now the executor command is looking in the following
> >> way :
> >>
> >> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
> >> '-Djava.io.tmpdir=my_prefered_path'
> >>
> >> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/
> msh/appcache/application_1521110306769_0041/container_
> 1521110306769_0041_01_04/tmp
> >>
> >> JVM is using the second Djava.io.tmpdir parameter and writing
> >> everything to the same directory as before.
> >>
> >> Best,
> >> Michael
> >> Sincerely,
> >> Michael Shtelma
> >>
> >>
> >> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman  >
> >> wrote:
> >> > Can you try setting spark.executor.extraJavaOptions to have
> >> > -Djava.io.tmpdir=someValue
> >> >
> >> > Regards,
> >> > Keith.
> >> >
> >> > http://keith-chapman.com
> >> >
> >> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma  >
> >> > wrote:
> >> >>
> >> >> Hi Keith,
> >> >>
> >> >> Thank you for your answer!
> >> >> I have done this, and it is working for spark driver.
> >> >> I would like to make something like this for the executors as well,
> so
> >> >> that the setting will be used on all the nodes, where I have
> executors
> >> >> running.
> >> >>
> >> >> Best,
> >> >> Michael
> >> >>
> >> >>
> >> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman
> >> >> 
> >> >> wrote:
> >> >> > Hi Michael,
> >> >> >
> >> >> > You could either set spark.local.dir through spark conf or
> >> >> > java.io.tmpdir
> >> >> > system property.
> >> >> >
> >> >> > Regards,
> >> >> > Keith.
> >> >> >
> >> >> > http://keith-chapman.com
> >> >> >
> >> >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <
> mshte...@gmail.com>
> >> >> > wrote:
> >> >> >>
> >> >> >> Hi everybody,
> >> >> >>
> >> >> >> I am running spark job on yarn, and my problem is that the
> >> >> >> blockmgr-*
> >> >> >> folders are being created under
> >> >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/
> application_id/*
> >> >> >> The size of this folder can grow to a significant size and does
> not
> >> >> >> really fit into /tmp file system for one job, which makes a real
> >> >> >> problem for my installation.
> >> >> >> I have redefined hadoop.tmp.dir in core-site.xml and
> >> >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
> >> >> >> location and expected that the block manager will create the files
> >> >> >> there and not under /tmp, but this is not the case. The files are
> >> >> >> created under /tmp.
> >> >> >>
> >> >> >> I am wondering if there is a way to make spark not use /tmp at all
> >> >> >> and
> >> >> >> configure it to create all the files somewhere else ?
> >> >> >>
> >> >> >> Any assistance would be greatly appreciated!
> >> >> >>
> >> >> >> Best,
> >> >> >> Michael
> >> >> >>
> >> >> >>
> >> >> >> 
> -
> >> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >>
> >> >> >
> >> >
> >> >
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-27 Thread Mina Aslani
Hi Naresh,

Thank you for the quick response, appreciate it.
Removing the option("header","true") and trying

df = spark.read.parquet("test.parquet") now works, and I can read the parquet file.
However, I would like to find a way to have the data in CSV/readable form.
I still cannot save df as CSV, as it throws:
java.lang.UnsupportedOperationException: CSV data source does not support
struct data
type.

Any idea?
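
One hedged workaround, sketched in Scala (the same calls exist in PySpark): if the
offending column is a plain struct, it can be serialized to a JSON string with to_json
before writing, since CSV can hold strings. The df/column names below are assumptions,
and an ML Vector column would need its own conversion (e.g. a small UDF calling toArray).

```
import org.apache.spark.sql.functions.{col, to_json}

// Sketch: "features" stands in for whatever struct-typed column the CSV writer rejects.
val csvFriendly = df.withColumn("features", to_json(col("features")))

csvFriendly.coalesce(1)
  .write.option("header", "true")
  .mode("overwrite")
  .csv("output.csv")
```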


Best regards,

Mina


On Tue, Mar 27, 2018 at 10:51 PM, naresh Goud 
wrote:

> In case of storing as parquet file I don’t think it requires header.
> option("header","true")
>
> Give a try by removing header option and then try to read it.  I haven’t
> tried. Just a thought.
>
> Thank you,
> Naresh
>
>
> On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani  wrote:
>
>> Hi,
>>
>>
>> I am using pyspark. To transform my sample data and create model, I use
>> stringIndexer and OneHotEncoder.
>>
>>
>> However, when I try to write data as csv using below command
>>
>> df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")
>>
>>
>> I get UnsupportedOperationException
>>
>> java.lang.UnsupportedOperationException: CSV data source does not
>> support struct
>> data type.
>>
>> Therefore, to save data and avoid getting the error I use
>>
>>
>> df.coalesce(1).write.option("header","true").mode("overwrite").save("output")
>>
>>
>> The above command saves data but it's in parquet format.
>> How can I read parquet file and convert to csv to observe the data?
>>
>> When I use
>>
>> df = spark.read.parquet("1.parquet"), it throws:
>>
>> ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
>> outstanding blocks
>>
>> Your input is appreciated.
>>
>>
>> Best regards,
>>
>> Mina
>>
>>
>>
>> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-27 Thread naresh Goud
When storing as a parquet file, I don't think it requires the header
option("header","true").

Give it a try by removing the header option and then try to read it. I haven't
tried it; just a thought.

Thank you,
Naresh


On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani  wrote:

> Hi,
>
>
> I am using pyspark. To transform my sample data and create model, I use
> stringIndexer and OneHotEncoder.
>
>
> However, when I try to write data as csv using below command
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")
>
>
> I get UnsupportedOperationException
>
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct data
> type.
>
> Therefore, to save data and avoid getting the error I use
>
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite").save("output")
>
>
> The above command saves data but it's in parquet format.
> How can I read parquet file and convert to csv to observe the data?
>
> When I use
>
> df = spark.read.parquet("1.parquet"), it throws:
>
> ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
> outstanding blocks
>
> Your input is appreciated.
>
>
> Best regards,
>
> Mina
>
>
>
> --
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-27 Thread Mina Aslani
Hi,


I am using pyspark. To transform my sample data and create model, I use
stringIndexer and OneHotEncoder.


However, when I try to write data as csv using below command

df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")


I get UnsupportedOperationException

java.lang.UnsupportedOperationException: CSV data source does not support
struct data
type.

Therefore, to save data and avoid getting the error I use


df.coalesce(1).write.option("header","true").mode("overwrite").save("output")


The above command saves data but it's in parquet format.
How can I read parquet file and convert to csv to observe the data?

When I use

df = spark.read.parquet("1.parquet"), it throws:

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
outstanding blocks

Your input is appreciated.


Best regards,

Mina


Re: ORC native in Spark 2.3, with zlib, gives java.nio.BufferUnderflowException during read

2018-03-27 Thread Dongjoon Hyun
You may hit SPARK-23355 (convertMetastore should not ignore table properties).

Since it's a known Spark issue for all Hive tables (Parquet/ORC), could you 
check that too?

Bests,
Dongjoon.

On 2018/03/28 01:00:55, Dongjoon Hyun  wrote: 
> Hi, Eric.
> 
> For me, Spark 2.3 works correctly like the following. Could you give us some 
> reproducible example?
> 
> ```
> scala> sql("set spark.sql.orc.impl=native")
> 
> scala> sql("set spark.sql.orc.compression.codec=zlib")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> 
> scala> spark.range(10).write.orc("/tmp/zlib_test")
> 
> scala> spark.read.orc("/tmp/zlib_test").show
> +---+
> | id|
> +---+
> |  8|
> |  9|
> |  5|
> |  0|
> |  3|
> |  4|
> |  6|
> |  7|
> |  1|
> |  2|
> +---+
> 
> scala> sc.version
> res4: String = 2.3.0
> ```
> 
> Bests,
> Dongjoon.
> 
> 
> On 2018/03/23 15:03:29, Eirik Thorsnes  wrote: 
> > Hi all,
> > 
> > I'm trying the new ORC native in Spark 2.3
> > (org.apache.spark.sql.execution.datasources.orc).
> > 
> > I've compiled Spark 2.3 from the git branch-2.3 as of March 20th.
> > I also get the same error for the Spark 2.2 from Hortonworks HDP 2.6.4.
> > 
> > *NOTE*: the error only occurs with zlib compression, and I see that with
> > Snappy I get an extra log-line saying "OrcCodecPool: Got brand-new codec
> > SNAPPY". Perhaps zlib codec is never loaded/triggered in the new code?
> > 
> > I can write using the new native codepath without errors, but *reading*
> > zlib-compressed ORC, either the newly written ORC-files *or* older
> > ORC-files written with Spark 2.2/1.6 I get the following exception.
> > 
> > === cut =
> > 2018-03-23 10:36:08,249 INFO FileScanRDD: Reading File path:
> > hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc,
> > range: 0-134217728, partition values: [1999]
> > 2018-03-23 10:36:08,326 INFO ReaderImpl: Reading ORC rows from
> > hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc
> > with {include: [true, true, true, true, true, true, true, true, true],
> > offset: 0, length: 134217728}
> > 2018-03-23 10:36:08,326 INFO RecordReaderImpl: Reader schema not
> > provided -- using file schema
> > struct
> > 
> > 2018-03-23 10:36:08,824 ERROR Executor: Exception in task 0.0 in stage
> > 1.0 (TID 1)
> > java.nio.BufferUnderflowException
> > at java.nio.Buffer.nextGetIndex(Buffer.java:500)
> > at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
> > at
> > org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:248)
> > at
> > org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:58)
> > at
> > org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
> > at
> > org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.nextVector(TreeReaderFactory.java:976)
> > at
> > org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:1815)
> > at
> > org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1184)
> > at
> > org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextBatch(OrcColumnarBatchReader.scala:186)
> > at
> > org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextKeyValue(OrcColumnarBatchReader.scala:114)
> > at
> > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> > at
> > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> > at
> > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> > at
> > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> > at
> > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
> > Source)
> > at
> > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> > Source)
> > at
> > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> > at
> > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> > at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
> > at
> > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> > at
> > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> > at
> > 

Re: ORC native in Spark 2.3, with zlib, gives java.nio.BufferUnderflowException during read

2018-03-27 Thread Dongjoon Hyun
Hi, Eric.

For me, Spark 2.3 works correctly like the following. Could you give us some 
reproducible example?

```
scala> sql("set spark.sql.orc.impl=native")

scala> sql("set spark.sql.orc.compression.codec=zlib")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.range(10).write.orc("/tmp/zlib_test")

scala> spark.read.orc("/tmp/zlib_test").show
+---+
| id|
+---+
|  8|
|  9|
|  5|
|  0|
|  3|
|  4|
|  6|
|  7|
|  1|
|  2|
+---+

scala> sc.version
res4: String = 2.3.0
```

Bests,
Dongjoon.


On 2018/03/23 15:03:29, Eirik Thorsnes  wrote: 
> Hi all,
> 
> I'm trying the new ORC native in Spark 2.3
> (org.apache.spark.sql.execution.datasources.orc).
> 
> I've compiled Spark 2.3 from the git branch-2.3 as of March 20th.
> I also get the same error for the Spark 2.2 from Hortonworks HDP 2.6.4.
> 
> *NOTE*: the error only occurs with zlib compression, and I see that with
> Snappy I get an extra log-line saying "OrcCodecPool: Got brand-new codec
> SNAPPY". Perhaps zlib codec is never loaded/triggered in the new code?
> 
> I can write using the new native codepath without errors, but *reading*
> zlib-compressed ORC, either the newly written ORC-files *or* older
> ORC-files written with Spark 2.2/1.6 I get the following exception.
> 
> === cut =
> 2018-03-23 10:36:08,249 INFO FileScanRDD: Reading File path:
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc,
> range: 0-134217728, partition values: [1999]
> 2018-03-23 10:36:08,326 INFO ReaderImpl: Reading ORC rows from
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc
> with {include: [true, true, true, true, true, true, true, true, true],
> offset: 0, length: 134217728}
> 2018-03-23 10:36:08,326 INFO RecordReaderImpl: Reader schema not
> provided -- using file schema
> struct
> 
> 2018-03-23 10:36:08,824 ERROR Executor: Exception in task 0.0 in stage
> 1.0 (TID 1)
> java.nio.BufferUnderflowException
> at java.nio.Buffer.nextGetIndex(Buffer.java:500)
> at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
> at
> org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:248)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:58)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
> at
> org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.nextVector(TreeReaderFactory.java:976)
> at
> org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:1815)
> at
> org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1184)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextBatch(OrcColumnarBatchReader.scala:186)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.nextKeyValue(OrcColumnarBatchReader.scala:114)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> 

Re: ORC native in Spark 2.3, with zlib, gives java.nio.BufferUnderflowException during read

2018-03-27 Thread Xiao Li
Hi, Eirik,

Yes, please open a JIRA.

Thanks,

Xiao

2018-03-23 8:03 GMT-07:00 Eirik Thorsnes :

> Hi all,
>
> I'm trying the new ORC native in Spark 2.3
> (org.apache.spark.sql.execution.datasources.orc).
>
> I've compiled Spark 2.3 from the git branch-2.3 as of March 20th.
> I also get the same error for the Spark 2.2 from Hortonworks HDP 2.6.4.
>
> *NOTE*: the error only occurs with zlib compression, and I see that with
> Snappy I get an extra log-line saying "OrcCodecPool: Got brand-new codec
> SNAPPY". Perhaps zlib codec is never loaded/triggered in the new code?
>
> I can write using the new native codepath without errors, but *reading*
> zlib-compressed ORC, either the newly written ORC-files *or* older
> ORC-files written with Spark 2.2/1.6 I get the following exception.
>
> === cut =
> 2018-03-23 10:36:08,249 INFO FileScanRDD: Reading File path:
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-
> 37dc216b8a99.orc,
> range: 0-134217728, partition values: [1999]
> 2018-03-23 10:36:08,326 INFO ReaderImpl: Reading ORC rows from
> hdfs://.../year=1999/part-r-0-2573bfff-1f18-47d9-b0fb-37dc216b8a99.orc
> with {include: [true, true, true, true, true, true, true, true, true],
> offset: 0, length: 134217728}
> 2018-03-23 10:36:08,326 INFO RecordReaderImpl: Reader schema not
> provided -- using file schema
> struct v10:smallint,lcc:smallint,mcc:smallint,hcc:smallint>
>
> 2018-03-23 10:36:08,824 ERROR Executor: Exception in task 0.0 in stage
> 1.0 (TID 1)
> java.nio.BufferUnderflowException
> at java.nio.Buffer.nextGetIndex(Buffer.java:500)
> at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
> at
> org.apache.orc.impl.InStream$CompressedStream.read(InStream.java:248)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(
> RunLengthIntegerReaderV2.java:58)
> at
> org.apache.orc.impl.RunLengthIntegerReaderV2.next(
> RunLengthIntegerReaderV2.java:323)
> at
> org.apache.orc.impl.TreeReaderFactory$TimestampTreeReader.
> nextVector(TreeReaderFactory.java:976)
> at
> org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(
> TreeReaderFactory.java:1815)
> at
> org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1184)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.
> nextBatch(OrcColumnarBatchReader.scala:186)
> at
> org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.
> nextKeyValue(OrcColumnarBatchReader.scala:114)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(
> RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:177)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.scan_nextBatch$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:234)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 2.apply(SparkPlan.scala:228)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:827)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> === cut =
>
> I have the following set in spark-defaults.conf:
>
> spark.sql.hive.convertMetastoreOrc true
> spark.sql.orc.char.enabled true
> spark.sql.orc.enabled true
> spark.sql.orc.filterPushdown true
> spark.sql.orc.impl native
> spark.sql.orc.enableVectorizedReader true

closure issues: wholeTextFiles

2018-03-27 Thread Gourav Sengupta
Hi,

I can understand facing closure issues while executing this code:



package spark

//this package is about understanding closures as mentioned in:
http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-


import org.apache.spark.sql.SparkSession


object understandClosures extends  App {

  var counter = 0

  //the error thrown is removed in case we use local[*] as master
  val sparkSession = SparkSession
.builder
.master("spark://Gouravs-iMac:7077")
//.master("local[*]")
.appName("test")
.getOrCreate()


  val valueRDD = sparkSession.sparkContext.parallelize(1 until 100)

  println(valueRDD.count())

  valueRDD.foreach(x => counter += x)

  //but even if we use the master as local[*] the total appears as some random number like -1234435435
  println("the value is " + counter.toString())


  sparkSession.close()

}
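
For the counter example above, the usual remedy is an accumulator rather than a captured
var; a minimal sketch (spark-shell style, assuming `spark` is already defined):

```
// A captured var is copied into each task's closure and never merged back to the
// driver (or is updated racily in local mode), hence the zero / random totals above.
// An accumulator is the supported way to aggregate into a driver-side value.
val counter = spark.sparkContext.longAccumulator("counter")

val valueRDD = spark.sparkContext.parallelize(1 until 100)
valueRDD.foreach(x => counter.add(x))

println("the value is " + counter.value)  // 4950 once the action has finished
```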





Can anyone explain why I am facing a closure issue while executing this
code?

package spark

import org.apache.spark.sql.SparkSession
// Import this utility for working with URLs. Unlike Java, the semicolon ';' is not required.
import java.net.URL
// Use {...} to provide a list of things to import, when you don't want to import everything
// in a package and you don't want to write a separate line for each type.
import java.io.{File, BufferedInputStream, BufferedOutputStream,
FileOutputStream}




object justenoughTest extends App {

  val sparkSession = SparkSession
.builder
.master("spark://Gouravs-iMac:7077")
//.master("local[*]")
.appName("test")
.getOrCreate()

  println(sparkSession.version)

  println("Spark version:  " + sparkSession.version)
  println("Spark master:   " + sparkSession.sparkContext.master)
  println("Running 'locally'?: " + sparkSession.sparkContext.isLocal)

  val pathSeparator = File.separator

  // The target directory, which we'll now create, if necessary.
  val shakespeare = new
File("/Users/gouravsengupta/Development/data/shakespeare")

  println(sparkSession.version)

  //val fileContents = sparkSession.read.text("file:///Users/gouravsengupta/Development/data/shakespeare/")
  //val fileContents = sparkSession.read.text(shakespeare.toString)
  val fileContents =
sparkSession.sparkContext.wholeTextFiles(shakespeare.toString)
  println(fileContents.count())

  //I am facing  the closure issues below

  val testThis = fileContents.foreach(x => "printing value" + x._1)


sparkSession.close()

}


Regards,
Gourav Sengupta


unsubscribe

2018-03-27 Thread Andrei Balici
-- 
Andrei Balici
Student at the School of Computer Science,
University of Manchester


PySpark Structured Streaming : Writing to DB in Python and Foreach Sink.

2018-03-27 Thread Ramaswamy, Muthuraman
Hi All,

I am exploring PySpark Structured Streaming and the documentation says the 
Foreach Sink is not supported in Python and is available only with Java/Scala. 
Given the unavailability of this sink, what options are there for the following:


  1.  Will there be support for Foreach Sink in Python in future Spark 
Structured Streaming release?
  2.  What options are there to write streaming query output to Database?
 *   In other words, the streaming query output should be written to a 
database at every trigger interval
 *   I cannot use Memory sink as it is recommended for use only with Debug.

Any suggestions to write to database in PySpark Structured Streaming will help. 
Appreciate your time.
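
For reference, the Scala/Java ForeachWriter that the documentation refers to looks roughly
like the sketch below; the JDBC calls are left as hypothetical placeholders, and this is an
outline rather than production code.

```
import org.apache.spark.sql.{ForeachWriter, Row}

// Hedged sketch of the Scala ForeachWriter contract: one connection per partition/epoch,
// one process() call per record, close() for cleanup. The commented calls are placeholders.
val dbWriter = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = {
    // connection = openJdbcConnection()   // hypothetical helper
    true                                   // true = process this partition/epoch
  }
  def process(record: Row): Unit = {
    // insertOrUpsert(connection, record)  // hypothetical helper
  }
  def close(errorOrNull: Throwable): Unit = {
    // if (connection != null) connection.close()
  }
}

// streamingDf.writeStream.foreach(dbWriter).outputMode("append").start()
```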

Thank you,

Muthu Ramaswamy


Re: unsubscribe

2018-03-27 Thread Romero, Saul
unsubscribe

On Tue, Mar 27, 2018 at 1:15 PM, Nicholas Sharkey  wrote:

>
>


unsubscribe

2018-03-27 Thread Nicholas Sharkey



Spark on K8s resource staging server timeout

2018-03-27 Thread Jenna Hoole
So I'm running into an issue with my resource staging server that's
producing a stacktrace like Issue 342
, but I don't
think for the same reasons. What's happening is that every time after I
start up a resource staging server, the first job submitted that uses it
will fail with a java.net.SocketTimeoutException: timeout, and then every
subsequent job will run perfectly. Including with different jars and
different users. It's only ever the first job that fails and it always
fails. I know I'm also running into Issue 577
 in that it takes
about three minutes before the resource staging server is accessible, but
I'm still failing waiting over ten minutes or in one case overnight. And
I'm just using the examples jar, so it's not a super large jar like in
Issue 342.

This isn't great for our CI process, so has anyone seen anything like this
before or know how to increase the timeout if it just takes a while on
initial contact? Using spark.network.timeout has no effect.

[jhoole@nid6 spark]$ kubectl get pods | grep jhoole-spark

jhoole-spark-resource-staging-server-6475c8-w5cdm   1/1   Running
0  13m

[jhoole@nid6 spark]$ kubectl get svc | grep jhoole-spark

jhoole-spark-resource-staging-service   NodePort10.96.143.55
  1:30622/TCP 13m

[jhoole@nid6 spark]$ bin/spark-submit --class
org.apache.spark.examples.SparkPi --conf spark.app.name=spark-pi --conf
spark.kubernetes.resourceStagingServer.uri=http://192.168.0.1:30622
./examples/target/scala-2.11/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar

2018-03-27 12:30:13 WARN  NativeCodeLoader:62 - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable

2018-03-27 12:30:13 INFO  UserGroupInformation:966 - Login successful for
user jhoole@local using keytab file /security/secrets/jhoole.keytab

2018-03-27 12:30:14 INFO  HadoopStepsOrchestrator:54 - Hadoop Conf
directory: /etc/hadoop/conf

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls to: jhoole

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls to:
jhoole

2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing view acls groups to:


2018-03-27 12:30:14 INFO  SecurityManager:54 - Changing modify acls groups
to:

2018-03-27 12:30:14 INFO  SecurityManager:54 - SecurityManager:
authentication disabled; ui acls disabled; users  with view permissions:
Set(jhoole); groups with view permissions: Set(); users  with modify
permissions: Set(jhoole); groups with modify permissions: Set()

Exception in thread "main" java.net.SocketTimeoutException: timeout

at okio.Okio$4.newTimeoutException(Okio.java:230)

at okio.AsyncTimeout.exit(AsyncTimeout.java:285)

at okio.AsyncTimeout$2.read(AsyncTimeout.java:241)

at okio.RealBufferedSource.indexOf(RealBufferedSource.java:345)

at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:217)

at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:211)

at
okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)

at
okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at
okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)

at okhttp3.RealCall.execute(RealCall.java:69)

at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)

at
org.apache.spark.deploy.k8s.submit.SubmittedDependencyUploaderImpl.getTypedResponseResult(SubmittedDependencyUploaderImpl.scala:101)

at
org.apache.spark.deploy.k8s.submit.SubmittedDependencyUploaderImpl.doUpload(SubmittedDependencyUploaderImpl.scala:97)

at
org.apache.spark.deploy.k8s.submit.SubmittedDependencyUploaderImpl.uploadJars(SubmittedDependencyUploaderImpl.scala:70)

at

Re: Class cast exception while using Data Frames

2018-03-27 Thread Nikhil Goyal
You can run this on spark shell

*CODE:*

case class InstanceData(service: String, metric: String, zone:
String, source: String, time: Long, value: Double )

val seq = sc.parallelize(Seq(
  InstanceData("serviceA", "metricA", "zoneA", "sourceA", 1000L,
1.0),
  InstanceData("serviceA", "metricA", "zoneA", "hostA", 1000L, 1.0),
  InstanceData("serviceD", "metricA", "zoneB", "hostB", 1000L, 2.0),
  InstanceData("serviceA", "metricF", "zoneA", "hostB", 1000L, 1.0)
))

val instData =  spark.createDataFrame(seq)

def makeMap = udf((service: String, metric: String, value: Double)
=> Map((service, metric) -> value))

val instDF = instData.withColumn("metricMap", makeMap($"service",
$"metric", $"value"))

def avgMapValueUDF = udf((newMap: Map[(String, String), Double],
count: Long) => {
  newMap.keys
.map { keyTuple =>
  val sum = newMap.getOrElse(keyTuple, 0.0)
  (keyTuple, sum / count.toDouble)
}.toMap
})

instDF.withColumn("customMap", avgMapValueUDF(col("metricMap"),
lit(1))).show
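
For what it's worth, the tuple key is stored as a struct and comes back as a Row rather
than a Tuple2, which is what triggers the ClassCastException in the original post. A
hedged sketch of unpacking it by hand on the driver:

```
import org.apache.spark.sql.Row

// Sketch: read the map with Row keys and rebuild the (String, String) tuples explicitly.
instDF.select("metricMap").collect().foreach { r =>
  val m = r.getAs[Map[Row, Double]](0)
  val rebuilt = m.map { case (k, v) => ((k.getString(0), k.getString(1)), v) }
  println(rebuilt)
}
```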



On Mon, Mar 26, 2018 at 11:51 PM, Shmuel Blitz 
wrote:

> Hi Nikhil,
>
> Can you please put a code snippet that reproduces the issue?
>
> Shmuel
>
> On Tue, Mar 27, 2018 at 12:55 AM, Nikhil Goyal 
> wrote:
>
>>  |-- myMap: map (nullable = true)
>>  |    |-- key: struct
>>  |    |    |-- _1: string (nullable = true)
>>  |    |    |-- _2: string (nullable = true)
>>  |    |-- value: double (valueContainsNull = true)
>>  |-- count: long (nullable = true)
>>
>> On Mon, Mar 26, 2018 at 1:41 PM, Gauthier Feuillen > > wrote:
>>
>>> Can you give the output of “printSchema” ?
>>>
>>>
>>> On 26 Mar 2018, at 22:39, Nikhil Goyal  wrote:
>>>
>>> Hi guys,
>>>
>>> I have a Map[(String, String), Double] as one of my columns. Using
>>>
>>> input.getAs[Map[(String, String), Double]](0)
>>>
>>>  throws exception: Caused by: java.lang.ClassCastException: 
>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be 
>>> cast to scala.Tuple2
>>>
>>> Even the schema says that key is of type struct of (string, string).
>>>
>>> Any idea why this is happening?
>>>
>>>
>>> Thanks
>>>
>>> Nikhil
>>>
>>>
>>>
>>
>
>
> --
> Shmuel Blitz
> Big Data Developer
> Email: shmuel.bl...@similarweb.com
> www.similarweb.com
> 
> 
> 
>


[Spark R] Proposal: Exposing RBackend in RRunner

2018-03-27 Thread Jeremy Liu
Spark Users,

In SparkR, RBackend is created in RRunner.main(). This in particular makes
it difficult to control or use the RBackend. For my use case, I am looking
to access the JVMObjectTracker that RBackend maintains for SparkR
dataframes.

Analogously, pyspark starts a py4j.GatewayServer in PythonRunner.main().
It's then possible to start a ClientServer that then has access to the
object bindings between Python/Java.

Is there something similar for SparkR? Or a reasonable way to expose
RBackend?

Thanks!


-- 
-
Jeremy Liu
jeremy.jl@gmail.com


Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-27 Thread Michael Shtelma
Hi,

the Jira Bug is here: https://issues.apache.org/jira/browse/SPARK-23799
I have also created the PR for the issue:
https://github.com/apache/spark/pull/20913
With this fix, it is working for me really well.
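
As a standalone illustration of the failure mode (not the actual patch; see the PR above
for the real change), converting a NaN selectivity to BigDecimal is exactly what raises
the NumberFormatException, and a NaN check avoids it:

```
// Minimal sketch: BigDecimal(Double.NaN) throws java.lang.NumberFormatException,
// which is the error surfaced by FilterEstimation in the stack trace below.
val selectivity = Double.NaN
// BigDecimal(selectivity)                                  // would throw NumberFormatException
val guarded = if (selectivity.isNaN) 1.0 else selectivity   // the kind of guard discussed here
println(BigDecimal(10000) * guarded)                        // safe: 10000.0
```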

Best,
Michael


On Sat, Mar 24, 2018 at 12:39 AM, Takeshi Yamamuro
 wrote:
> Can you file a jira if this is a bug?
> Thanks!
>
> On Sat, Mar 24, 2018 at 1:23 AM, Michael Shtelma  wrote:
>>
>> Hi Maropu,
>>
>> the problem seems to be in FilterEstimation.scala on lines 50 and 52:
>>
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala?utf8=✓#L50-L52
>>
>> val filterSelectivity =
>> calculateFilterSelectivity(plan.condition).getOrElse(1.0)
>> val filteredRowCount: BigInt =
>> ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
>>
>> The problem is, that filterSelectivity gets NaN value in my case and
>> NaN cannot be converted to BigDecimal.
>> I can try adding simple if, checking the NaN value and test if this helps.
>> I will also try to understand, why in my case, I am getting NaN.
>>
>> Best,
>> Michael
>>
>>
>> On Fri, Mar 23, 2018 at 1:51 PM, Takeshi Yamamuro 
>> wrote:
>> > hi,
>> >
>> > What's a query to reproduce this?
>> > It seems when casting double to BigDecimal, it throws the exception.
>> >
>> > // maropu
>> >
>> > On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma 
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I am using Spark 2.3 with activated cost-based optimizer and a couple
>> >> of hive tables, that were analyzed previously.
>> >>
>> >> I am getting the following exception for different queries:
>> >>
>> >> java.lang.NumberFormatException
>> >>
>> >> at java.math.BigDecimal.<init>(BigDecimal.java:494)
>> >>
>> >> at java.math.BigDecimal.<init>(BigDecimal.java:824)
>> >>
>> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
>> >>
>> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
>> >>
>> >> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation.estimate(FilterEstimation.scala:52)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:30)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>> >>
>> >> at scala.Option.getOrElse(Option.scala:121)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
>> >>
>> >> at
>> >>
>> >> scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
>> >>
>> >> at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ProjectEstimation$.estimate(ProjectEstimation.scala:27)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:37)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> 

java spark udf error

2018-03-27 Thread 崔苗
Hi,
I defined a UDF to mark empty strings in Java like this:

 public class MarkUnknown implements UDF2<String, String, String> {
@Override
public String call(String processor,String fillContent){
if(processor.trim().equals("")){
logger.info("find empty string");
return fillContent;
}
else{
return processor;
}
}
}
and register it via the SparkSession:
 spark.udf().register("markUnknown",markUnknown,StringType);

but when I use the UDF in SQL: "select markUnknown(useId,'unknown') FROM
table", I get an exception:

Exception in thread "main" java.lang.ClassCastException: 
org.apache.spark.sql.UDFRegistration$$anonfun$27 cannot be cast to 
scala.Function2
at 
org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:97)
at 
org.apache.spark.sql.UDFRegistration$$anonfun$register$26.apply(UDFRegistration.scala:515)
at 
org.apache.spark.sql.UDFRegistration$$anonfun$register$26.apply(UDFRegistration.scala:515)
at 
org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:91)
at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1165)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.org$apache$spark$sql$hive$HiveSessionCatalog$$super$lookupFunction(HiveSessionCatalog.scala:129)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:129)
at 
org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:129)
at scala.util.Try$.apply(Try.scala:192)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction0(HiveSessionCatalog.scala:129)
at 
org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction(HiveSessionCatalog.scala:122)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1189)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1189)

I replaced the String "unknown" with another column: "select
markUnknown(useId,companyId) FROM table", and still got the same exception.
So how should the UDF be defined in Java?


thanks for any reply







Re: Calculate co-occurring terms

2018-03-27 Thread Donni Khan
Hi again,

I found an example in Scala, but I don't have any experience with Scala.
Can anyone convert it to Java, please?

Thank you,
Donni
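
In the meantime, a hedged sketch in Scala of one scalable approach (the DataFrame calls
map one-to-one onto the Java API; the sample data, column names and whitespace tokenizer
are assumptions): keep each document's distinct significant terms, emit every unordered
term pair once per document, and count the pairs.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Placeholder significant terms and documents; substitute the real ones.
val terms: Set[String] = Set("spark", "hadoop", "yarn")
val docs = Seq(("d1", "spark on yarn"), ("d2", "spark and hadoop")).toDF("ID", "text")

val coOccurrence = docs.as[(String, String)]
  .flatMap { case (_, text) =>
    // distinct significant terms of this document, sorted so (a, b) and (b, a) collapse
    val present = text.toLowerCase.split("\\W+").filter(terms.contains).distinct.sorted
    for (i <- present.indices; j <- i + 1 until present.length) yield (present(i), present(j))
  }
  .toDF("term_i", "term_j")
  .groupBy("term_i", "term_j")
  .count()

coOccurrence.show()
```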

On Fri, Mar 23, 2018 at 8:57 AM, Donni Khan 
wrote:

> Hi,
>
> I have a collection of text documents, and I extracted the list of significant
> terms from that collection.
> I want to calculate a co-occurrence matrix for the extracted terms by using
> spark.
>
> I actually stored the the collection of text document in a DataFrame,
>
> StructType schema = new StructType(new StructField[] {
>     new StructField("ID", DataTypes.StringType, false, Metadata.empty()),
>     new StructField("text", DataTypes.StringType, false, Metadata.empty()) });
>
> // Create a DataFrame wrt a new schema
> DataFrame preProcessedDF = sqlContext.createDataFrame(jrdd, schema);
>
> I can extract the list of terms from "preProcessedDF " into a List or RDD
> or DataFrame.
> For each (term_i, term_j) I want to calculate the related frequency from
> the original dataset "preProcessedDF".
>
> Does anyone have a scalable solution?
>
> thank you,
> Donni
>
>
>
>
>
>
>
>
>


Queries with streaming sources must be executed with writeStream.start();;

2018-03-27 Thread Junfeng Chen
I am reading some data from Kafka, and I want to save it to parquet on
HDFS with structured streaming.
The data from Kafka is in JSON format. I try to convert it to a
Dataset<Row> with spark.read.json(). However, I get the exception:
>
> Queries with streaming sources must be executed with
> writeStream.start()

Here is my code:
>
> Dataset<Row> df = spark.readStream().format("kafka")...
> Dataset<String> jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
> Dataset<Row> rowDataset = spark.read().json(jsonDataset);
>
> rowDataset.writeStream().outputMode(OutputMode.Append()).partitionBy("appname").format("parquet").option("path",savePath).start().awaitTermination();



How to solve it?
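
A common fix, sketched here in Scala (the schema, field names and checkpoint path are
assumptions): avoid calling spark.read.json on a streaming Dataset and instead parse the
value column with from_json, which keeps the whole query streaming.

```
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Assumed schema for the incoming JSON; replace with the real fields.
val schema = new StructType()
  .add("appname", StringType)
  .add("payload", StringType)

val parsed = df.selectExpr("CAST(value AS STRING) AS json")   // df = the streaming Kafka source above
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")

parsed.writeStream
  .outputMode("append")
  .partitionBy("appname")
  .format("parquet")
  .option("path", savePath)                        // savePath as in the snippet above
  .option("checkpointLocation", "/tmp/chk")        // assumed; file sinks need a checkpoint dir
  .start()
  .awaitTermination()
```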

Thanks!

Regard,
Junfeng Chen


Re: Class cast exception while using Data Frames

2018-03-27 Thread Shmuel Blitz
Hi Nikhil,

Can you please put a code snippet that reproduces the issue?

Shmuel

On Tue, Mar 27, 2018 at 12:55 AM, Nikhil Goyal  wrote:

>  |-- myMap: map (nullable = true)
>  |    |-- key: struct
>  |    |    |-- _1: string (nullable = true)
>  |    |    |-- _2: string (nullable = true)
>  |    |-- value: double (valueContainsNull = true)
>  |-- count: long (nullable = true)
>
> On Mon, Mar 26, 2018 at 1:41 PM, Gauthier Feuillen 
> wrote:
>
>> Can you give the output of “printSchema” ?
>>
>>
>> On 26 Mar 2018, at 22:39, Nikhil Goyal  wrote:
>>
>> Hi guys,
>>
>> I have a Map[(String, String), Double] as one of my columns. Using
>>
>> input.getAs[Map[(String, String), Double]](0)
>>
>>  throws exception: Caused by: java.lang.ClassCastException: 
>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be 
>> cast to scala.Tuple2
>>
>> Even the schema says that key is of type struct of (string, string).
>>
>> Any idea why this is happening?
>>
>>
>> Thanks
>>
>> Nikhil
>>
>>
>>
>


-- 
Shmuel Blitz
Big Data Developer
Email: shmuel.bl...@similarweb.com
www.similarweb.com

 


unsubscribe

2018-03-27 Thread Mikhail Ibraheem
 


Re: What do I need to set to see the number of records and processing time for each batch in SPARK UI?

2018-03-27 Thread kant kodali
For example, in this blog post:
looking at figure 1 and figure 2, I wonder what I need to do to see those
graphs in Spark 2.3.0.
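
In 2.3 the Spark UI has no dedicated tab for Structured Streaming, so one hedged
workaround (a sketch, assuming a spark-shell style `spark`) is to pull the per-trigger
numbers from a StreamingQueryListener, or simply inspect query.lastProgress:

```
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Sketch: log input rows, processing rate and durations for every trigger of every query.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batch=${p.batchId} numInputRows=${p.numInputRows} " +
      s"rowsPerSecond=${p.processedRowsPerSecond} durationMs=${p.durationMs}")
  }
})
```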

On Mon, Mar 26, 2018 at 7:10 AM, kant kodali  wrote:

> Hi All,
>
> I am using spark 2.3.0 and I wondering what do I need to set to see the
> number of records and processing time for each batch in SPARK UI? The
> default UI doesn't seem to show this.
>
> Thanks@
>


Re: Open sourcing Sparklens: Qubole's Spark Tuning Tool

2018-03-27 Thread Rohit Karlupia
Let me be more specific:

With GC/CPU aware task scheduling, the user doesn't have to worry about
specifying cores carefully. So if the user always specifies cores = 100 or
1024 for every executor, he will still not get OOM (under the vast majority of
cases). Internally, the scheduler will vary the number of tasks assigned to
executors, ensuring that an executor doesn't run into GC cycles or cause
useless context switches. In short, as long as users configure cores per
executor on the higher side, it will be harmless in general and can
actually help increase the throughput of the system by utilising
unused memory or CPU capacity.

For example: let's say we are using a 64 GB machine with 8 cores, with
one big 54GB executor with 8 cores. This results in on
average 7GB of memory per task. It is possible that some tasks take more
than 7GB and some take less than 7GB. Consider a case where one task takes
34GB of memory. Very likely such a stage will fail, depending on whether the
remaining 7 tasks scheduled at the same time need more than 20GB of memory (54 - 34).
The usual approach to solving this problem without changing the
application would be to sacrifice cores and increase memory per core. The
stable configuration in this case could be 2 cores for a 54GB executor, which
will result in wasting 6 cores "throughout" the application.

With GC/CPU aware task scheduling one can configure the same executor with,
say, 64 cores and the application is very likely to succeed. Being aware of
GC, the scheduler will stop scheduling tasks on the executor, making it
possible for the running task to consume all 54GB of memory. This ensures
that we only "sacrifice" cores when necessary, not in general and not
for the whole duration of the application. On the other hand, if the
scheduler finds out that in spite of running 8 concurrent tasks, we still
have memory and CPU to spare, it will schedule more tasks, up to 64, as
configured. So we not only get stability against skew but also get
higher throughput when possible.

Hope that helps.

thanks,
rohitk












On Tue, Mar 27, 2018 at 9:20 AM, Fawze Abujaber  wrote:

> Thanks for the update.
>
> What about cores per executor?
>
> On Tue, 27 Mar 2018 at 6:45 Rohit Karlupia  wrote:
>
>> Thanks Fawze!
>>
>> On the memory front, I am currently working on GC and CPU aware task
>> scheduling. I see wonderful results based on my tests so far.  Once the
>> feature is complete and available, spark will work with whatever memory is
>> provided (at least enough for the largest possible task). It will also
>> allow you to run say 64 concurrent tasks on 8 core machine, if the nature
>> of tasks doesn't leads to memory or CPU contention. Essentially why worry
>> about tuning memory when you can let spark take care of it automatically
>> based on memory pressure. Will post details when we are ready.  So yes we
>> are working on memory, but it will not be a tool but a transparent feature.
>>
>> thanks,
>> rohitk
>>
>>
>>
>>
>> On Tue, Mar 27, 2018 at 7:53 AM, Fawze Abujaber 
>> wrote:
>>
>>> Hi Rohit,
>>>
>>> I would like to thank you for the unlimited patience and support that
>>> you are providing here and behind the scene for all of us.
>>>
>>> The tool is amazing and easy to use and understand most of the metrics
>>> ...
>>>
>>> Thinking if we need to run it in cluster mode and all the time, i think
>>> we can skip it as one or few runs can give you the large picture of how the
>>> job is running with different configuration and it's not too much
>>> complicated to run it using spark-submit.
>>>
>>> I think it will be so helpful if the sparklens can also include how the
>>> job is running with different configuration of cores and memory, Spark job
>>> with 1 exec and 1 core will run different from spark job with 1  exec and 3
>>> cores and for sure the same compare with different exec memory.
>>>
>>> Overall, it is so good starting point, but it will be a GAME CHANGER
>>> getting these metrics on the tool.
>>>
>>> @Rohit , Huge THANY YOU
>>>
>>> On Mon, Mar 26, 2018 at 1:35 PM, Rohit Karlupia 
>>> wrote:
>>>
 Hi Shmuel,

 In general it is hard to pin point to exact code which is responsible
 for a specific stage. For example when using spark sql, depending upon the
 kind of joins, aggregations used in the the single line of query, we will
 have multiple stages in the spark application. I usually try to split the
 code into smaller chunks and also use the spark UI which has special
 section for SQL. It can also show specific backtraces, but as I explained
 earlier they might not be very helpful. Sparklens does help you ask the
 right questions, but is not mature enough to answer all of them.

 Understanding the report:

 *1) The first part of total aggregate metrics for the application.*

 Printing application