Re: [SPARK-SQL] How to return GenericInternalRow from spark udf

2020-08-06 Thread Sean Owen
The UDF should return the result value you want, not a whole Row. In
Scala, Spark figures out the schema of the UDF's result from the
function's signature.
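
For example, something like this (a rough sketch; the Person case class and
the df/hashCol/typeCol names are made up, not from your code):

  import org.apache.spark.sql.functions.{col, udf}

  // Defined at top level so Spark can derive an encoder for it.
  // If "people" and "empl" need different shapes, use a common case class.
  case class Person(name: String, age: Int)

  // Return the value itself; Spark infers struct<name:string,age:int>
  // from the function's signature.
  val getData = (hash: String, kind: String) => {
    hash match {
      case "people" => Person(kind, 30)
      case _        => Person("unknown", -1)
    }
  }
  val getDataUdf = udf(getData)

  df.withColumn("person", getDataUdf(col("hashCol"), col("typeCol")))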

On Thu, Aug 6, 2020 at 7:56 AM Amit Joshi  wrote:
>
> Hi,
>
> I have a Spark UDF written in Scala that takes a couple of columns, applies 
> some logic, and outputs an InternalRow. A Spark StructType schema is also 
> available. But when I try to return the InternalRow from the UDF, there is an exception:
>
> java.lang.UnsupportedOperationException: Schema for type 
> org.apache.spark.sql.catalyst.GenericInternalRow is not supported
>
>   val getData = (hash: String, `type`: String) => {
>     val schema = hash match {
>       case "people" => peopleSchema
>       case "empl"   => emplSchema
>     }
>     getGenericInternalRow(schema)
>   }
>
>   val data = udf(getData)
>
> Spark Version : 2.4.5
>
>
> Please Help.
>
>
> Regards
>
> Amit Joshi




[SPARK-SQL] How to return GenericInternalRow from spark udf

2020-08-06 Thread Amit Joshi
Hi,



I have a Spark UDF written in Scala that takes a couple of columns, applies
some logic, and outputs an InternalRow. A Spark StructType schema is also
available. But when I try to return the InternalRow from the UDF, there is an
exception:

java.lang.UnsupportedOperationException: Schema for type
org.apache.spark.sql.catalyst.GenericInternalRow is not supported

  val getData = (hash: String, `type`: String) => {
    val schema = hash match {
      case "people" => peopleSchema
      case "empl"   => emplSchema
    }
    getGenericInternalRow(schema)
  }

  val data = udf(getData)

Spark Version : 2.4.5


Please Help.


Regards

Amit Joshi


join doesn't work

2020-08-06 Thread nt
I'm using the streaming Pulsar connector; each Dataset receives the data
properly, but I cannot get the join to work.

Dataset datasetPolicyWithWtm =
    datasetPolicy.withWatermark("__publishTime", "5 minutes").as("pol");
Dataset datasetPhoneWithWtm =
    datasetPhone.withWatermark("__publishTime", "5 minutes").as("ph");

Dataset join = datasetPolicyWithWtm
    .join(
        datasetPhoneWithWtm,
        functions.expr("pol.__key=ph.__key"
            + " and ph.__publishTime >= pol.__publishTime - interval 2 minutes"
            + " and ph.__publishTime <= pol.__publishTime + interval 2 minutes"),
        "inner")
    .groupBy(
        functions.window(datasetPolicyWithWtm.col("__publishTime"), "2 minutes"),
        functions.col("pol.__key"))
    .count();
Not sure what the reason could be.







Re: S3 read/write from PySpark

2020-08-06 Thread Daniel Stojanov
Hi,

Thanks for your help. Problem solved, but I thought I should add something
in case this problem is encountered by others.

Both responses are correct; BasicAWSCredentialsProvider is gone, but simply
making the substitution leads to the traceback just below:

java.lang.NoSuchMethodError: 'void
com.google.common.base.Preconditions.checkArgument(boolean,
java.lang.String, java.lang.Object, java.lang.Object)'

I found that changing all of the Hadoop packages from 3.3.0 to 3.2.0 *and*
switching the credentials provider away from BasicAWSCredentialsProvider
solved the problem.
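
Roughly, the shape of the working setup is sketched below (the package
versions and the SimpleAWSCredentialsProvider class are based on the
suggestions in this thread, not a verbatim copy of my final config):

# Launch against the Hadoop 3.2 line, e.g.:
# ./pyspark --packages org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.563

hconf = spark._jsc.hadoopConfiguration()
hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# SimpleAWSCredentialsProvider replaces the removed BasicAWSCredentialsProvider
hconf.set("fs.s3a.aws.credentials.provider",
          "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hconf.set("fs.s3a.access.key", aws_access_key)
hconf.set("fs.s3a.secret.key", aws_secret_key)

df = spark.read.option("header", "true").csv(f"s3a://{aws_bucket}/{aws_filename}")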

Thanks.
Regards,



Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File
"/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py",
line 535, in csv
return
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File
"/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
  File
"/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/pyspark/sql/utils.py",
line 131, in deco
return f(*a, **kw)
  File
"/home/daniel/packages/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.csv.
: java.lang.NoSuchMethodError: 'void
com.google.common.base.Preconditions.checkArgument(boolean,
java.lang.String, java.lang.Object, java.lang.Object)'
at
org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:893)
at
org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:869)
at
org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm(S3AUtils.java:1580)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:341)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at
org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
at
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
at
org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)








On Thu, 6 Aug 2020 at 17:19, Stephen Coy  wrote:

> Hi Daniel,
>
> It looks like …BasicAWSCredentialsProvider has
> become org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.
>
> However, the way that the username and password are provided appears to
> have changed so you will probably need to look in to that.
>
> Cheers,
>
> Steve C
>
> On 6 Aug 2020, at 11:15 am, Daniel Stojanov 
> wrote:
>
> Hi,
>
> I am trying to read/write files to S3 from PySpark. The procedure that I
> have used is to download Spark, start PySpark with the hadoop-aws, guava,
> aws-java-sdk-bundle packages. The versions are explicitly specified by
> looking up the exact dependency version on Maven. Allowing dependencies to
> be auto determined does not work. This procedure works for Spark 3.0.0 with
> Hadoop 2.7, but does not work for Hadoop 3.2. I get this exception:
> java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider
>
>
> Can somebody point to a procedure for doing this using Spark bundled with
> Hadoop 3.2?
>
> Regards,
>
>
>
>
>
> To replicate:
>
> Download Spark 3.0.0 with support for Hadoop 3.2.
>
> Launch spark with:
>
> ./pyspark --packages
> 

Re: S3 read/write from PySpark

2020-08-06 Thread Stephen Coy
Hi Daniel,

It looks like …BasicAWSCredentialsProvider has become 
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.

However, the way that the username and password are provided appears to have 
changed, so you will probably need to look into that.

Cheers,

Steve C

On 6 Aug 2020, at 11:15 am, Daniel Stojanov wrote:

Hi,

I am trying to read/write files to S3 from PySpark. The procedure that I have 
used is to download Spark, start PySpark with the hadoop-aws, guava, 
aws-java-sdk-bundle packages. The versions are explicitly specified by looking 
up the exact dependency version on Maven. Allowing dependencies to be auto 
determined does not work. This procedure works for Spark 3.0.0 with Hadoop 2.7, 
but does not work for Hadoop 3.2. I get this exception:
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider


Can somebody point to a procedure for doing this using Spark bundled with 
Hadoop 3.2?

Regards,





To replicate:

Download Spark 3.0.0 with support for Hadoop 3.2.

Launch spark with:

./pyspark --packages 
org.apache.hadoop:hadoop-common:3.3.0,org.apache.hadoop:hadoop-client:3.3.0,org.apache.hadoop:hadoop-aws:3.3.0,com.amazonaws:aws-java-sdk-bundle:1.11.563,com.google.guava:guava:27.1-jre

Then run the following Python.

# Set these 4 parameters as appropriate.
aws_bucket = ""
aws_filename = ""
aws_access_key = ""
aws_secret_key = ""

spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", 
"true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", 
"s3.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_key)
df = spark.read.option("header", 
"true").csv(f"s3a://{aws_bucket}/{aws_filename}")



Leads to this error message:



Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File 
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py",
 line 535, in csv
return 
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File 
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
 line 1305, in __call__
  File 
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/utils.py",
 line 131, in deco
return f(*a, **kw)
  File 
"/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.csv.
: java.io.IOException: From option fs.s3a.aws.credentials.provider 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider not found
at 
org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:645)
at 
org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:668)
at 
org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:619)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:636)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:390)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 

Re: S3 read/write from PySpark

2020-08-06 Thread German Schiavon
Hey,

I think *BasicAWSCredentialsProvider* is no longer supported by Hadoop. I
couldn't find it in the master branch, but I could in the 2.8 branch.
Maybe that's why it works with Hadoop 2.7.

I use *TemporaryAWSCredentialsProvider*.

Hope it helps
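
In case it helps, a minimal sketch of that approach (variable names are
placeholders; fs.s3a.session.token is the standard S3A setting for the STS
session token):

hconf = spark._jsc.hadoopConfiguration()
hconf.set("fs.s3a.aws.credentials.provider",
          "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hconf.set("fs.s3a.access.key", aws_access_key)        # temporary access key
hconf.set("fs.s3a.secret.key", aws_secret_key)        # temporary secret key
hconf.set("fs.s3a.session.token", aws_session_token)  # STS session token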

On Thu, 6 Aug 2020 at 03:16, Daniel Stojanov 
wrote:

> Hi,
>
> I am trying to read/write files to S3 from PySpark. The procedure that I
> have used is to download Spark, start PySpark with the hadoop-aws, guava,
> aws-java-sdk-bundle packages. The versions are explicitly specified by
> looking up the exact dependency version on Maven. Allowing dependencies to
> be auto determined does not work. This procedure works for Spark 3.0.0 with
> Hadoop 2.7, but does not work for Hadoop 3.2. I get this exception:
> java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider
>
>
> Can somebody point to a procedure for doing this using Spark bundled with
> Hadoop 3.2?
>
> Regards,
>
>
>
>
>
> To replicate:
>
> Download Spark 3.0.0 with support for Hadoop 3.2.
>
> Launch spark with:
>
> ./pyspark --packages
> org.apache.hadoop:hadoop-common:3.3.0,org.apache.hadoop:hadoop-client:3.3.0,org.apache.hadoop:hadoop-aws:3.3.0,com.amazonaws:aws-java-sdk-bundle:1.11.563,com.google.guava:guava:27.1-jre
>
> Then run the following Python.
>
>
> # Set these 4 parameters as appropriate.
>
> aws_bucket = ""
>
> aws_filename = ""
>
> aws_access_key = ""
>
> aws_secret_key = ""
>
>
>
> spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
>
> spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4",
> "true")
>
>
> spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
>
> spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com
> ")
>
> spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key)
>
> spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_key)
>
> df = spark.read.option("header",
> "true").csv(f"s3a://{aws_bucket}/{aws_filename}")
>
>
>
>
> Leads to this error message:
>
>
>
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File
> "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/readwriter.py",
> line 535, in csv
> return
> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>   File
> "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1305, in __call__
>   File
> "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/pyspark/sql/utils.py",
> line 131, in deco
> return f(*a, **kw)
>   File
> "/home/daniel/.local/share/Trash/files/spark-3.3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o39.csv.
> : java.io.IOException: From option fs.s3a.aws.credentials.provider
> java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider not found
> at
> org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:645)
> at
> org.apache.hadoop.fs.s3a.S3AUtils.buildAWSProviderList(S3AUtils.java:668)
> at
> org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:619)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.bindAWSClient(S3AFileSystem.java:636)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:390)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
> at
> org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
> at scala.Option.getOrElse(Option.scala:189)
> at
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
> at
> org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:705)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>