Re: java.lang.UnsupportedOperationException: No Encoder found for Set[String]

2018-08-16 Thread Manu Zhang
You may try applying this PR  https://github.com/apache/spark/pull/18416.
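
A possible stop-gap on 2.2.x (a minimal sketch, not something proposed in this
thread) is to fall back to a Kryo-based encoder for the whole case class, which
avoids the need for a field-level Set[String] encoder at the cost of losing the
columnar schema:

import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder().master("local[4]").getOrCreate()

case class TestCC(i: Int, ss: Set[String])

// Kryo serializes each TestCC into a single binary column, so no encoder
// for Set[String] is required.
val testCCDS = spark.createDataset(
  Seq(TestCC(1, Set("SS", "Salil")), TestCC(2, Set("xx", "XYZ"))))(Encoders.kryo[TestCC])

Alternatively, modelling the field as Seq[String] keeps the regular product
encoder working on 2.2.x.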

On Fri, Aug 17, 2018 at 9:13 AM Venkat Dabri  wrote:

> We are using Spark 2.2.0. Is it possible to bring ExpressionEncoder
> and related classes from 2.3.0 into my code base and use them? I see
> the changes in ExpressionEncoder between 2.3.0 and 2.2.0 are small,
> but there might be many other classes underneath that have changed.
>
> On Thu, Aug 16, 2018 at 5:23 AM, Manu Zhang 
> wrote:
> > Hi,
> >
> > It's added since Spark 2.3.0.
> >
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L180
> >
> > Regards,
> > Manu Zhang
> >
> > On Thu, Aug 16, 2018 at 9:59 AM V0lleyBallJunki3 
> > wrote:
> >>
> >> Hello,
> >>   I am using Spark 2.2.2 with Scala 2.11.8. I wrote a short program
> >>
> >> val spark = SparkSession.builder().master("local[4]").getOrCreate()
> >>
> >> case class TestCC(i: Int, ss: Set[String])
> >>
> >> import spark.implicits._
> >> import spark.sqlContext.implicits._
> >>
> >> val testCCDS = Seq(TestCC(1,Set("SS","Salil")), TestCC(2, Set("xx",
> >> "XYZ"))).toDS()
> >>
> >>
> >> I get :
> >> java.lang.UnsupportedOperationException: No Encoder found for
> Set[String]
> >> - field (class: "scala.collection.immutable.Set", name: "ss")
> >> - root class: "TestCC"
> >>   at
> >>
> >>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
> >>   at
> >>
> >>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:455)
> >>   at
> >>
> >>
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
> >>   at
> >>
> >>
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
> >>   at
> >>
> >>
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
> >>   at
> >>
> >>
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:455)
> >>   at
> >>
> >>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:626)
> >>   at
> >>
> >>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:614)
> >>   at
> >>
> >>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> >>
> >> To the best of my knowledge implicit support for Set has been added in
> >> Spark
> >> 2.2. Am I missing something?
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
>


Re: java.lang.UnsupportedOperationException: No Encoder found for Set[String]

2018-08-16 Thread Venkat Dabri
We are using Spark 2.2.0. Is it possible to bring ExpressionEncoder
and related classes from 2.3.0 into my code base and use them? I see
the changes in ExpressionEncoder between 2.3.0 and 2.2.0 are small,
but there might be many other classes underneath that have changed.

On Thu, Aug 16, 2018 at 5:23 AM, Manu Zhang  wrote:
> Hi,
>
> It's added since Spark 2.3.0.
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L180
>
> Regards,
> Manu Zhang
>
> On Thu, Aug 16, 2018 at 9:59 AM V0lleyBallJunki3 
> wrote:
>>
>> Hello,
>>   I am using Spark 2.2.2 with Scala 2.11.8. I wrote a short program
>>
>> val spark = SparkSession.builder().master("local[4]").getOrCreate()
>>
>> case class TestCC(i: Int, ss: Set[String])
>>
>> import spark.implicits._
>> import spark.sqlContext.implicits._
>>
>> val testCCDS = Seq(TestCC(1,Set("SS","Salil")), TestCC(2, Set("xx",
>> "XYZ"))).toDS()
>>
>>
>> I get :
>> java.lang.UnsupportedOperationException: No Encoder found for Set[String]
>> - field (class: "scala.collection.immutable.Set", name: "ss")
>> - root class: "TestCC"
>>   at
>>
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
>>   at
>>
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:455)
>>   at
>>
>> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
>>   at
>>
>> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
>>   at
>>
>> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
>>   at
>>
>> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:455)
>>   at
>>
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:626)
>>   at
>>
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:614)
>>   at
>>
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>
>> To the best of my knowledge implicit support for Set has been added in
>> Spark
>> 2.2. Am I missing something?
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Pass config file through spark-submit

2018-08-16 Thread yujhe.li
So, can you read the file on the executor side?
I think a file passed with --files my.app.conf is added to the executor's
working directory (and, depending on the cluster manager, the classpath),
so you can use it directly.
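
For example, a minimal sketch of reading such a file on an executor, assuming it
was shipped with --files myapp.conf and uses the keys from the original question
(whether it also ends up on the classpath depends on the cluster manager):

import java.io.File
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkFiles

// Option 1: resolve the local copy distributed by --files / sc.addFile
val conf = ConfigFactory.parseFile(new File(SparkFiles.get("myapp.conf")))

// Option 2: if the cluster manager places it on the classpath, load it as a resource
// val conf = ConfigFactory.parseResources("myapp.conf")

val dbHost = conf.getString("my.app.db.host")
val dbPort = conf.getInt("my.app.db.port")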



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Unable to see completed application in Spark 2 history web UI

2018-08-16 Thread Manu Zhang
Hi Fawze,

Sorry, but I'm not familiar with CM. Maybe you can look into the logs (or
turn on DEBUG logging).

On Thu, Aug 16, 2018 at 3:05 PM Fawze Abujaber  wrote:

> Hi Manu,
>
> I'm using Cloudera Manager in single-user mode, and every process is
> running as the cloudera-scm user. cloudera-scm is a superuser, which is
> why I was confused that it worked in Spark 1.6 and not in Spark 2.3.
>
>
> On Thu, Aug 16, 2018 at 5:34 AM Manu Zhang 
> wrote:
>
>> If you are able to log onto the node where UI has been launched, then try
>> `ps -aux | grep HistoryServer` and the first column of output should be the
>> user.
>>
>> On Wed, Aug 15, 2018 at 10:26 PM Fawze Abujaber 
>> wrote:
>>
>>> Thanks Manu. Do you know how I can see which user the UI is running as?
>>> I'm using Cloudera Manager and I created a user for Cloudera Manager
>>> called spark, but this didn't solve my issue, and here I'm trying to
>>> find out the user for the Spark history UI.
>>>
>>> On Wed, Aug 15, 2018 at 5:11 PM Manu Zhang 
>>> wrote:
>>>
 Hi Fawze,

 A) The file permission is currently hard coded to 770 (
 https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L287
 ).
 B) I think adding all users (including the UI user) to a group like spark will do.


 On Wed, Aug 15, 2018 at 6:38 PM Fawze Abujaber 
 wrote:

> Hi Manu,
>
> Thanks for your response.
>
> Yes, I see, but it's still interesting to know how I can see these
> applications from the Spark history UI.
>
> How can I know which user I'm logged in as when I'm navigating the
> Spark history UI?
>
> The Spark process is running as cloudera-scm, and the events written
> to the spark2history folder on HDFS are written with the user name of
> whoever is running the application and the group spark (770 permissions).
>
> I'm interested to see if I can force these logs to be written with
> 774 or 775 permissions, or to find another solution that enables R&D or
> anyone else to investigate their application logs using the UI.
>
> For example: can I use a Spark conf such as
> spark.eventLog.permissions=755?
>
> The 2 options I see here:
>
> A) Find a way to force these logs to be written with other
> permissions.
>
> B) Find the user that the UI is running as, and create LDAP groups and
> a user that can handle this.
>
> For example, create a group called spark, create the user that the
> UI is running as, and add this user to the spark group.
> I'm not sure if this option will work, as I don't know if these steps
> authenticate against LDAP.
>

>>>
>>> --
>>> Take Care
>>> Fawze Abujaber
>>>
>>
>
> --
> Take Care
> Fawze Abujaber
>


something happened to MemoryStream after spark 2.3

2018-08-16 Thread Koert Kuipers
Hi,
We just started testing internally with Spark 2.4 snapshots, and it seems
our streaming tests are broken.

I believe it has to do with MemoryStream.

Before, we were able to create a MemoryStream, add data to it, convert it to
a streaming unbounded DataFrame, and use it repeatedly. By using it
repeatedly I mean repeatedly doing: create a query (with a random UUID
name) from the DataFrame, process all available data, stop the query. Every
time we did this, all the data in the MemoryStream would be processed.

Now, with Spark 2.4.0-SNAPSHOT, the second time we create a query no data is
processed at all. It is as if the MemoryStream is empty. Is this expected?
Should we refactor our tests?
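
For reference, a minimal sketch of the test pattern described above (the names,
values and UUID-based query name are illustrative, not the original test code):

import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

val input = MemoryStream[Int]          // the unbounded in-memory source
input.addData(1, 2, 3)
val df = input.toDF()                  // streaming DataFrame over the source

// run the same stream twice, each time with a fresh query name
for (_ <- 1 to 2) {
  val query = df.writeStream
    .format("memory")
    .queryName("t_" + UUID.randomUUID().toString.replace("-", ""))
    .outputMode("append")
    .start()
  query.processAllAvailable()          // drain whatever the source has
  query.stop()
}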


Re: [K8S] Spark initContainer custom bootstrap support for Spark master

2018-08-16 Thread Li Gao
Thanks! We will likely use the second option to customize the bootstrap.

On Thu, Aug 16, 2018 at 10:04 AM Yinan Li  wrote:

> Yes, the init-container has been removed in the master branch. The
> init-container was used in 2.3.x only for downloading remote dependencies,
> which is now handled by running spark-submit in the driver. If you need to
> run custom bootstrap scripts using an init-container, the best option would
> be to use a mutating admission webhook to inject your init-container into
> the Spark pods. Another option is to create a custom image that runs the
> scripts prior to entering the entrypoint.
>
> Yinan
>
> On Wed, Aug 15, 2018 at 9:12 AM Li Gao  wrote:
>
>> Hi,
>>
>> We've noticed on the latest Master (not Spark 2.3.1 branch), the support
>> for Kubernetes initContainer is no longer there. What would be the path
>> forward if we need to do custom bootstrap actions (i.e. run additional
>> scripts) prior to driver/executor container entering running mode?
>>
>> Thanks,
>> Li
>>
>>


Re: [K8S] Spark initContainer custom bootstrap support for Spark master

2018-08-16 Thread Yinan Li
Yes, the init-container has been removed in the master branch. The
init-container was used in 2.3.x only for downloading remote dependencies,
which is now handled by running spark-submit in the driver. If you need to
run custom bootstrap scripts using an init-container, the best option would
be to use a mutating admission webhook to inject your init-container into
the Spark pods. Another option is to create a custom image that runs the
scripts prior to entering the entrypoint.

Yinan

On Wed, Aug 15, 2018 at 9:12 AM Li Gao  wrote:

> Hi,
>
> We've noticed on the latest Master (not Spark 2.3.1 branch), the support
> for Kubernetes initContainer is no longer there. What would be the path
> forward if we need to do custom bootstrap actions (i.e. run additional
> scripts) prior to driver/executor container entering running mode?
>
> Thanks,
> Li
>
>


Re: java.lang.IndexOutOfBoundsException: len is negative - when data size increases

2018-08-16 Thread Vadim Semenov
One of the spills becomes bigger than 2 GiB and can't be loaded fully
(arrays in Java can't have more than 2^31 - 1 elements).

> 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76)


You can try increasing the number of partitions, so the spills would be
smaller.

Also check whether you have data skew in the stage that precedes the
stage where it fails.
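
For example, a minimal sketch of both knobs on Spark 1.6, where sqlContext and df
stand for the job's own context and DataFrame and 2000 is only an illustrative
starting point:

// more shuffle partitions => smaller partitions and smaller spills
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
// (equivalently at submit time: --conf spark.sql.shuffle.partitions=2000)

// or repartition the input of the heavy sort-merge join explicitly
val repartitioned = df.repartition(2000)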

On Thu, Aug 16, 2018 at 11:25 AM Deepak Sharma  wrote:
>
> Hi All,
> I am running spark based ETL in spark 1.6  and facing this weird issue.
> The same code with same properties/configuration runs fine in other 
> environment E.g. PROD but never completes in CAT.
> The only change would be the size of data it is processing and that too be by 
> 1-2 GB.
> This is the stack trace:java.lang.IndexOutOfBoundsException: len is negative
> at org.spark-project.guava.io.ByteStreams.read(ByteStreams.java:895)
> at 
> org.spark-project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:509)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
> at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:84)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoin.scala:272)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoin.scala:233)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeOuterJoin.scala:250)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeOuterJoin.scala:283)
> at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
>
> Has anyone faced this issue?
> If yes, what can I do to resolve it?
>
> Thanks
> Deepak



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



java.lang.IndexOutOfBoundsException: len is negative - when data size increases

2018-08-16 Thread Deepak Sharma
Hi All,
I am running a Spark-based ETL job on Spark 1.6 and facing this weird issue.
The same code with the same properties/configuration runs fine in another
environment, e.g. PROD, but never completes in CAT.
The only change is the size of the data being processed, and that only by
1-2 GB.
This is the stack trace: java.lang.IndexOutOfBoundsException: len is negative
at org.spark-project.guava.io.ByteStreams.read(ByteStreams.java:895)
at
org.spark-project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:509)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
at
org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:84)
at
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoin.scala:272)
at
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoin.scala:233)
at
org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeOuterJoin.scala:250)
at
org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeOuterJoin.scala:283)
at
org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Has anyone faced this issue?
If yes, what can I do to resolve it?

Thanks
Deepak


[Spark Streaming] [ML]: Exception handling for the transform method of Spark ML pipeline model

2018-08-16 Thread sudododo
Hi,

I'm implementing a Spark Streaming + ML application. The data arrives in a
Kafka topic in JSON format. The Spark Kafka connector reads the data from
the Kafka topic as a DStream. After several preprocessing steps, the input
DStream is transformed into a feature DStream, which is fed into a Spark ML
pipeline model. The code sample below shows how the feature DStream interacts
with the pipeline model.

prediction_stream = feature_stream.transform(
    lambda rdd: predict_rdd(rdd, pipeline_model))

def predict_rdd(rdd, pipeline_model):
    if (rdd is not None) and (not rdd.isEmpty()):
        try:
            df = rdd.toDF()
            predictions = pipeline_model.transform(df)
            return predictions.rdd
        except Exception as e:
            print("Unable to make predictions")
            return None
    else:
        return None

Here comes the problem: if pipeline_model.transform(df) fails due to data
issues in some rows of df, the try...except block won't be able to catch the
exception, since the exception is thrown in the executors. As a result, the
exception bubbles up to Spark and the streaming application is terminated.

I want the exception to be caught in some way so that the streaming application
is not terminated and keeps processing incoming data. Is this possible?

I know one solution could be doing more thorough data validation in the
preprocessing step. However, some sort of error handling should be put in
place for the transform method of the pipeline model, just in case anything
unexpected happens.


Thanks,



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Pass config file through spark-submit

2018-08-16 Thread James Starks
I have a config file, based on the Typesafe Config library, located on the
local file system, and I want to submit that file through spark-submit so that
the Spark program can read customized parameters from it. For instance:

my.app {
  db {
host = domain.cc
port = 1234
db = dbname
user = myuser
passwd = mypass
  }
}

The spark-submit command looks like:

spark-submit --class "my.app.Sample" --master local[*] --conf 
"spark.executor.extraJavaOptions=-Dconfig.file=/path/to/conf/myapp.conf" 
/path/to/my-app.jar

But the program cannot read the parameters such as db, user, host, and so on
from my conf file.

Passing with --files /path/to/myapp.conf doesn't work either.

What is the correct way to submit that kind of conf file so that my Spark job
can read customized parameters from it?

Thanks

Re: java.lang.UnsupportedOperationException: No Encoder found for Set[String]

2018-08-16 Thread Manu Zhang
Hi,

It's added since Spark 2.3.0.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L180

Regards,
Manu Zhang

On Thu, Aug 16, 2018 at 9:59 AM V0lleyBallJunki3 
wrote:

> Hello,
>   I am using Spark 2.2.2 with Scala 2.11.8. I wrote a short program
>
> val spark = SparkSession.builder().master("local[4]").getOrCreate()
>
> case class TestCC(i: Int, ss: Set[String])
>
> import spark.implicits._
> import spark.sqlContext.implicits._
>
> val testCCDS = Seq(TestCC(1,Set("SS","Salil")), TestCC(2, Set("xx",
> "XYZ"))).toDS()
>
>
> I get :
> java.lang.UnsupportedOperationException: No Encoder found for Set[String]
> - field (class: "scala.collection.immutable.Set", name: "ss")
> - root class: "TestCC"
>   at
>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
>   at
>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:455)
>   at
>
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
>   at
>
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:809)
>   at
>
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
>   at
>
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:455)
>   at
>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:626)
>   at
>
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$10.apply(ScalaReflection.scala:614)
>   at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>
> To the best of my knowledge implicit support for Set has been added in
> Spark
> 2.2. Am I missing something?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes

2018-08-16 Thread purna pradeep
Hello,

I'm running Spark 2.3 jobs on a Kubernetes cluster:
>
> kubectl version
>
> Client Version: version.Info{Major:"1", Minor:"9",
> GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b",
> GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z",
> GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
>
> Server Version: version.Info{Major:"1", Minor:"8",
> GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd",
> GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z",
> GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}
>
>
>
> When I run spark-submit against the k8s master, the driver pod gets stuck in
> the Waiting: PodInitializing state.
> I had to manually kill the driver pod and submit a new job in this case, and
> then it works. How can this be handled in production?
>

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128


>
> This happens if I submit the jobs almost in parallel, i.e. submit 5 jobs
> one after the other.
>
> I'm running Spark jobs on 20 nodes, each having the configuration below.
>
> I tried kubectl describe node on the node where the driver pod is running;
> this is what I got. I do see that resources are overcommitted, but I
> expected the Kubernetes scheduler not to schedule if the node's resources are
> overcommitted or the node is in Not Ready state. In this case the node is in
> Ready state, but I observe the same behaviour if the node is in "Not Ready" state.
>
>
>
> Name:   **
>
> Roles:  worker
>
> Labels: beta.kubernetes.io/arch=amd64
>
> beta.kubernetes.io/os=linux
>
> kubernetes.io/hostname=
>
> node-role.kubernetes.io/worker=true
>
> Annotations:node.alpha.kubernetes.io/ttl=0
>
>
> volumes.kubernetes.io/controller-managed-attach-detach=true
>
> Taints: 
>
> CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
>
> Conditions:
>
>   Type Status  LastHeartbeatTime
> LastTransitionTimeReason   Message
>
>    --  -
> ----   ---
>
>   OutOfDiskFalse   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasSufficientDisk kubelet has
> sufficient disk space available
>
>   MemoryPressure   False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasSufficientMemory   kubelet has
> sufficient memory available
>
>   DiskPressure False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasNoDiskPressure kubelet has no disk
> pressure
>
>   ReadyTrueTue, 14 Aug 2018 09:31:20 -0400   Sat, 11
> Aug 2018 00:41:27 -0400   KubeletReady kubelet is posting
> ready status. AppArmor enabled
>
> Addresses:
>
>   InternalIP:  *
>
>   Hostname:**
>
> Capacity:
>
>  cpu: 16
>
>  memory:  125827288Ki
>
>  pods:110
>
> Allocatable:
>
>  cpu: 16
>
>  memory:  125724888Ki
>
>  pods:110
>
> System Info:
>
>  Machine ID: *
>
>  System UUID:**
>
>  Boot ID:1493028d-0a80-4f2f-b0f1-48d9b8910e9f
>
>  Kernel Version: 4.4.0-1062-aws
>
>  OS Image:   Ubuntu 16.04.4 LTS
>
>  Operating System:   linux
>
>  Architecture:   amd64
>
>  Container Runtime Version:  docker://Unknown
>
>  Kubelet Version:v1.8.3
>
>  Kube-Proxy Version: v1.8.3
>
> PodCIDR: **
>
> ExternalID:  **
>
> Non-terminated Pods: (11 in total)
>
>   Namespace  Name
>CPU Requests  CPU Limits  Memory Requests  Memory
> Limits
>
>   -  
>  --  ---
>  -
>
>   kube-systemcalico-node-gj5mb
>   250m (1%) 0 (0%)  0 (0%)   0 (0%)
>
>   kube-system
>  kube-proxy- 100m (0%)
> 0 (0%)  0 (0%)   0 (0%)
>
>   kube-systemprometheus-prometheus-node-exporter-9cntq
>   100m (0%) 200m (1%)   30Mi (0%)50Mi (0%)
>
>   logging
>  elasticsearch-elasticsearch-data-69df997486-gqcwg   400m (2%)
> 1 (6%)  8Gi (6%) 16Gi (13%)
>
>   loggingfluentd-fluentd-elasticsearch-tj7nd
>   200m (1%) 0 (0%)  612Mi (0%)   0 (0%)
>
>   rook   rook-agent-6jtzm
>0 (0%)0 (0%)  0 (0%)   0 (0%)

Re: Structured streaming: Tried to fetch $offset but the returned record offset was ${record.offset}"

2018-08-16 Thread andreas . weise



On 2018/04/17 22:34:25, Cody Koeninger  wrote: 
> Is this possibly related to the recent post on
> https://issues.apache.org/jira/browse/SPARK-18057 ?
> 
> On Mon, Apr 16, 2018 at 11:57 AM, ARAVIND SETHURATHNAM <
> asethurath...@homeaway.com.invalid> wrote:
> 
> > Hi,
> >
> > We have several structured streaming jobs (spark version 2.2.0) consuming
> > from kafka and writing to s3. They were running fine for a month, since
> > yesterday few jobs started failing and I see the below exception in the
> > failed jobs  log,
> >
> >
> >
> > ```Tried to fetch 473151075 but the returned record offset was 473151072```
> > ```GScheduler: ResultStage 0 (start at SparkStreamingTask.java:222) failed
> > in 77.546 s due to Job aborted due to stage failure: Task 86 in stage 0.0
> > failed 4 times, most recent failure: Lost task 86.3 in stage 0.0 (TID 96,
> > ip-10-120-12-52.ec2.internal, executor 11): java.lang.IllegalStateException:
> > Tried to fetch 473151075 but the returned record offset was 473151072
> > at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(
> > CachedKafkaConsumer.scala:234)
> > at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(
> > CachedKafkaConsumer.scala:106)
> > at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.
> > getNext(KafkaSourceRDD.scala:158)
> > at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.
> > getNext(KafkaSourceRDD.scala:149)
> > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> > at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> > GeneratedIterator.processNext(Unknown Source)
> > at org.apache.spark.sql.execution.BufferedRowIterator.
> > hasNext(BufferedRowIterator.java:43)
> > at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> > anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> > at scala.collection.convert.Wrappers$IteratorWrapper.
> > hasNext(Wrappers.scala:30)
> > `
> >
> >
> >
> > can someone provide some direction what could be causing this all of a
> > sudden when consuming from those topics?
> >
> >
> >
> > regards
> >
> > [image: https://ssl.gstatic.com/ui/v1/icons/mail/images/cleardot.gif]
> >
> > Aravind
> >
> >
> >
> 

Facing the same issue here with Spark 2.3.1 and Kafka Client 0.10.2.1 
(--packages 
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,org.apache.kafka:kafka-clients:0.10.2.1).

The Spark code looks interesting here (it seems some corner case is not yet
considered in Spark):
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L295
  } else if (record.offset < offset) {
    // This should not happen. If it does happen, then we probably misunderstand
    // Kafka internal mechanism.
    throw new IllegalStateException(
      s"Tried to fetch $offset but the returned record offset was ${record.offset}")
  }

Two interesting points in our case:

1) We started to consume after a Kafka disaster where the whole cluster went
down (maybe Kafka skipped corrupted messages/offsets?).

2) After recovery, our structured streaming jobs were using a specific
startingOffsets option:
startingOffsets={"our_topic":{"0":58120761,"1":56140499,"2":57353702}}

Then we see two exceptions mentioning two different offsets (57354415 and
58121595), so it seems the consumer did not stop immediately, but shortly
afterwards.

Exceptions:
2018-08-16 13:15:18 WARN  TaskSetManager:66 - Lost task 1.0 in stage 22.0 (TID 
269, sparkslave-sv-02.avm.de, executor 0): java.lang.IllegalStateException: 
Tried to fetch 57354416 but the returned record offset was 57354415
at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:297)
at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
at 
org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
at 
org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
at 
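
For context, a minimal sketch of the kind of reader configuration discussed above;
the broker list is a placeholder, and failOnDataLoss=false is only a possible
mitigation after such an outage, not something reported in this thread:

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "our_topic")
  .option("startingOffsets",
    """{"our_topic":{"0":58120761,"1":56140499,"2":57353702}}""")
  .option("failOnDataLoss", "false")  // assumption: tolerate offsets lost in the outage
  .load()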

Re: Unable to see completed application in Spark 2 history web UI

2018-08-16 Thread Fawze Abujaber
Hi Manu,

I'm using Cloudera Manager in single-user mode, and every process is
running as the cloudera-scm user. cloudera-scm is a superuser, which is
why I was confused that it worked in Spark 1.6 and not in Spark 2.3.


On Thu, Aug 16, 2018 at 5:34 AM Manu Zhang  wrote:

> If you are able to log onto the node where UI has been launched, then try
> `ps -aux | grep HistoryServer` and the first column of output should be the
> user.
>
> On Wed, Aug 15, 2018 at 10:26 PM Fawze Abujaber  wrote:
>
>> Thanks Manu. Do you know how I can see which user the UI is running as?
>> I'm using Cloudera Manager and I created a user for Cloudera Manager
>> called spark, but this didn't solve my issue, and here I'm trying to
>> find out the user for the Spark history UI.
>>
>> On Wed, Aug 15, 2018 at 5:11 PM Manu Zhang 
>> wrote:
>>
>>> Hi Fawze,
>>>
>>> A) The file permission is currently hard coded to 770 (
>>> https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L287
>>> ).
>>> B) I think adding all users (including the UI user) to a group like spark will do.
>>>
>>>
>>> On Wed, Aug 15, 2018 at 6:38 PM Fawze Abujaber 
>>> wrote:
>>>
 Hi Manu,

 Thanks for your response.

 Yes, I see, but it's still interesting to know how I can see these
 applications from the Spark history UI.

 How can I know which user I'm logged in as when I'm navigating the
 Spark history UI?

 The Spark process is running as cloudera-scm, and the events written
 to the spark2history folder on HDFS are written with the user name of
 whoever is running the application and the group spark (770 permissions).

 I'm interested to see if I can force these logs to be written with 774
 or 775 permissions, or to find another solution that enables R&D or anyone
 else to investigate their application logs using the UI.

 For example: can I use a Spark conf such as
 spark.eventLog.permissions=755?

 The 2 options I see here:

 A) Find a way to force these logs to be written with other
 permissions.

 B) Find the user that the UI is running as, and create LDAP groups and
 a user that can handle this.

 For example, create a group called spark, create the user that the UI
 is running as, and add this user to the spark group.
 I'm not sure if this option will work, as I don't know if these steps
 authenticate against LDAP.

>>>
>>
>> --
>> Take Care
>> Fawze Abujaber
>>
>

-- 
Take Care
Fawze Abujaber


Re: from_json function

2018-08-16 Thread dbolshak
Maxim, thanks for your reply.

I've left a comment in the following JIRA issue:
https://issues.apache.org/jira/browse/SPARK-23194?focusedCommentId=16582025=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16582025



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org