Re: how "hour" function in Spark SQL is supposed to work?

2018-03-11 Thread vermanurag
Not sure why you are dividing by 1000. from_unixtime expects a long value, which is the time in milliseconds from the reference date.

The following should work:

val ds = dataset.withColumn("hour", hour(from_unixtime(dataset.col("ts"))))
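
A minimal, self-contained usage sketch of that suggestion (the data is illustrative; note that Spark's from_unixtime takes Unix time in seconds, so if ts is actually in milliseconds, divide by 1000 first):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_unixtime, hour}

val spark = SparkSession.builder().master("local[*]").appName("hour-sketch").getOrCreate()
import spark.implicits._

// Illustrative epoch seconds; 1520766000 is 2018-03-11 11:00:00 UTC.
val dataset = Seq(1520766000L, 1520800200L).toDF("ts")

val ds = dataset.withColumn("hour", hour(from_unixtime(col("ts"))))
ds.show()  // the "hour" column holds the hour of day, rendered in the session time zone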








The last successful batch before stop re-executes after restarting DStreams from a checkpoint

2018-03-11 Thread Terry Hoo
Experts,

I see that the last batch before a stop (graceful shutdown) always re-executes
after restarting the DStream from a checkpoint. Is this expected behavior?

I see a bug in JIRA, https://issues.apache.org/jira/browse/SPARK-20050,
which reports duplicates with Kafka; I also see this with HDFS files.
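
For reference, a minimal sketch of the checkpoint-restart pattern being described, with illustrative paths and an HDFS file stream (not taken from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/stream-checkpoint"  // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpoint-restart-sketch")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // Each batch processes files newly arrived in the monitored HDFS directory.
  ssc.textFileStream("hdfs:///tmp/incoming").count().print()
  ssc
}

// On restart this recovers from the checkpoint; the batch that completed just
// before the graceful stop is the one observed to run again.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

// The graceful stop referred to above would be issued elsewhere as:
// ssc.stop(stopSparkContext = true, stopGracefully = true)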

Regards
- Terry


Spark SQL result time larger than compute duration

2018-03-11 Thread wkhapy_1
get result: 1.67s

compute cost: 0.2s

Below is the SQL:

select event_date, dim, concat_ws('|', collect_list(result)) result
from (
  select event_day event_date, '' dim, concat_ws(',', result, event) result
  from (
    select event_day, event, count(uid) result
    from (
      select uid, event_day, event, uid
      from usereventattr1
      where ( city = 'a' ) and ( event = 'WENJUANWANG__SUBMIT' )
    ) usereventattrchild
    group by event, event_day
    union all
    select event_day, event, count(uid) result
    from (
      select uid, event_day, event, uid
      from usereventattr1
      where ( city = 'a' ) and ( event = 'WECHAT__SUBSCRIBE' )
    ) usereventattrchild
    group by event, event_day
  ) xx
) ab
group by dim, event_date

explain also costs 1.4s to get its result.

Does anybody know why getting the result takes about 1.4s longer than the compute time?
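
One way to see where the extra time goes, as a sketch (assuming spark is the active SparkSession and query holds the SQL above):

val df = spark.sql(query)

// Analysis, optimization, and physical planning only.
spark.time { df.explain() }

// Planning plus execution plus pulling the rows back to the driver.
spark.time { df.collect() }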






Re: Spark 2.3 submit on Kubernetes error

2018-03-11 Thread Yinan Li
Spark on Kubernetes requires the kube-dns add-on to be present and properly
configured. The executors connect to the driver through a headless Kubernetes
service, using the DNS name of the service. Can you check whether the add-on is
installed in your cluster? This issue might help:
https://github.com/apache-spark-on-k8s/spark/issues/558



Spark 2.3 submit on Kubernetes error

2018-03-11 Thread purna pradeep
I am getting the errors below when I try to run spark-submit on a k8s cluster.


Error 1: This looks like a warning; it does not interrupt the app running
inside the executor pod, but the warning keeps appearing.


2018-03-09 11:15:21 WARN  WatchConnectionManager:192 - Exec Failure
java.io.EOFException
    at okio.RealBufferedSource.require(RealBufferedSource.java:60)
    at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)
    at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)
    at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)
    at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)
    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)



Error 2: This is an intermittent error which prevents the executor pod from
running.


org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
    at com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)
    at com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)
    at com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)
    at com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)
    at com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver]  in namespace: [default]  failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.<init>(KubernetesClusterSchedulerBackend.scala:70)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
    ... 11 more
Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again
    at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
    at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
    at java.net.InetAddress.getAllByName(InetAddress.java:1192)
    at java.net.InetAddress.getAllByName(InetAddress.java:1126)
    at okhttp3.Dns$1.lookup(Dns.java:39)
    at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
    at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
    at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
    at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
    at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
    at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

Debugging a local Spark executor in PyCharm

2018-03-11 Thread Vitaliy Pisarev
I want to step through the work of a Spark executor running locally on my
machine, from PyCharm.

I am running explicit functionality, in the form of
dataset.foreachPartition(f) and I want to see what is going on inside f.

Is there a straightforward way to do it, or do I need to resort to remote
debugging?

P.S. I posted this on SO as well.


how "hour" function in Spark SQL is supposed to work?

2018-03-11 Thread Serega Sheypak
Hi, I am desperately trying to extract the hour from Unix seconds.

The year, month, and dayofmonth functions work as expected.
The hour function always returns 0.

val ds = dataset
  .withColumn("year", year(to_date(from_unixtime(dataset.col("ts") / 1000))))
  .withColumn("month", month(to_date(from_unixtime(dataset.col("ts") / 1000))))
  .withColumn("day", dayofmonth(to_date(from_unixtime(dataset.col("ts") / 1000))))
  .withColumn("hour", hour(from_utc_timestamp(dataset.col("ts") / 1000, "UTC")))

  //.withColumn("hour", hour(dataset.col("ts") / 1000))
  //.withColumn("hour1", hour(dataset.col("ts")))
  //.withColumn("hour", hour(dataset.col("ts")))
  //.withColumn("hour", hour("2009-07-30 12:58:59"))

I took a look at the source code.

year, month, and dayofmonth expect to get:

override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

The hour function expects something different:

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)

from_utc_timestamp returns a Timestamp:

override def dataType: DataType = TimestampType

but it didn't help.

What am I doing wrong? How can I get the hour from Unix seconds?
Thanks!
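
For reference, a minimal sketch of the type-driven fix implied by the inputTypes above, assuming ts holds Unix milliseconds (the /1000 suggests it does). Dividing by 1000 gives seconds, and casting that value to TimestampType gives hour the input type it expects; the result is rendered in the session time zone:

import org.apache.spark.sql.functions.{col, hour}
import org.apache.spark.sql.types.TimestampType

// Seconds since the epoch cast to a timestamp; hour() then works like year/month/day.
val withHour = dataset.withColumn("hour", hour((col("ts") / 1000).cast(TimestampType)))

// An equivalent route: from_unixtime yields a "yyyy-MM-dd HH:mm:ss" string that
// hour implicitly casts to a timestamp:
// dataset.withColumn("hour", hour(from_unixtime(col("ts") / 1000)))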


Error running multinomial regression on a dataset with a field having a constant value

2018-03-11 Thread kundan kumar
I am running the sample multinomial regression code given in the Spark docs
(version 2.2.0):


LogisticRegression lr = new LogisticRegression()
    .setMaxIter(100)
    .setRegParam(0.3)
    .setElasticNetParam(0.8);
LogisticRegressionModel lrModel = lr.fit(training);

But I am adding to the dataset a constant field where all the values are the
same.

Now, I get an error saying

2018-03-11 15:42:58,835 [main] ERROR OWLQN  - Failure! Resetting history:
breeze.optimize.NaNHistory:
2018-03-11 15:42:58,922 [main] INFO  OWLQN  - Step Size: 1.000
2018-03-11 15:42:58,938 [main] INFO  OWLQN  - Val and Grad Norm: NaN (rel:
NaN) NaN
2018-03-11 15:42:58,940 [main] INFO  OWLQN  - Converged because max
iterations reached


Without the constant field in the dataset everything works fine.

Please help me understand the reason behind this error. When I run binary
logistic regression, it runs fine even if there are constant values in a field.

Do I really need to get rid of the constant field from my dataset while running
multinomial regression?

Is this a bug, or is this expected?
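
If removing the constant field turns out to be necessary, a minimal Scala sketch of that workaround (the column names and the feature-assembly step are illustrative, not from the post):

import org.apache.spark.sql.DataFrame

// Drop zero-variance (constant) columns before assembling the feature vector.
def dropConstantColumns(df: DataFrame, candidates: Seq[String]): DataFrame = {
  val constant = candidates.filter(c => df.select(c).distinct().count() <= 1)
  df.drop(constant: _*)
}

// Illustrative usage with hypothetical column names:
// val cleaned = dropConstantColumns(training, Seq("f1", "f2", "constFlag"))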


Thanks !!
Kundan