Re: Create static Map Type column

2017-07-26 Thread ayan guha
Never mind, found the solution.

Spark 2.0+:

val df1 = df.withColumn("newcol", map(lit("field1"), lit("fieldName1")))

scala> df1.show()
+---+--------------------+
|  a|              newcol|
+---+--------------------+
|  1|Map(field1 -> fie...|
|  2|Map(field1 -> fie...|
|  3|Map(field1 -> fie...|
|  4|Map(field1 -> fie...|
+---+--------------------+
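
The same map()/lit() pattern extends to more than one key. A minimal sketch along the same lines (df is the existing DataFrame as above; the extra key/value pairs are only illustrative):

import org.apache.spark.sql.functions.{lit, map}

// Illustrative static key/value pairs; in practice these might come from config.
val staticFields = Map("field1" -> "fieldName1", "field2" -> "fieldName2")

// Interleave keys and values as literal columns and build one MapType column.
val staticMapCol = map(staticFields.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*)

val df2 = df.withColumn("newcol", staticMapCol)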

Thanks for creating such useful functions, spark devs :)

Best
Ayan

On Thu, Jul 27, 2017 at 2:30 PM, ayan guha  wrote:

> Hi
>
> I want to add a static MapType column to a DataFrame.
>
> How I am doing it now:
>
>   val fieldList = spark.sparkContext.parallelize(Array(Row(Map("field1" -> "someField"))))
>
>   val fieldListSchemaBase = new StructType()
>
>   val f = StructField("encrypted_field_list",MapType(StringType,
> StringType))
>
>   val fieldListSchema = fieldListSchemaBase.add(f)
>
>   val fieldListDF = spark.createDataFrame(fieldList,fieldListSchema)
>
>
> val saveFinalWithFieldList = saveFinal.join(fieldListDF)
>
>
> But it requires me to enable cartesian joins (which this join is). Is
> there any simpler way to achieve this? Probably using the withColumn API?
>
> I saw a similar post, but I was not able to use the trick Jacek suggested.
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


running spark application compiled with 1.6 on spark 2.1 cluster

2017-07-26 Thread satishl
My Spark application is compiled against Spark 1.6 core and its dependencies.
When I try to run this app on a Spark 2.1 cluster, I run into:

ERROR ApplicationMaster: User class threw exception:
java.lang.NoClassDefFoundError: org/apache/spark/Logging

I was hoping that 2.x Spark is backward compatible and I wouldn't need to
recompile my application.
Is this a supported scenario, i.e., can I run an app compiled with Spark 1.6
on a 2.x Spark cluster?
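
For context, Spark 2.x removed org.apache.spark.Logging from the public API, which is exactly the class named in the error above, so a jar compiled against 1.6 generally has to be rebuilt against the cluster's Spark version. A minimal build.sbt sketch (all version numbers here are assumptions to be matched to the cluster):

// build.sbt -- rebuild the application against the 2.x artifacts (versions assumed)
name := "my-spark-app"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.1.1" % "provided"
)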





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/running-spark-application-compiled-with-1-6-on-spark-2-1-cluster-tp29002.html



Please unsubscribe

2017-07-26 Thread sowmya ramesh



Create static Map Type column

2017-07-26 Thread ayan guha
Hi

I want to add a static MapType column to a DataFrame.

How I am doing it now:

  val fieldList = spark.sparkContext.parallelize(Array(Row(Map("field1" -> "someField"))))

  val fieldListSchemaBase = new StructType()

  val f =
StructField("encrypted_field_list",MapType(StringType,StringType))

  val fieldListSchema = fieldListSchemaBase.add(f)

  val fieldListDF = spark.createDataFrame(fieldList,fieldListSchema)


val saveFinalWithFieldList = saveFinal.join(fieldListDF)


But it requires me to enable cartesian joins (which this join is). Is
there any simpler way to achieve this? Probably using the withColumn API?

I saw a similar post, but I was not able to use the trick Jacek suggested.
-- 
Best Regards,
Ayan Guha


Can I move TFS and TFSFT out of the spark package

2017-07-26 Thread Jone Zhang
I have built spark-assembly-1.6.0-hadoop2.5.1.jar.

cat spark-assembly-1.6.0-hadoop2.5.1.jar/META-INF/services/org.apache.hadoop.fs.FileSystem
...
org.apache.hadoop.hdfs.DistributedFileSystem
org.apache.hadoop.hdfs.web.HftpFileSystem
org.apache.hadoop.hdfs.web.HsftpFileSystem
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
tachyon.hadoop.TFS
tachyon.hadoop.TFSFT

Can I move TFS and TFSFT out of spark-assembly-1.6.0-hadoop2.5.1.jar?
How do I modify the build to exclude them before building?


Thanks.


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread ayan guha
Hi TD

I thought Structured Streaming provides the same concept as DataFrames,
where it does not matter which language I use to invoke the APIs, with the
exception of UDFs.

So, when I think of supporting the foreach sink in Python, I think of it as
just a wrapper API, with the data remaining in the JVM only, similar to, for
example, a Hive writer or HDFS writer in the DataFrame API.

Am I oversimplifying? Or is it just early days for Structured Streaming?
Happy to learn of any mistakes in my thinking and understanding.

Best
Ayan

On Thu, 27 Jul 2017 at 4:49 am, Priyank Shrivastava 
wrote:

> Thanks TD.  I am going to try the python-scala hybrid approach by using
> scala only for the custom redis sink and python for the rest of the app.  I
> understand it might not be as efficient as writing the app purely in scala,
> but unfortunately I am constrained on scala resources.  Have you come
> across other use cases where people have resorted to such a python-scala
> hybrid approach?
>
> Regards,
> Priyank
>
>
>
> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hello Priyank
>>
>> Writing something purely in Scala/Java would be the most efficient. Even
>> if we expose python APIs that allow writing custom sinks in pure Python, it
>> won't be as efficient as Scala/Java foreach, as the data would have to go
>> through the JVM / PVM boundary, which has significant overheads. So Scala/Java
>> foreach is always going to be the best option.
>>
>> TD
>>
>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> I am trying to write key-values to redis using a DataStreamWriter object
>>> using pyspark structured streaming APIs. I am using Spark 2.2
>>>
>>> Since the Foreach sink is not supported for Python, I am trying to find
>>> out some alternatives.
>>>
>>> One alternative is to write a separate Scala module only to push data
>>> into redis using foreach; ForeachWriter is supported in Scala. But this
>>> doesn't seem like an efficient approach and adds deployment overhead,
>>> because now I will have to support Scala in my app.
>>>
>>> Another approach is obviously to use Scala instead of python, which is
>>> fine but I want to make sure that I absolutely cannot use python for this
>>> problem before I take this path.
>>>
>>> Would appreciate some feedback and alternative design approaches for
>>> this problem.
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>
> --
Best Regards,
Ayan Guha


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
We see that all the time. For example, in SQL, people can write their
user-defined functions in Scala/Java and use them from SQL/Python/anywhere.
That is the recommended way to get the best combination of performance and
ease of use from non-JVM languages.

On Wed, Jul 26, 2017 at 11:49 AM, Priyank Shrivastava <
priy...@asperasoft.com> wrote:

> Thanks TD.  I am going to try the python-scala hybrid approach by using
> scala only for the custom redis sink and python for the rest of the app.  I
> understand it might not be as efficient as writing the app purely in scala,
> but unfortunately I am constrained on scala resources.  Have you come
> across other use cases where people have resorted to such a python-scala
> hybrid approach?
>
> Regards,
> Priyank
>
>
>
> On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hello Priyank
>>
>> Writing something purely in Scala/Java would be the most efficient. Even
>> if we expose python APIs that allow writing custom sinks in pure Python, it
>> won't be as efficient as Scala/Java foreach, as the data would have to go
>> through the JVM / PVM boundary, which has significant overheads. So Scala/Java
>> foreach is always going to be the best option.
>>
>> TD
>>
>> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
>> priy...@asperasoft.com> wrote:
>>
>>> I am trying to write key-values to redis using a DataStreamWriter object
>>> using pyspark structured streaming APIs. I am using Spark 2.2
>>>
>>> Since the Foreach sink is not supported for Python, I am trying to find
>>> out some alternatives.
>>>
>>> One alternative is to write a separate Scala module only to push data
>>> into redis using foreach; ForeachWriter is supported in Scala. But this
>>> doesn't seem like an efficient approach and adds deployment overhead,
>>> because now I will have to support Scala in my app.
>>>
>>> Another approach is obviously to use Scala instead of python, which is
>>> fine but I want to make sure that I absolutely cannot use python for this
>>> problem before I take this path.
>>>
>>> Would appreciate some feedback and alternative design approaches for
>>> this problem.
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Priyank Shrivastava
Thanks TD.  I am going to try the python-scala hybrid approach by using
scala only for the custom redis sink and python for the rest of the app.  I
understand it might not be as efficient as writing the app purely in scala,
but unfortunately I am constrained on scala resources.  Have you come
across other use cases where people have resorted to such a python-scala
hybrid approach?

Regards,
Priyank



On Wed, Jul 26, 2017 at 1:46 AM, Tathagata Das 
wrote:

> Hello Priyank
>
> Writing something purely in Scala/Java would be the most efficient. Even
> if we expose python APIs that allow writing custom sinks in pure Python, it
> won't be as efficient as Scala/Java foreach, as the data would have to go
> through the JVM / PVM boundary, which has significant overheads. So Scala/Java
> foreach is always going to be the best option.
>
> TD
>
> On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava <
> priy...@asperasoft.com> wrote:
>
>> I am trying to write key-values to redis using a DataStreamWriter object
>> using pyspark structured streaming APIs. I am using Spark 2.2
>>
>> Since the Foreach sink is not supported for Python, I am trying to find
>> out some alternatives.
>>
>> One alternative is to write a separate Scala module only to push data
>> into redis using foreach; ForeachWriter is supported in Scala. But this
>> doesn't seem like an efficient approach and adds deployment overhead,
>> because now I will have to support Scala in my app.
>>
>> Another approach is obviously to use Scala instead of python, which is
>> fine but I want to make sure that I absolutely cannot use python for this
>> problem before I take this path.
>>
>> Would appreciate some feedback and alternative design approaches for this
>> problem.
>>
>> Thanks.
>>
>>
>>
>>
>


DStream Spark 2.1.1 Streaming on EMR at scale - long running job fails after two hours

2017-07-26 Thread Mikhailau, Alex
Guys,

I am trying hard to make a DStream API Spark Streaming job work on EMR. I've
succeeded to the point of running it for a few hours before it eventually fails,
at which point I start seeing out-of-memory exceptions in the "yarn logs"
aggregate.

I am doing a JSON map and extraction of some fields via play-json in the map 
portion (mappedEvents)

val mappedEvents:DStream[(String, Iterable[Array[Byte]])] = {map json events 
keyed off of `user-id` }

val stateFulRDD = mappedEvents
  .reduceByKeyAndWindow((x: Iterable[Array[Byte]], y: Iterable[Array[Byte]]) => {
    val x1 = x.map(arrayToUserSession)
    val y1 = y.map(arrayToUserSession)
    val z = x1 ++ y1
    val now = System.currentTimeMillis()
    z.groupBy(_.psid).map(_._2.maxBy(_.lastTime))
      .filter(l => l.lastTime + eventValiditySeconds * 1000 >= now)
      .map(userSessionToArray)
  }, windowSize, slidingInterval)
  .filter(_._2.size > 1)
  .mapWithState(stateSpec)

// Doing sessionization, where I keep the last timestamp as the beginning of the
// session via mapWithState for any session count > 1, to make it use the state
// API less frequently.

val stateSpec = StateSpec.function(updateUserEvents 
_).timeout(windowSize.times(1).plus(batchInterval))

def updateUserEvents(key: String,
 newValue: 
Option[scala.collection.immutable.Iterable[Array[Byte]]],
 state: State[UserSessions]): Option[UserSessions]
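
As a side note, a minimal, self-contained sketch of an update function with the same mapWithState shape as the signature above; the SessionState case class here is a hypothetical stand-in, not the UserSessions type used above:

import org.apache.spark.streaming.State

// Hypothetical stand-in for the UserSessions state type.
case class SessionState(firstSeen: Long, lastSeen: Long, count: Long)

def updateSessionSketch(
    key: String,
    newValue: Option[Iterable[Array[Byte]]],
    state: State[SessionState]): Option[SessionState] = {
  if (state.isTimingOut()) {
    // The key has not been seen within the StateSpec timeout; its state is dropped.
    None
  } else {
    val now = System.currentTimeMillis()
    val previous = state.getOption().getOrElse(SessionState(now, now, 0L))
    val updated = previous.copy(
      lastSeen = now,
      count = previous.count + newValue.map(_.size.toLong).getOrElse(0L))
    state.update(updated)
    Some(updated)
  }
}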


My window is 60 seconds and my slidingInterval is 10 seconds with batchInterval 
of 20 seconds

In my load test of 250K records per second (each record is around 1.6KB) in 
Kinesis Stream running on EMR 5.7.0 cluster on yarn with 25 core nodes of 
m4.2xlarge and a master of m4.4xlarge with plenty of EBS Storage sc1 attached 
(10TB), I cannot sustain load for longer than 2 hours. The cluster errors out.

This is my submit job parameters,

aws emr add-steps --cluster-id $CLUSTER_ID --steps 
Name=SessionCount,Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/usr/lib/spark/bin/spark-submit,--deploy-mode,cluster,--master,yarn,--conf,spark.streaming.stopGracefullyOnShutdown=true,--conf,spark.locality.wait=7500ms,--conf,spark.streaming.blockInterval=1ms,--conf,spark.shuffle.consolidateFiles=true,--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.closure.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.dynamicAllocation.enabled=true,--conf,spark.scheduler.mode=FIFO,--conf,spark.ui.retainedJobs=50,--conf,spark.ui.retainedStages=50,--conf,spark.ui.retainedTasks=500,--conf,spark.worker.ui.retainedExecutors=50,--conf,spark.worker.ui.retainedDrivers=50,--conf,spark.sql.ui.retainedExecutions=50,--conf,spark.streaming.ui.retainedBatches=50,--conf,spark.rdd.compress=false,--conf,spark.yarn.executor.memoryOverhead=5120,--executor-memory,15G,--class,SessionCountX
  - and job parameters follow

with env.json
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.log-aggregation-enable": "true",
      "yarn.log-aggregation.retain-seconds": "-1",
      "yarn.nodemanager.remote-app-log-dir": "s3:\/\/my-bucket-logs",
      "yarn.nodemanager.vmem.check.enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]

Also, looking at the executors page of the Spark UI, I see Input continuing to grow
with time. I am not sure whether the fact that user-id is UUID.random() in the load
test is the cause of that, and whether I should load test with a finite set of
user-ids for a limited key space in Spark, but that is something I noticed.
Shuffle read/write size eventually normalizes, though, and stays about the same.


The following exceptions are seen from a failed job:

17/07/26 07:13:51 WARN NettyRpcEndpointRef: Error sending message [message = 
Heartbeat(28,[Lscala.Tuple2;@2147b84f,BlockManagerId(28, 
ip-10-202-138-81.mlbam.qa.us-east-1.bamgrid.net, 38630, None))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. 
This timeout is controlled by spark.executor.heartbeatInterval


17/07/26 07:13:51 ERROR SparkUncaughtExceptionHandler: Uncaught exception in
thread Thread[Thread-9,5,main]
java.lang.OutOfMemoryError: Java heap space
at io.netty.buffer.UnpooledHeapByteBuf.<init>(UnpooledHeapByteBuf.java:45)
at io.netty.buffer.UnpooledUnsafeHeapByteBuf.<init>(UnpooledUnsafeHeapByteBuf.java:29)
at io.netty.buffer.UnpooledByteBufAllocator.newHeapBuffer(UnpooledByteBufAllocator.java:59)
at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:158)
at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:149)
at io.netty.buffer.Unpooled.buffer(Unpooled.java:116)
at org.apache.spark.network.shuffle.protocol.BlockTransferMessage.toByteBuffer(BlockTransferMessage.java:78)
at org.apache.spark.network.netty.

[Spark streaming-Mesos-cluster mode] java.lang.RuntimeException: Stream jar not found

2017-07-26 Thread RCinna
Hello,

I have a Spark Streaming job using HDFS and checkpointing components, and it runs
well on a standalone Spark cluster with multiple nodes, in both client and cluster
deploy mode. I would like to switch to the Mesos cluster manager and submit the job
in cluster deploy mode.

The first launch of the app works well, whereas the second launch (after a kill),
which implies checkpoint recovery, fails as:
___
java.lang.RuntimeException: Stream '/jars/application.jar' was not found.
at
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:222)
...
___
This error occurs because the driver, which is in charge of exposing the
application jar to the executors, is trying to serve it from the jar path
stored in the checkpoint (loaded from HDFS and pointing into the Mesos work
dir, i.e. the sandbox), a path that does not exist on the current node.

I'm confused by the dispatcher behaviour. It seems that there are functional
gaps between checkpoint recovery in Spark Streaming and the sandbox machinery
used by the Mesos cluster.

1. Why does Spark use an RPC interface to expose the application jar to the
executors when using HDFS, instead of having the executors load it directly
from the source?
2. How can this issue be fixed (if possible)?


Versions: Mesos 1.2.0, Spark 2.0.1, HDFS 2.7.
For more information, see the related Stack Overflow issue.

Thanks,
RCinna



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-Mesos-cluster-mode-java-lang-RuntimeException-Stream-jar-not-found-tp29001.html



Re: some Ideas on expressing Spark SQL using JSON

2017-07-26 Thread Sathish Kumaran Vairavelu
Agreed. For the same reason, DataFrames / Datasets are another DSL used
in Spark.
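
As Georg notes below, the point is that a mistake in the DSL surfaces at compile time, while the same mistake inside a SQL string only surfaces when it is parsed at runtime. A minimal, self-contained sketch (the sample data and names are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DslVsSqlString {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dsl-vs-sql").getOrCreate()
    import spark.implicits._

    val df = Seq(("john", 10.0), ("jane", 12.5)).toDF("name", "hourlyPay")
    df.createOrReplaceTempView("pay")

    // DSL: a misspelled function name (e.g. "summ" instead of "sum") would not compile.
    df.filter(col("name") === "john").agg(sum("hourlyPay").as("total")).show()

    // SQL string: the same kind of misspelling is only detected when the string
    // is parsed and analyzed at runtime.
    spark.sql("SELECT sum(hourlyPay) AS total FROM pay WHERE name = 'john'").show()

    spark.stop()
  }
}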
On Wed, Jul 26, 2017 at 1:00 AM Georg Heiler 
wrote:

> Because Spark's DSL partially supports compile-time type safety. E.g. the
> compiler will notify you that a SQL function was misspelled when using the
> DSL, as opposed to a plain SQL string, which is only parsed at runtime.
> Sathish Kumaran Vairavelu  wrote on Tue, 25
> July 2017 at 23:42:
>
>> Just a thought. SQL itself is a DSL. Why DSL on top of another DSL?
>> On Tue, Jul 25, 2017 at 4:47 AM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I am thinking of expressing Spark SQL using JSON in the following way.
>>>
>>> For Example:
>>>
>>> *Query using Spark DSL*
>>>
>>> DS.filter(col("name").equalTo("john"))
>>>   .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 hours"), df1.col("hourlyPay"))
>>>   .agg(sum("hourlyPay").as("total"));
>>>
>>>
>>> *Query using JSON*
>>>
>>> The goal is to design a DSL in JSON such that users can express Spark
>>> SQL queries in JSON, send them over REST, and get the results back. Now,
>>> I am sure there are BI tools and notebooks like Zeppelin that can
>>> accomplish the desired behavior; however, I believe there may be a group
>>> of users who don't want to use those BI tools or notebooks and instead
>>> want all the communication from front end to back end to go through APIs.
>>>
>>> Another goal is that the JSON DSL design should closely mimic the
>>> underlying Spark SQL DSL.
>>>
>>> Please feel free to provide some feedback or criticize to whatever
>>> extent you like!
>>>
>>> Thanks!
>>>
>>>
>>>


[Spark SQL] [pyspark.sql]: Potential bug in toDF using nested structures

2017-07-26 Thread msachdev
Hi,

I am trying to create a DF from a Python dictionary and encountered an
issue where some of the nested fields are being returned as None (on
collect). I have created a sample here with the output:
https://gist.github.com/sachdevm/04c27ec91adbe2fdbe5969f4af723642.

The sample contains two snippets -- one which exhibits the stated issue and
another which works correctly. My suspicion is that when parsing nested
dictionary objects in the Row class, the datatype for all values is being
incorrectly set to that of the first key encountered (in the above example
"duration") and when the conversion fails, it is being set as None. In the
second example in the gist, all values in the nested dictionary are strings
and all data is preserved correctly.

I am using version 2.1.0:

> >>> print pyspark.__version__
> 2.1.0



Please let me know if I am missing something or if there is some issue in the
code sample itself.

Thanks,
Manish




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-pyspark-sql-Potential-bug-in-toDF-using-nested-structures-tp29000.html

Re: Need some help around a Spark Error

2017-07-26 Thread Alonso Isidoro Roman
I hope that helps

https://stackoverflow.com/questions/40623957/slave-lost-and-very-slow-join-in-spark


Alonso Isidoro Roman
https://about.me/alonso.isidoro.roman


2017-07-26 3:13 GMT+02:00 Debabrata Ghosh :

> Hi,
>   While executing a Spark SQL query, I am hitting the
> following error. I wonder if you could please help me with a possible cause
> and resolution. Here is the stacktrace:
>
> 07/25/2017 02:41:58 PM - DataPrep.py 323 - __main__ - ERROR - An error
> occurred while calling o49.sql.
>
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 12.2 failed 4 times, most recent failure: Lost task 0.3 in stage
> 12.2 (TID 2242, bicservices.hdp.com): ExecutorLostFailure (executor 25
> exited caused by one of the running tasks) Reason: Slave lost
>
> Driver stacktrace:
>
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1433)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1421)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1420)
>
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1420)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
>
> at scala.Option.foreach(Option.scala:236)
>
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:801)
>
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1642)
>
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1601)
>
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1590)
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1946)
>
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(
> InsertIntoHiveTable.scala:84)
>
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.
> sideEffectResult$lzycompute(InsertIntoHiveTable.scala:201)
>
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.
> sideEffectResult(InsertIntoHiveTable.scala:127)
>
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.
> doExecute(InsertIntoHiveTable.scala:276)
>
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$5.apply(SparkPlan.scala:132)
>
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> execute$5.apply(SparkPlan.scala:130)
>
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:150)
>
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
>
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(
> QueryExecution.scala:55)
>
> at org.apache.spark.sql.execution.QueryExecution.
> toRdd(QueryExecution.scala:55)
>
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145)
>
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
>
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
>
> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>
> at py4j.Gateway.invoke(Gateway.java:259)
>
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:209)
> at java.lang.Thread.run(Thread.java:745)
>
> Cheers,
>
> Debu
>


Re: [SPARK STRUCTURED STREAMING]: Alternatives to using Foreach sink in pyspark

2017-07-26 Thread Tathagata Das
Hello Priyank

Writing something purely in Scala/Java would be the most efficient. Even if
we expose python APIs that allow writing custom sinks in pure Python, it
won't be as efficient as Scala/Java foreach, as the data would have to go
through the JVM / PVM boundary, which has significant overheads. So Scala/Java
foreach is always going to be the best option.

TD
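
For reference, a minimal sketch of the Scala/Java foreach option described above: a ForeachWriter that writes (key, value) pairs to Redis, the use case from the quoted message below. The Jedis client, host, and port are assumptions, not part of the original thread:

import org.apache.spark.sql.ForeachWriter
import redis.clients.jedis.Jedis  // assumption: the Jedis client is on the classpath

// One Redis connection per partition and streaming epoch.
class RedisForeachWriter(host: String, port: Int)
    extends ForeachWriter[(String, String)] {

  @transient private var client: Jedis = _

  // Called once per partition/version; returning true means the partition is processed.
  override def open(partitionId: Long, version: Long): Boolean = {
    client = new Jedis(host, port)
    true
  }

  // Called for every row of the partition.
  override def process(record: (String, String)): Unit = {
    client.set(record._1, record._2)
  }

  // Called when the partition is done or an error occurred.
  override def close(errorOrNull: Throwable): Unit = {
    if (client != null) client.close()
  }
}

// Usage, given a streaming Dataset[(String, String)] named keyValues:
// keyValues.writeStream.foreach(new RedisForeachWriter("localhost", 6379)).start()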

On Tue, Jul 25, 2017 at 6:05 PM, Priyank Shrivastava  wrote:

> I am trying to write key-values to redis using a DataStreamWriter object
> using pyspark structured streaming APIs. I am using Spark 2.2
>
> Since the Foreach sink is not supported for Python, I am trying to find
> out some alternatives.
>
> One alternative is to write a separate Scala module only to push data into
> redis using foreach; ForeachWriter is supported in Scala. But this doesn't
> seem like an efficient approach and adds deployment overhead, because now I
> will have to support Scala in my app.
>
> Another approach is obviously to use Scala instead of python, which is
> fine but I want to make sure that I absolutely cannot use python for this
> problem before I take this path.
>
> Would appreciate some feedback and alternative design approaches for this
> problem.
>
> Thanks.
>
>
>
>