Re: spark 2.2.1

2018-02-02 Thread Bill Schwanitz
What version of Java?

On Feb 1, 2018 11:30 AM, "Mihai Iacob"  wrote:

> I am setting up a Spark 2.2.1 cluster; however, when I bring up the master
> and workers (both on Spark 2.2.1) I get the error below. I tried Spark 2.2.0
> and got the same error. It works fine on Spark 2.0.2. Have you seen this
> before? Any idea what's wrong?
>
> I found this, but it's in a different situation:
> https://github.com/apache/spark/pull/19802
>
>
> 18/02/01 05:07:22 ERROR Utils: Exception encountered
>
> java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -1223633663228316618, local class serialVersionUID = 1835832137613908542
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:687)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:563)
>     at org.apache.spark.deploy.master.WorkerInfo$$anonfun$readObject$1.apply$mcV$sp(WorkerInfo.scala:52)
>     at org.apache.spark.deploy.master.WorkerInfo$$anonfun$readObject$1.apply(WorkerInfo.scala:51)
>     at org.apache.spark.deploy.master.WorkerInfo$$anonfun$readObject$1.apply(WorkerInfo.scala:51)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
>     at org.apache.spark.deploy.master.WorkerInfo.readObject(WorkerInfo.scala:51)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>     at org.apache.spark.deploy.master.FileSystemPersistenceEngine.org$apache$spark$deploy$master$FileSystemPersistenceEngine$$deserializeFromFile(FileSystemPersistenceEngine.scala:80)
>     at org.apache.spark.deploy.master.FileSystemPersistenceEngine$$anonfun$read$1.apply(FileSystemPersistenceEngine.scala:56)
>     at org.apache.spark.deploy.master.FileSystemPersistenceEngine$$anonfun$read$1.apply(FileSystemPersistenceEngine.scala:56)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>     at org.apache.spark.deploy.master.FileSystemPersistenceEngine.read(FileSystemPersistenceEngine.scala:56)
>     at org.apache.spark.deploy.master.PersistenceEngine$$anonfun$readPersistedData$1.apply(PersistenceEngine.scala:87)
>     at org.apache.spark.deploy.master.PersistenceEngine$$anonfun$readPersistedData$1.apply(PersistenceEngine.scala:86)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:316)
>
> [trace truncated here: the SSH session dropped ("packet_write_wait: Connection to 9.30.118.193 port 22: Broken pipe"), cutting off a final frame ending in ...Data(PersistenceEngine.scala:86)]
>
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local  - Security, IBM Analytics
>
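For anyone hitting the same InvalidClassException: the frames above show the
standalone master's FileSystemPersistenceEngine re-reading WorkerInfo /
RpcEndpointRef state that was persisted by a different Spark build, so the
serialVersionUID no longer matches. One common workaround, assuming the
FILESYSTEM recovery mode the trace implies, is to clear the stale files under
spark.deploy.recoveryDirectory before restarting the upgraded master, or to
keep master and workers on exactly the same build. The sketch below is only a
minimal, self-contained illustration of the mechanism, using a hypothetical
Payload class rather than Spark code:

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical class standing in for Spark's persisted WorkerInfo/RpcEndpointRef.
@SerialVersionUID(1L)
class Payload(val value: Int) extends Serializable

object SerialUidDemo {
  def main(args: Array[String]): Unit = {
    // Serialize with the current class definition (UID = 1L).
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(new Payload(42))
    oos.close()

    // Deserializing works here because the reader has the same class with the
    // same UID on its classpath. If Payload were recompiled with a different
    // @SerialVersionUID (as happens when recovery files written by one Spark
    // version are read back by another), readObject would throw
    // java.io.InvalidClassException: ... local class incompatible.
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val restored = ois.readObject().asInstanceOf[Payload]
    println(s"restored value = ${restored.value}")
  }
}
```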


Re: spark kafka consumer with kerberos

2017-03-31 Thread Bill Schwanitz
Saisai,

Yeah, that seems to have helped. It looks like the Kerberos ticket from my
submit does not get passed to the executors?

... 3 more
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    ... 14 more
Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user


On Fri, Mar 31, 2017 at 9:08 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Hi Bill,
>
> The exception is from the executor side. From the gist you provided, it looks
> like the issue is that you only configured the java options on the driver
> side; I think you should also configure them on the executor side. You could
> refer to https://github.com/hortonworks-spark/skc#running-on-a-kerberos-enabled-cluster.
>
> --files key.conf#key.conf,v.keytab#v.keytab
> --driver-java-options "-Djava.security.auth.login.config=./key.conf"
> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./key.conf"
>
>
> On Fri, Mar 31, 2017 at 1:58 AM, Bill Schwanitz <bil...@bilsch.org> wrote:
>
>> I'm working on a proof-of-concept Spark job to pull data from a Kafka topic
>> with Kerberos-enabled (required) brokers.
>>
>> The code seems to connect to Kafka and enter a polling mode. When I toss
>> something onto the topic I get an exception which I just can't seem to
>> figure out. Any ideas?
>>
>> I have a full gist up at
>> https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de
>> with a lot of details. If I use the HDFS/Spark client code for just normal
>> operations everything works fine, but for some reason the streaming code is
>> having issues. I have verified the KafkaClient object is in the JAAS config.
>> The keytab is good, etc.
>>
>> I'm guessing I'm doing something wrong; I just have not figured out what yet!
>> Any thoughts?
>>
>> The exception:
>>
>> 17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
>>     at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
>>     at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
>>     at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Jaas configuration not found
>>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
>>     at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
>>     at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
>>     ... 14 more
>> Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not found
>>     at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
>>     at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
>>     at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
>>     at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
>>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
>>     ... 17 more
>> Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in this configuration.
>>     at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
>>     at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
>>     ... 21 more
>>
>
>
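To make the fix above concrete: both "Jaas configuration not found / Could not
find a 'KafkaClient' entry" and "Unable to obtain password from user" are
raised by the Kafka consumer constructed on the executors, so the JAAS file and
keytab have to be shipped there (--files) and referenced via
spark.executor.extraJavaOptions, exactly as in Saisai's spark-submit snippet.
A hedged sketch of the consumer side (broker, principal, keytab, topic and
group id below are placeholders, not values from the thread):

```
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KerberosKafkaPoc {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kerberos-kafka-poc"), Seconds(10))

    // The JAAS file named in -Djava.security.auth.login.config (key.conf above)
    // must contain a KafkaClient entry, e.g.
    //   KafkaClient {
    //     com.sun.security.auth.module.Krb5LoginModule required
    //     useKeyTab=true keyTab="./v.keytab" principal="v@EXAMPLE.COM";
    //   };
    // A missing entry is exactly what the "Could not find a 'KafkaClient' entry"
    // IOException in the trace complains about.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1.example.com:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "poc-consumer",
      "security.protocol" -> "SASL_PLAINTEXT",      // or SASL_SSL if the brokers use TLS
      "sasl.kerberos.service.name" -> "kafka"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

    stream.map(record => record.value).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```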


spark 2 and kafka consumer with ssl/kerberos

2017-03-30 Thread Bill Schwanitz
I'm working on a proof-of-concept Spark job to pull data from a Kafka topic
with Kerberos-enabled (required) brokers.

The code seems to connect to Kafka and enter a polling mode. When I toss
something onto the topic I get an exception which I just can't seem to
figure out. Any ideas?

I have a full gist up at
https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de
with a lot of details. If I use the HDFS/Spark client code for just normal
operations everything works fine, but for some reason the streaming code is
having issues. I have verified the KafkaClient object is in the JAAS config.
The keytab is good, etc.

I'm guessing I'm doing something wrong; I just have not figured out what yet!
Any thoughts?

The exception:

17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
    at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Jaas configuration not found
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    ... 14 more
Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not found
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 17 more
Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in this configuration.
    at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
    ... 21 more


spark kafka consumer with kerberos

2017-03-30 Thread Bill Schwanitz
I'm working on a proof-of-concept Spark job to pull data from a Kafka topic
with Kerberos-enabled (required) brokers.

The code seems to connect to Kafka and enter a polling mode. When I toss
something onto the topic I get an exception which I just can't seem to
figure out. Any ideas?

I have a full gist up at
https://gist.github.com/bilsch/17f4a4c4303ed3e004e2234a5904f0de
with a lot of details. If I use the HDFS/Spark client code for just normal
operations everything works fine, but for some reason the streaming code is
having issues. I have verified the KafkaClient object is in the JAAS config.
The keytab is good, etc.

I'm guessing I'm doing something wrong; I just have not figured out what yet!
Any thoughts?

The exception:

17/03/30 12:54:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, host5.some.org.net): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.<init>(CachedKafkaConsumer.scala:47)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:157)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
    at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Jaas configuration not found
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    ... 14 more
Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not found
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 17 more
Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in this configuration.
    at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
    ... 21 more


Re: spark streaming exectors memory increasing and executor killed by yarn

2017-03-18 Thread Bill Schwanitz
I have had similar issues with some of my Spark jobs, especially when doing
things like repartitioning.


spark.yarn.driver.memoryOverhead (default: driverMemory * 0.10, with minimum of 384)
    The amount of off-heap memory (in megabytes) to be allocated per driver in
    cluster mode. This is memory that accounts for things like VM overheads,
    interned strings, other native overheads, etc. This tends to grow with the
    container size (typically 6-10%).


I bumped the overhead memory as a way to work around the issue. Not sure
if that is the best way, but it's how I got around it ;)
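For reference, the setting named by the YARN error quoted below is
spark.yarn.executor.memoryOverhead (the doc snippet above is its driver-side
twin). It is usually passed straight on spark-submit with
--conf spark.yarn.executor.memoryOverhead=1024; the sketch below just shows
the equivalent in code, with example values. Roughly 10-15% of executor memory
is a common starting point, but the right number is workload-dependent:

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: raise the off-heap overhead that YARN accounts for, so the
// container limit (executor memory + overhead) is not breached. Values here
// are illustrative, not a recommendation for this particular job.
val conf = new SparkConf()
  .setAppName("streaming-with-bumped-overhead")
  .set("spark.executor.memory", "3g")
  .set("spark.yarn.executor.memoryOverhead", "1024") // MiB; default is max(384, 0.10 * executor memory)
// Note: the driver-side overhead generally has to be set at submit time,
// since the driver container is already allocated before this code runs.

val ssc = new StreamingContext(conf, Seconds(30))
```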


darin wrote:

Hi,
I got this exception after the streaming program had been running for some hours.

```
User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```

I have googled some solutions, like turning off the YARN memory monitor or
increasing executor memory... I don't think that is the right way.


And this is the submit script:
```
spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G
--num-executors 6 --executor-cores 3 --executor-memory 3G --conf
"spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" --conf
"spark.kryoserializer.buffer.max=512m" --class
com.dtise.data.streaming.ad.DTStreamingStatistics
hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar
```

And this is the main code:

```
val originalStream = ssc.textFileStream(rawDataPath)
originalStream.repartition(10).mapPartitions(parseAdLog).reduceByKey(_ ++ _)
  .mapWithState(StateSpec.function(countAdLogWithState _))
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val batchTime = Calendar.getInstance.getTimeInMillis
      val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
      val nameList = rdd.map(_._2).reduce(_ ++ _).toList
      val jedis = RedisUtils.jedis()
      jedis.hmset(joinString("t_ad_dimension_sum", batchTime), dimensionSumMap)
      jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
      jedis.set(joinString("t_ad", batchTime.toString), "OK")
      jedis.close()

      rdd.flatMap(_._3).foreachPartition(logInfoList => {
        val producter = new StringProducter
        for (logInfo <- logInfoList) {
          val logInfoArr = logInfo.split("\t", -1)
          val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" + logInfoArr(logDateIdx)
          producter.send("cookedLog", kafkaKey, logInfo)
        }
        producter.close()
      })
    }
  })
```

These are the JVM heap MAT results:

[the heap-analysis images from the original post are not preserved in this text archive]
Anybody have any advice about this?
Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500.html







Re: question on transforms for spark 2.0 dataset

2017-03-01 Thread Bill Schwanitz
Subhash,

Yeah, that did the trick, thanks!

On Wed, Mar 1, 2017 at 12:20 PM, Subhash Sriram <subhash.sri...@gmail.com>
wrote:

> If I am understanding your problem correctly, I think you can just create
> a new DataFrame that is a transformation of sample_data by first
> registering sample_data as a temp table.
>
> //Register temp table
> sample_data.createOrReplaceTempView("sql_sample_data")
>
> //Create new DataSet with transformed values
> val transformed = spark.sql("select trim(field1) as field1, trim(field2)
> as field2.. from sql_sample_data")
>
> //Test
> transformed.show(10)
>
> I hope that helps!
> Subhash
>
>
> On Wed, Mar 1, 2017 at 12:04 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Hi, I think you need a UDF if you want to transform a column.
>> HTH
>>
>> On 1 Mar 2017 4:22 pm, "Bill Schwanitz" <bil...@bilsch.org> wrote:
>>
>>> Hi all,
>>>
>>> I'm fairly new to Spark and Scala, so bear with me.
>>>
>>> I'm working with a dataset containing a set of columns / fields. The data
>>> is stored in HDFS as Parquet and is sourced from a Postgres box, so fields
>>> and values are reasonably well formed. We are in the process of trying out
>>> a switch from Pentaho and various SQL databases to pulling data into HDFS
>>> and applying transforms / new datasets, with processing being done in Spark
>>> (and other tools, under evaluation).
>>>
>>> A rough version of the code I'm running so far:
>>>
>>> val sample_data = spark.read.parquet("my_data_input")
>>>
>>> val example_row = spark.sql("select * from parquet.my_data_input where
>>> id = 123").head
>>>
>>> I want to apply a trim operation on a set of fields - let's call them
>>> field1, field2, field3 and field4.
>>>
>>> What is the best way to go about applying those trims and creating a new
>>> dataset? Can I apply the trim to all fields in a single map? Or do I need
>>> to apply multiple map functions?
>>>
>>> When I try the map (even with a single field):
>>>
>>> scala> val transformed_data = sample_data.map(
>>>  |   _.trim(col("field1"))
>>>  |   .trim(col("field2"))
>>>  |   .trim(col("field3"))
>>>  |   .trim(col("field4"))
>>>  | )
>>>
>>> I end up with the following error:
>>>
>>> <console>:26: error: value trim is not a member of org.apache.spark.sql.Row
>>>        _.trim(col("field1"))
>>>          ^
>>>
>>> Any ideas / guidance would be appreciated!
>>>
>>
>
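Complementing the temp-table/SQL approach above: the same trim can be done
with the DataFrame API, which is often the smaller change when only a handful
of columns need cleaning. A minimal sketch, reusing the hypothetical
my_data_input path and field1..field4 column names from the question. The
map-over-Row attempt fails because trim is not a method on Row; the trim
function lives in org.apache.spark.sql.functions and operates on Columns:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, trim}

val spark = SparkSession.builder().appName("trim-example").getOrCreate()
val sample_data = spark.read.parquet("my_data_input")

// Replace each listed column with its trimmed value; foldLeft threads the
// growing DataFrame through one withColumn call per field.
val fieldsToTrim = Seq("field1", "field2", "field3", "field4")
val transformed = fieldsToTrim.foldLeft(sample_data) { (df, name) =>
  df.withColumn(name, trim(col(name)))
}

transformed.show(10)
```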


question on transforms for spark 2.0 dataset

2017-03-01 Thread Bill Schwanitz
Hi all,

I'm fairly new to Spark and Scala, so bear with me.

I'm working with a dataset containing a set of columns / fields. The data is
stored in HDFS as Parquet and is sourced from a Postgres box, so fields and
values are reasonably well formed. We are in the process of trying out a
switch from Pentaho and various SQL databases to pulling data into HDFS and
applying transforms / new datasets, with processing being done in Spark (and
other tools, under evaluation).

A rough version of the code I'm running so far:

val sample_data = spark.read.parquet("my_data_input")

val example_row = spark.sql("select * from parquet.my_data_input where id =
123").head

I want to apply a trim operation on a set of fields - let's call them
field1, field2, field3 and field4.

What is the best way to go about applying those trims and creating a new
dataset? Can I apply the trim to all fields in a single map? Or do I need
to apply multiple map functions?

When I try the map (even with a single field):

scala> val transformed_data = sample_data.map(
 |   _.trim(col("field1"))
 |   .trim(col("field2"))
 |   .trim(col("field3"))
 |   .trim(col("field4"))
 | )

I end up with the following error:

<console>:26: error: value trim is not a member of org.apache.spark.sql.Row
       _.trim(col("field1"))
         ^

Any ideas / guidance would be appreciated!