Need complete unit test reference for PySpark

2020-11-18 Thread Sachit Murarka
Hi Users,

I have to write unit test cases for PySpark.
I think pytest-spark and spark-testing-base are good test libraries.

Can anyone please provide a full reference for writing test cases in
Python using these?
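
For context, this is the kind of minimal pytest fixture I have started
from (my own sketch, not taken from either library's docs; it only
assumes pytest and pyspark are installed):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # One local SparkSession shared across the whole test session.
    session = (
        SparkSession.builder
        .master("local[2]")
        .appName("pyspark-unit-tests")
        .getOrCreate()
    )
    yield session
    session.stop()


def test_filter_adults(spark):
    df = spark.createDataFrame([("a", 17), ("b", 21)], ["name", "age"])
    adults = df.where(df.age >= 18)
    assert [row.name for row in adults.collect()] == ["b"]

What I am really after is how pytest-spark's ready-made fixtures and
spark-testing-base's DataFrame comparison helpers improve on this.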

Kind Regards,
Sachit Murarka


Spark SQL: check timestamp against another table and update a column

2020-11-18 Thread anbutech
Hi Team,

I want to update col3 in table1 whenever col1 from table2 is less than col1
in table1, and update each record in table1. I am not getting the correct
output.

Table 1:
col1|col2|col3
2020-11-17T20:50:57.777+|1|null

Table 2:
col1|col2|col3
2020-11-17T21:19:06.508+|1|win
2020-11-17T20:49:06.244+|1|win
2020-11-17T20:19:13.484+|1|Win

SQL tried:

select a.col1, a.col2, coalesce(a.col3, b.col3) as col3
from table1 a left join table2 b
on (a.col2 = b.col2) and (b.col1 < a.col1)

output:
I am getting the following output:

2020-11-17T20:50:57.777+|1|Win 2020-11-17T21:19:06.508+|1|win
2020-11-17T20:50:57.777+|1|Win 2020-11-17T20:49:06.244+|1|win

I'm looking for only the second record in the output.

The issue here is that I get one additional record for every table2 row
whose col1 is less than col1 in table1 when I use the above query.

expected output:

2020-11-17T20:50:57.777+|1|Win 2020-11-17T20:49:06.244+|1|win

How do we achieve that correctly? I have many records like this.
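
One direction I am considering is to rank the qualifying table2 rows per
table1 record and keep only the latest one, along these lines (an
untested PySpark sketch using my table and column names from above):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("closest-earlier-row").getOrCreate()

t1 = spark.table("table1").alias("a")
t2 = spark.table("table2").alias("b")

# Left join on the key, keeping only table2 rows strictly earlier than
# table1's timestamp.
joined = t1.join(
    t2,
    (F.col("a.col2") == F.col("b.col2")) & (F.col("b.col1") < F.col("a.col1")),
    "left",
)

# Rank matches per table1 row so the latest qualifying table2 row is rank 1.
# With desc ordering, unmatched rows (null b.col1) still get rank 1, and
# coalesce then falls back to a.col3.
w = Window.partitionBy(F.col("a.col1"), F.col("a.col2")).orderBy(
    F.col("b.col1").desc()
)

result = (
    joined.withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") == 1)
    .select(
        F.col("a.col1").alias("col1"),
        F.col("a.col2").alias("col2"),
        F.coalesce(F.col("a.col3"), F.col("b.col3")).alias("col3"),
    )
)
result.show(truncate=False)

Is a window function like this the right way, or is there a cheaper
join-based trick?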

Thanks






Re: Cannot perform operation after producer has been closed

2020-11-18 Thread Eric Beabes
I must say, *Spark has let me down in this case*. I am surprised an
important issue like this hasn't been fixed in Spark 2.4.

I am fighting a battle of 'Spark Structured Streaming' vs 'Flink' at work,
and now, because Spark 2.4 can't handle this, *I've been asked to rewrite
the code in Flink*.

Moving to Spark 3.0 is not an easy option because Cloudera 6.2 doesn't have
a Spark 3.0 parcel, so we can't upgrade to 3.0.

So sad. Let me ask one more time. *Is there no way to fix this in Spark
2.4?*
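
The only stopgap we have found so far is to lengthen the producer cache
expiry so a cached producer is less likely to be closed while a send is
still in flight. This is an untested sketch based on my reading of the
2.x kafka010 source, where spark.kafka.producer.cache.timeout (default
10m) controls when cached producers are evicted:

from pyspark.sql import SparkSession

# Workaround sketch, not a fix: raise the Kafka producer cache expiry so a
# cached producer is less likely to be evicted (and closed) mid-send.
# "spark.kafka.producer.cache.timeout" is my reading of the 2.x kafka010
# cache setting; the 30m value is only an example to tune.
spark = (
    SparkSession.builder
    .appName("structured-streaming-kafka-sink")
    .config("spark.kafka.producer.cache.timeout", "30m")
    .getOrCreate()
)

If anyone knows whether this actually avoids the race, please correct me.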


On Tue, Nov 10, 2020 at 11:33 AM Eric Beabes wrote:

> BTW, we are seeing this message as well: 
> *"org.apache.kafka.common.KafkaException:
> Producer closed while send in progress"*. I am assuming this happens
> because of the previous issue.."producer has been closed", right? Or are
> they unrelated? Please advise. Thanks.
>
> On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes wrote:
>
>> Thanks for the reply. We are on Spark 2.4. Is there no way to get this
>> fixed in Spark 2.4?
>>
>> On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim wrote:
>>
>>> Which Spark version do you use? There's a known issue with the Kafka
>>> producer pool in Spark 2.x which was fixed in Spark 3.0, so you may want
>>> to check whether your case matches the known issue or not.
>>>
>>> https://issues.apache.org/jira/browse/SPARK-21869
>>>
>>>
>>> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes wrote:
>>>
 I know this is related to Kafka, but it happens during the Spark
 Structured Streaming job; that's why I am asking on this mailing list.

 How would you debug this or get around this in Spark Structured
 Streaming? Any tips would be appreciated. Thanks.


 java.lang.IllegalStateException: Cannot perform operation after producer has been closed
     at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
     at org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
     at org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)

>>>


Spark Exception

2020-11-18 Thread Amit Sharma
Hi, we are running a Spark streaming job and it sometimes throws the two
exceptions below. I don't understand the difference between these two
exceptions: one timeout is 120 seconds and the other is 600 seconds. What
could be the reason for these?


Error running job streaming job 1605709968000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
    at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
    at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)





2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread heartbeat-receiver-event-loop-thread
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by BlockManagerHeartbeat
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
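
For what it is worth, the first message says its timeout is controlled by
spark.rpc.askTimeout (which falls back to spark.network.timeout, default
120s), so one thing we plan to try is raising it, as in the sketch below.
The 600-second BlockManagerHeartbeat timeout looks hardcoded to 10 minutes
in DAGScheduler, if I am reading the source correctly, so it may not be
tunable the same way. Values here are examples only.

from pyspark.sql import SparkSession

# Sketch of the change we plan to try: raise the RPC ask timeout past the
# default 120s. spark.rpc.askTimeout falls back to spark.network.timeout,
# so we raise both to keep them consistent. 600s is just an example value.
spark = (
    SparkSession.builder
    .appName("streaming-job")
    .config("spark.network.timeout", "600s")
    .config("spark.rpc.askTimeout", "600s")
    .getOrCreate()
)

Does this sound like the right knob, or are we missing something?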