Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-06-06 Thread amihay gonen
If you are using the Kafka direct connect API, it might be committing offsets
back to Kafka itself.
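
For reference, a minimal Structured Streaming sketch (broker, topic, and checkpoint path are made up): the Kafka source tracks its progress in the query's checkpointLocation rather than via Kafka consumer-group commits, and startingOffsets only applies when the query starts without existing checkpoint state.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-checkpoint-sketch").getOrCreate()

# The Kafka source keeps its progress in the checkpoint directory;
# startingOffsets only matters when there is no checkpoint state yet.
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical broker
          .option("subscribe", "events")                     # hypothetical topic
          .option("startingOffsets", "latest")
          .load())

query = (stream.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/fresh-checkpoint")  # new dir => no old offsets
         .start())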

On Thu, Jun 7, 2018 at 4:10, licl wrote:

> I met the same issue, and I tried deleting the checkpoint dir before the
> job,
>
> but Spark still seems to read the correct offsets even after the
> checkpoint dir is deleted.
>
> I don't know how Spark does this without the checkpoint's metadata.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


If there is timestamp type data in DF, Spark 2.3 toPandas is much slower than spark 2.2.

2018-06-06 Thread 李斌松
If there is timestamp-type data in a DataFrame, Spark 2.3's toPandas is much
slower than Spark 2.2's.
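
Not an answer to the regression itself, but one thing worth measuring is the Arrow-based conversion path added in Spark 2.3. A minimal sketch (the DataFrame below is made up):

import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("topandas-arrow-sketch").getOrCreate()

# Hypothetical DataFrame with a timestamp column, just to show the call pattern.
df = spark.createDataFrame(
    [(1, datetime.datetime(2018, 6, 6, 12, 0, 0))],
    ["id", "event_time"])

# Arrow-based conversion (new in 2.3); whether it helps this particular
# timestamp slowdown would need to be benchmarked against 2.2.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = df.toPandas()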


Pyspark Join and then column select is showing unexpected output

2018-06-06 Thread bis_g
I am not sure if the long hours are getting to me, but I am seeing some
unexpected behavior in Spark 2.2.0.

I have created a toy example as below

toy_df = spark.createDataFrame([
    ['p1', 'a'],
    ['p1', 'b'],
    ['p1', 'c'],
    ['p2', 'a'],
    ['p2', 'b'],
    ['p2', 'd']], schema=['patient', 'drug'])
I create another dataframe

mdf = toy_df.filter(toy_df.drug == 'c')
As you know, mdf would be:

mdf.show()
+-------+----+
|patient|drug|
+-------+----+
|     p1|   c|
+-------+----+
Now, if I do this

toy_df.join(mdf,["patient"],"left").select(toy_df.patient.alias("P1"),toy_df.drug.alias('D1'),mdf.patient,mdf.drug).show()
Surprisingly I get

+---+---+-------+----+
| P1| D1|patient|drug|
+---+---+-------+----+
| p2|  a|     p2|   a|
| p2|  b|     p2|   b|
| p2|  d|     p2|   d|
| p1|  a|     p1|   a|
| p1|  b|     p1|   b|
| p1|  c|     p1|   c|
+---+---+-------+----+
But if I use

toy_df.join(mdf,["patient"],"left").show()
I do see the expected behavior

+-------+----+----+
|patient|drug|drug|
+-------+----+----+
|     p2|   a|null|
|     p2|   b|null|
|     p2|   d|null|
|     p1|   a|   c|
|     p1|   b|   c|
|     p1|   c|   c|
+-------+----+----+
And if I use an alias expression on one of the DataFrames, I do get the
expected behavior

toy_df.join(mdf.alias('D'),on=["patient"],how="left").select(toy_df.patient.alias("P1"),toy_df.drug.alias("D1"),'D.drug').show()

+---+---+----+
| P1| D1|drug|
+---+---+----+
| p2|  a|null|
| p2|  b|null|
| p2|  d|null|
| p1|  a|   c|
| p1|  b|   c|
| p1|  c|   c|
+---+---+----+
So my question is: what is the best way to select columns after a join, and is
this behavior normal?
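
As a follow-up sketch (variable names are hypothetical), aliasing both sides and selecting through the aliases avoids the ambiguous column resolution shown above, since mdf is derived from toy_df and shares its column lineage:

from pyspark.sql import functions as F

left = toy_df.alias("L")
right = toy_df.filter(toy_df.drug == "c").alias("R")

# Selecting via the aliases keeps the two 'patient'/'drug' lineages apart.
result = (left.join(right, on="patient", how="left")
          .select(F.col("L.patient").alias("P1"),
                  F.col("L.drug").alias("D1"),
                  F.col("R.drug")))
result.show()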



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Apache Spark Structured Streaming - Kafka Streaming - Option to ignore checkpoint

2018-06-06 Thread licl
I met the same issue, and I tried deleting the checkpoint dir before the
job,

but Spark still seems to read the correct offsets even after the
checkpoint dir is deleted.

I don't know how Spark does this without the checkpoint's metadata.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark ML online serving

2018-06-06 Thread Holden Karau
At Spark Summit some folks were talking about model serving and we wanted
to collect requirements from the community.
-- 
Twitter: https://twitter.com/holdenkarau


Re: Hive to Oracle using Spark - Type(Date) conversion issue

2018-06-06 Thread spark receiver
Use unix time and write the unix time to Oracle as a NUMBER column type, then create a
virtual column in the Oracle database for the unix time, like:

oracle_time generated always as (to_date('1970010108','YYYYMMDDHH24') + (1/24/60/60)*unixtime)
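
A minimal PySpark sketch of the unix-time idea (the DataFrame, column name, date format, and JDBC URL are all assumptions); the Oracle virtual column above would then rebuild the date on the database side:

from pyspark.sql import functions as F

# 'hive_df' is a hypothetical DataFrame read from Hive with a varchar date column.
with_unix = hive_df.withColumn(
    "unixtime",
    F.unix_timestamp(F.col("order_date"), "yyyy-MM-dd HH:mm:ss"))  # assumed source format

(with_unix.drop("order_date")
 .write
 .format("jdbc")
 .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")  # hypothetical connection
 .option("dbtable", "TARGET_TABLE")
 .option("user", "username")
 .option("password", "password")
 .mode("append")
 .save())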

> On Mar 20, 2018, at 11:08 PM, Gurusamy Thirupathy  wrote:
> 
> HI Jorn,
> 
> Thanks for sharing the different options. Yes, we are trying to build a 
> generic tool for Hive-to-Oracle export using Spark. 
> FYI, currently we are using Sqoop, and we are trying to migrate from Sqoop to 
> Spark.
> 
> Thanks
> -G
> 
> On Tue, Mar 20, 2018 at 2:17 AM, Jörn Franke wrote:
> Write your own Spark UDF. Apply it to all varchar columns.
> 
> Within this UDF you can use the SimpleDateFormat parse method. If this method 
> returns null, you return the content as varchar; if not, you return a date. If 
> the content is null, you return null.
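
A minimal PySpark sketch of this idea, using Python's datetime.strptime in place of SimpleDateFormat (the date pattern and column names are assumptions; since a UDF returns a single type, this version yields a date or null and keeps the original varchar column alongside):

import datetime
from pyspark.sql import functions as F, types as T

def parse_if_date(value, fmt="%Y-%m-%d"):
    # Return a date when the string parses, otherwise None.
    if value is None:
        return None
    try:
        return datetime.datetime.strptime(value, fmt).date()
    except (ValueError, TypeError):
        return None

parse_if_date_udf = F.udf(parse_if_date, T.DateType())

# 'df' and 'maybe_date_col' are hypothetical.
converted = df.withColumn("maybe_date_parsed", parse_if_date_udf(F.col("maybe_date_col")))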
> 
> Alternatively you can define an insert function as pl/sql on Oracle side.
> 
> Another alternative is to read the Oracle metadata for the table at runtime 
> and then adapt your conversion based on this. 
> 
> However, this may not be perfect depending on your use case. Can you please 
> provide more details/examples? Do you aim at a generic hive to Oracle import 
> tool using Spark? Sqoop would not be an alternative?
> 
> On 20. Mar 2018, at 03:45, Gurusamy Thirupathy wrote:
> 
>> Hi guha,
>> 
>> Thanks for your quick response; options a and b are in our table already. For 
>> option b, there is again the same problem: we don't know which column is a date.
>> 
>> 
>> Thanks,
>> -G
>> 
>> On Sun, Mar 18, 2018 at 9:36 PM, Deepak Sharma wrote:
>> But this may be expensive solution.
>> 
>> Thanks
>> Deepak
>> 
>> On Mon, Mar 19, 2018, 08:04 Gurusamy Thirupathy wrote:
>> Hi,
>> 
>> I am trying to read data from Hive as a DataFrame and then write the DF 
>> into an Oracle database. In this case, the date field/column in Hive has 
>> type Varchar(20),
>> but the corresponding column type in Oracle is Date. While reading from Hive, 
>> the Hive table names are decided dynamically (read from another table) 
>> based on some job condition (e.g. Job1). There are multiple tables like this, 
>> so the column and table names are known only at run time, so I can't do type 
>> conversion explicitly when reading from Hive.
>> 
>> So is there any utility/API available in Spark to handle this conversion 
>> issue?
>> 
>> 
>> Thanks,
>> Guru
>> 
>> 
>> 
>> -- 
>> Thanks,
>> Guru
> 
> 
> 
> -- 
> Thanks,
> Guru



Re: Dataframe from 1.5G json (non JSONL)

2018-06-06 Thread raksja
It's happening in the executor:
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 25800"...
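
For context, a minimal sketch (path and numbers are illustrative) of reading a single large non-JSONL document; such a document generally ends up being parsed by one task, which matches the single stuck task described later in this thread, so a large executor heap may be unavoidable:

# Typically submitted with larger heaps, e.g. (illustrative values):
#   spark-submit --conf spark.driver.memory=20g --conf spark.executor.memory=20g app.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("big-json-sketch").getOrCreate()

# multiLine=True (Spark 2.2+) parses a file containing one big JSON document
# instead of expecting one record per line; that parse happens in a single task.
df = spark.read.option("multiLine", "true").json("hdfs:///data/big.json")  # hypothetical path

# Repartitioning helps downstream stages, but not the initial single-task parse.
df = df.repartition(200)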



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



FINAL REMINDER: Apache EU Roadshow 2018 in Berlin next week!

2018-06-06 Thread sharan

Hello Apache Supporters and Enthusiasts

This is a final reminder that our Apache EU Roadshow will be held in 
Berlin next week on 13th and 14th June 2018. We will have 28 different 
sessions running over 2 days that cover some great topics. So if you are 
interested in Microservices, Internet of Things (IoT), Cloud, Apache 
Tomcat or Apache Http Server then we have something for you.


https://foss-backstage.de/sessions/apache-roadshow

We will be co-located with FOSS Backstage, so if you are interested in 
topics such as incubator, the Apache Way, open source governance, legal, 
trademarks or simply open source communities then there will be 
something there for you too. You can attend any of the talks, presentations 
and workshops from the Apache EU Roadshow or FOSS Backstage.


You can find details of the combined Apache EU Roadshow and FOSS 
Backstage conference schedule below:


https://foss-backstage.de/schedule?day=2018-06-13

Ticket prices go up on 8th June 2018 and we have a last minute discount 
code that anyone can use before the deadline:


15% discount code: ASF15_discount
valid until June 7, 23:55 CET

You can register at the following link:

https://foss-backstage.de/tickets

Our Apache booth and lounge will be open from 11th - 14th June for 
meetups, hacking or to simply relax between sessions. And we will be 
posting regular updates on social media throughout next week so please 
follow us on Twitter @ApacheCon


Thank you for your continued support and we look forward to seeing you 
in Berlin!


Thanks
Sharan Foga, VP Apache Community Development

http://apachecon.com/

PLEASE NOTE: You are receiving this message because you are subscribed 
to a user@ or dev@ list of one or more Apache Software Foundation projects.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SparkLauncher] stateChanged event not received in standalone cluster mode

2018-06-06 Thread Marcelo Vanzin
That feature has not been implemented yet.
https://issues.apache.org/jira/browse/SPARK-11033

On Wed, Jun 6, 2018 at 5:18 AM, Behroz Sikander  wrote:
> I have a client application which launches multiple jobs in Spark Cluster
> using SparkLauncher. I am using Standalone cluster mode. Launching jobs
> works fine till now. I use launcher.startApplication() to launch.
>
> But now, I have a requirement to check the states of my Driver process. I
> added a Listener implementing the SparkAppHandle.Listener but I don't get
> any events. I am following the approach mentioned here
> https://www.linkedin.com/pulse/spark-launcher-amol-kale
>
> I tried the same code in client mode and I receive all the events as
> expected.
>
> So, I am guessing that something different needs to be done in cluster mode.
> Is there any example with cluster mode?
>
> Regards,
> Behroz



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SparkLauncher] stateChanged event not received in standalone cluster mode

2018-06-06 Thread bsikander
Any help would be appreciated.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: [External] Re: Sorting in Spark on multiple partitions

2018-06-06 Thread Sing, Jasbir
Hi Jorn,

We are using Spark 2.2.0 for our development.
Below is the code snippet for your reference:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructField
import scala.collection.mutable.ArrayBuffer

var newDf = data.repartition(col(userid)).sortWithinPartitions(sid, time)
newDf.write.format("parquet").saveAsTable("tempData")
newDf.coalesce(1).write.format(outputFormat).option("header", "true").save(hdfsUri + destFilePath)

var groupedData = newDf.rdd.map { x => (x.get(0), x) }.groupByKey()

// Get the schema fields of the dataframe
var structFieldArray: Array[StructField] = newDf.schema.fields

// Create a map storing the dataframe's column name, column number and data type
// (DataFrameBO is our own helper class)
var i = 0
val cache = collection.mutable.Map[String, DataFrameBO]()
for (structField <- structFieldArray) {
  val dataFrameBO = new DataFrameBO(i, structField.name, structField.dataType.typeName)
  cache.put(structField.name, dataFrameBO)
  i = i + 1
}

var dfWithoutDuplicateRows = groupedData.mapValues { x =>
  var ls: List[Row] = List()
  var linkedMap = collection.mutable.Map[String, String]()
  val linkedSid = ArrayBuffer.empty[String]

  x.foreach { y =>
    var subpathId = y(cache(sid).columnNumber)
    var salesTimeColumn = y(cache(time).columnNumber)
    var orderId = y(cache(orderIdColumnName).columnNumber)

    var seq = ArrayBuffer[Any]()
    for (i <- 0 to (y.size - 2)) {
      seq += y(i)
    }

    if (!linkedSid.contains(y(cache(sid).columnNumber))) {
      if (linkedMap.exists(x => x._1.equals(y(cache(time).columnNumber)) &&
          x._2.equals(y(cache(orderIdColumnName).columnNumber)))) {
        seq += 0 // Appends 0 to rows which need to be deleted.
      } else {
        linkedSid += y(cache(sid).columnNumber).toString()
        linkedMap.put(y(cache(time).columnNumber).toString(),
          y(cache(orderIdColumnName).columnNumber).toString())
        seq += 1 // Appends 1 to rows which need not be deleted.
      }
    } else {
      seq += 0 // Appends 0 to rows which need to be deleted.
    }

    ls ::= Row.fromSeq(seq)
  }
  ls
}

var flatDataframe = dfWithoutDuplicateRows.values.flatMap { x => x }
var finalDF = data.sqlContext.createDataFrame(flatDataframe, newDf.schema)

finalDF should have picked up the data on a first-come, first-served basis and
updated the flag accordingly.
Please let me know if you need any other information.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Monday, June 4, 2018 10:59 PM
To: Jain, Neha T. <neha.t.j...@accenture.com>
Cc: user@spark.apache.org; Patel, Payal <payal.pa...@accenture.com>; Sing, Jasbir
<jasbir.s...@accenture.com>
Subject: Re: [External] Re: Sorting in Spark on multiple partitions

I think there is also a misunderstanding of how repartition works. It keeps the
existing number of partitions but hash-partitions according to userid, which means
each partition is likely to contain several different user ids.

That would also explain the behavior you observed. However, without the full
source code these are just assumptions.
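
A small PySpark sketch of that point (column names taken from the snippet above, otherwise hypothetical): repartition(col("userid")) hash-partitions, so one partition holds many user ids, and sortWithinPartitions orders rows only within each partition, so leading the sort with userid keeps each user's rows contiguous:

from pyspark.sql import functions as F

# 'data' stands for the DataFrame from the snippet above.
newDf = (data
         .repartition(F.col("userid"))                     # hash partitioning: many userids per partition
         .sortWithinPartitions("userid", "sid", "time"))   # lead with userid so each user's rows stay together

# spark_partition_id() makes the hashing visible: each partition holds several user ids.
(newDf.withColumn("pid", F.spark_partition_id())
      .groupBy("pid")
      .agg(F.countDistinct("userid").alias("distinct_userids"))
      .show())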

On 4. Jun 2018, at 17:33, Jain, Neha T. <neha.t.j...@accenture.com> wrote:
Hi Jorn,

I tried removing userid from my sort clause, but I still see the same issue: the
data is not sorted.

var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)

I am checking the sorting results by temporarily writing this file to Hive as
well as HDFS. When I look at the data for each user, it is not sorted.
Attaching the output file for your reference.

Based on the sorting within the userid partitions, I want to add a flag which
marks the first item in each partition as true and the other items in that
partition as false.
If the sort order is disturbed, the flag is set incorrectly.

Please suggest what else could be done to fix this very basic scenario of
sorting in Spark across multiple partitions and nodes.

Thanks & Regards,
Neha Jain

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Monday, June 4, 2018 10:48 AM
To: Sing, Jasbir <jasbir.s...@accenture.com>
Cc: user@spark.apache.org; Patel, Payal <payal.pa...@accenture.com>; Jain, Neha T. 

Re: Reg:- Py4JError in Windows 10 with Spark

2018-06-06 Thread Jay
Are you running this in local mode or cluster mode? If you are running in
cluster mode, have you ensured that numpy is present on all nodes?
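
A quick sketch (assuming an existing SparkContext named sc, as in the code quoted below) of checking whether numpy is importable on the executors:

def numpy_version(_):
    # Runs on the executors: report the numpy version, or "missing" if the import fails.
    try:
        import numpy
        return numpy.__version__
    except ImportError:
        return "missing"

print(sc.parallelize(range(sc.defaultParallelism), sc.defaultParallelism)
        .map(numpy_version)
        .distinct()
        .collect())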

On Tue 5 Jun, 2018, 2:43 AM @Nandan@ wrote:

> Hi ,
> I am getting error :-
>
> ---
> Py4JError Traceback (most recent call last)
>  in ()
> 3 TOTAL = 100
> 4 dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in range(
> TOTAL)]).cache()
> > 5 print("Number of random points:", dots.count())
> 6
> 7 stats = dots.stats()
> C:\opt\spark\python\pyspark\rdd.py in count(self)
> 1039 3
> 1040 """
> -> 1041 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> 1042
> 1043 def stats(self):
> C:\opt\spark\python\pyspark\rdd.py in sum(self)
> 1030 6.0
> 1031 """
> -> 1032 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add
> )
> 1033
> 1034 def count(self):
> C:\opt\spark\python\pyspark\rdd.py in fold(self, zeroValue, op)
> 904 # zeroValue provided to each partition is unique from the one provided
> 905 # to the final reduce call
> --> 906 vals = self.mapPartitions(func).collect()
> 907 return reduce(op, vals, zeroValue)
> 908
> C:\opt\spark\python\pyspark\rdd.py in collect(self)
> 807 """
> 808 with SCCallSiteSync(self.context) as css:
> --> 809 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
> 810 return list(_load_from_socket(port, self._jrdd_deserializer))
> 811
> C:\opt\spark\python\pyspark\rdd.py in _jrdd(self)
> 2453
> 2454 wrapped_func = _wrap_function(self.ctx, self.func,
> self._prev_jrdd_deserializer,
> -> 2455 self._jrdd_deserializer, profiler)
> 2456 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
> wrapped_func,
> 2457 self.preservesPartitioning)
> C:\opt\spark\python\pyspark\rdd.py in _wrap_function(sc, func,
> deserializer, serializer, profiler)
> 2388 pickled_command, broadcast_vars, env, includes =
> _prepare_for_python_RDD(sc, command)
> 2389 return sc._jvm.PythonFunction(bytearray(pickled_command), env,
> includes, sc.pythonExec,
> -> 2390 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
> 2391
> 2392
> C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py in
> __call__(self, *args)
> 1426 answer = self._gateway_client.send_command(command)
> 1427 return_value = get_return_value(
> -> 1428 answer, self._gateway_client, None, self._fqn)
> 1429
> 1430 for temp_arg in temp_args:
> C:\opt\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
> 61 def deco(*a, **kw):
> 62 try:
> ---> 63 return f(*a, **kw)
> 64 except py4j.protocol.Py4JJavaError as e:
> 65 s = e.java_exception.toString()
> C:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py in
> get_return_value(answer, gateway_client, target_id, name)
> 322 raise Py4JError(
> 323 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
> --> 324 format(target_id, ".", name, value))
> 325 else:
> 326 raise Py4JError(
> Py4JError: An error occurred while calling
> None.org.apache.spark.api.python.PythonFunction. Trace:
> py4j.Py4JException: Constructor
> org.apache.spark.api.python.PythonFunction([class [B, class
> java.util.HashMap, class java.util.ArrayList, class java.lang.String, class
> java.lang.String, class java.util.ArrayList, class
> org.apache.spark.api.python.PythonAccumulatorV2]) does not exist
> at
> py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
> at
> py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
> at py4j.Gateway.invoke(Gateway.java:235)
> at
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I installed Spark 2.0.2 on Windows 10 and the code is as below:
>
>> sc = SparkContext.getOrCreate()
>> sc
>> import numpy as np
>> TOTAL = 100
>> dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in
>> range(TOTAL)]).cache()
>> print("Number of random points:", dots.count())
>> stats = dots.stats()
>> print('Mean:', stats.mean())
>> print('stdev:', stats.stdev())
>
>
> I get the error when running the numpy code.
> Please tell me what's wrong here.
>


Re: Dataframe from 1.5G json (non JSONL)

2018-06-06 Thread Jay
I might have missed it, but can you tell whether the OOM is happening in the driver
or an executor? Also, it would be good if you could post the actual exception.

On Tue 5 Jun, 2018, 1:55 PM Nicolas Paris,  wrote:

> IMO your JSON cannot be read in parallel at all, so Spark only lets you
> play with memory.
>
> I'd say that at some step it has to fit in both one executor and the driver.
> I'd try something like 20GB for both the driver and the executors, and use a
> dynamic number of executors in order to then repartition that fat JSON.
>
>
>
>
> 2018-06-05 22:40 GMT+02:00 raksja :
>
>> Yes, I would say that's the first thing I tried. The thing is, even though I
>> provide more executors and more memory for each, this process gets an OOM in
>> only one task, which is stuck and unfinished.
>>
>> I don't think it's splitting the load across other tasks.
>>
>> I had 11 blocks for the file I stored in HDFS, and I got 11 partitions in my
>> dataframe; when I did show(1), it spun up 11 tasks, 10 passed quickly, and 1
>> got stuck and OOMed.
>>
>> Also, I repartitioned to 1000 and that didn't help either.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


[SparkLauncher] stateChanged event not received in standalone cluster mode

2018-06-06 Thread Behroz Sikander
I have a client application which launches multiple jobs in a Spark cluster
using SparkLauncher. I am using *standalone* *cluster mode*. Launching jobs has
worked fine so far; I use launcher.startApplication() to launch.

But now, I have a requirement to check the states of my Driver process. I
added a Listener implementing the SparkAppHandle.Listener but I don't get
any events. I am following the approach mentioned here
https://www.linkedin.com/pulse/spark-launcher-amol-kale

*I tried the same code in client mode and I receive all the events as
expected*.

So, I am guessing that something different needs to be done in cluster
mode. Is there any example with cluster mode?

Regards,
Behroz


[Spark Streaming] Distinct Count on unrelated columns

2018-06-06 Thread Aakash Basu
Hi guys,

I posted a question (link) on Stack Overflow; can anyone help?


Thanks,
Aakash.