Process each kafka record for structured streaming

2021-01-20 Thread rajat kumar
Hi,

I want to apply custom logic for each row of data I am getting through
kafka and want to do it with microbatch.
When I am running it , it is not progressing.


kafka_stream_df \
.writeStream \
.foreach(process_records) \
.outputMode("append") \
.option("checkpointLocation", "checkpt") \
.trigger(continuous="5 seconds").start()

Regards

Rajat


Re: Issue with executer

2021-01-20 Thread Sachit Murarka
Hi Vikas

1. Are you running in local mode? Master has local[*]
2. Pls mask the ip or confidential info while sharing logs

Thanks
Sachit

On Wed, 20 Jan 2021, 17:35 Vikas Garg,  wrote:

> Hi,
>
> I am facing issue with spark executor. I am struggling with this issue
> since last many days and unable to resolve the issue.
>
> Below is the configuration I have given.
>
>   val spark = SparkSession.builder()
> .appName("Spark Job")
> .master("local[*]")
> .config("spark.dynamicAllocation.enabled", true)
> .config("spark.shuffle.service.enabled", true)
> .config("spark.driver.maxResultSize", "8g")
> .config("spark.driver.memory", "8g")
> .config("spark.executor.memory", "8g")
> .config("spark.network.timeout", "3600s")
> .getOrCreate()
>
> 1/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
>
> *java.io.IOException*: Failed to connect to
> del1-lhp-n9.synapse.com/192.168.166.213:51348
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:253*)
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:195*)
>
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
> *NettyBlockTransferService.scala:122*)
>
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
> *RetryingBlockFetcher.java:141*)
>
> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
> *RetryingBlockFetcher.java:121*)
>
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
> *NettyBlockTransferService.scala:143*)
>
> at
> org.apache.spark.network.BlockTransferService.fetchBlockSync(
> *BlockTransferService.scala:103*)
>
> at
> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
> *BlockManager.scala:1010*)
>
> at
> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
> *BlockManager.scala:954*)
>
> at scala.Option.orElse(*Option.scala:289*)
>
> at org.apache.spark.storage.BlockManager.getRemoteBlock(
> *BlockManager.scala:954*)
>
> at org.apache.spark.storage.BlockManager.getRemoteBytes(
> *BlockManager.scala:1092*)
>
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
> *TaskResultGetter.scala:88*)
>
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(
> *Utils.scala:1932*)
>
> at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
> *TaskResultGetter.scala:63*)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> *ThreadPoolExecutor.java:1149*)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> *ThreadPoolExecutor.java:624*)
>
> at java.lang.Thread.run(*Thread.java:748*)
>
> Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
> Permission denied: no further information:
> del1-lhp-n9.synapse.com/192.168.166.213:51348
>
> Caused by: *java.net.SocketException*: Permission denied: no further
> information
>
> at sun.nio.ch.SocketChannelImpl.checkConnect(*Native Method*)
>
> at sun.nio.ch.SocketChannelImpl.finishConnect(
> *SocketChannelImpl.java:715*)
>
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(
> *NioSocketChannel.java:330*)
>
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(
> *AbstractNioChannel.java:334*)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> *NioEventLoop.java:702*)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
> *NioEventLoop.java:650*)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> *NioEventLoop.java:576*)
>
> at io.netty.channel.nio.NioEventLoop.run(
> *NioEventLoop.java:493*)
>
> at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(
> *SingleThreadEventExecutor.java:989*)
>
> at io.netty.util.internal.ThreadExecutorMap$2.run(
> *ThreadExecutorMap.java:74*)
>
> at io.netty.util.concurrent.FastThreadLocalRunnable.run(
> *FastThreadLocalRunnable.java:30*)
>
> at java.lang.Thread.run(*Thread.java:748*)
>
> 21/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
>
> *java.io.IOException*: Failed to connect to
> del1-lhp-n9.synapse.com/192.168.166.213:51348
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:253*)
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:195*)
>
> at
> org.apache.

Re: Issue with executer

2021-01-20 Thread Vikas Garg
Hi Sachit,

I am running it in local. My IP mentioned is the private IP address and
therefore it is useless for anyone.

On Wed, 20 Jan 2021 at 17:37, Sachit Murarka 
wrote:

> Hi Vikas
>
> 1. Are you running in local mode? Master has local[*]
> 2. Pls mask the ip or confidential info while sharing logs
>
> Thanks
> Sachit
>
> On Wed, 20 Jan 2021, 17:35 Vikas Garg,  wrote:
>
>> Hi,
>>
>> I am facing issue with spark executor. I am struggling with this issue
>> since last many days and unable to resolve the issue.
>>
>> Below is the configuration I have given.
>>
>>   val spark = SparkSession.builder()
>> .appName("Spark Job")
>> .master("local[*]")
>> .config("spark.dynamicAllocation.enabled", true)
>> .config("spark.shuffle.service.enabled", true)
>> .config("spark.driver.maxResultSize", "8g")
>> .config("spark.driver.memory", "8g")
>> .config("spark.executor.memory", "8g")
>> .config("spark.network.timeout", "3600s")
>> .getOrCreate()
>>
>> 1/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
>> fetch of 1 outstanding blocks
>>
>> *java.io.IOException*: Failed to connect to
>> del1-lhp-n9.synapse.com/192.168.166.213:51348
>>
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(
>> *TransportClientFactory.java:253*)
>>
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(
>> *TransportClientFactory.java:195*)
>>
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>> *NettyBlockTransferService.scala:122*)
>>
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>> *RetryingBlockFetcher.java:141*)
>>
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>> *RetryingBlockFetcher.java:121*)
>>
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>> *NettyBlockTransferService.scala:143*)
>>
>> at
>> org.apache.spark.network.BlockTransferService.fetchBlockSync(
>> *BlockTransferService.scala:103*)
>>
>> at
>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>> *BlockManager.scala:1010*)
>>
>> at
>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>> *BlockManager.scala:954*)
>>
>> at scala.Option.orElse(*Option.scala:289*)
>>
>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>> *BlockManager.scala:954*)
>>
>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>> *BlockManager.scala:1092*)
>>
>> at
>> org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
>> *TaskResultGetter.scala:88*)
>>
>> at
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(
>> *Utils.scala:1932*)
>>
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
>> *TaskResultGetter.scala:63*)
>>
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> *ThreadPoolExecutor.java:1149*)
>>
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> *ThreadPoolExecutor.java:624*)
>>
>> at java.lang.Thread.run(*Thread.java:748*)
>>
>> Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
>> Permission denied: no further information:
>> del1-lhp-n9.synapse.com/192.168.166.213:51348
>>
>> Caused by: *java.net.SocketException*: Permission denied: no further
>> information
>>
>> at sun.nio.ch.SocketChannelImpl.checkConnect(*Native Method*)
>>
>> at sun.nio.ch.SocketChannelImpl.finishConnect(
>> *SocketChannelImpl.java:715*)
>>
>> at
>> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(
>> *NioSocketChannel.java:330*)
>>
>> at
>> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(
>> *AbstractNioChannel.java:334*)
>>
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
>> *NioEventLoop.java:702*)
>>
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
>> *NioEventLoop.java:650*)
>>
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
>> *NioEventLoop.java:576*)
>>
>> at io.netty.channel.nio.NioEventLoop.run(
>> *NioEventLoop.java:493*)
>>
>> at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(
>> *SingleThreadEventExecutor.java:989*)
>>
>> at io.netty.util.internal.ThreadExecutorMap$2.run(
>> *ThreadExecutorMap.java:74*)
>>
>> at io.netty.util.concurrent.FastThreadLocalRunnable.run(
>> *FastThreadLocalRunnable.java:30*)
>>
>> at java.lang.Thread.run(*Thread.java:748*)
>>
>> 21/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
>> fetch of 1 outstanding blocks
>>
>> *java.io.IOException*: Failed to connect to
>> del1-

Re: Issue with executer

2021-01-20 Thread Mich Talebzadeh
Hi Vikas,

Are you running this on your local laptop etc or using some IDE etc?

What is your available memory for Spark?

Start with minimum set like below

def spark_session_local(appName):
return SparkSession.builder \
.master('local[1]') \
.appName(appName) \
.enableHiveSupport() \
.getOrCreate()


HTH



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 20 Jan 2021 at 12:32, Vikas Garg  wrote:

> Hi Sachit,
>
> I am running it in local. My IP mentioned is the private IP address and
> therefore it is useless for anyone.
>
> On Wed, 20 Jan 2021 at 17:37, Sachit Murarka 
> wrote:
>
>> Hi Vikas
>>
>> 1. Are you running in local mode? Master has local[*]
>> 2. Pls mask the ip or confidential info while sharing logs
>>
>> Thanks
>> Sachit
>>
>> On Wed, 20 Jan 2021, 17:35 Vikas Garg,  wrote:
>>
>>> Hi,
>>>
>>> I am facing issue with spark executor. I am struggling with this issue
>>> since last many days and unable to resolve the issue.
>>>
>>> Below is the configuration I have given.
>>>
>>>   val spark = SparkSession.builder()
>>> .appName("Spark Job")
>>> .master("local[*]")
>>> .config("spark.dynamicAllocation.enabled", true)
>>> .config("spark.shuffle.service.enabled", true)
>>> .config("spark.driver.maxResultSize", "8g")
>>> .config("spark.driver.memory", "8g")
>>> .config("spark.executor.memory", "8g")
>>> .config("spark.network.timeout", "3600s")
>>> .getOrCreate()
>>>
>>> 1/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
>>> fetch of 1 outstanding blocks
>>>
>>> *java.io.IOException*: Failed to connect to
>>> del1-lhp-n9.synapse.com/192.168.166.213:51348
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:253*)
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:195*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>>> *NettyBlockTransferService.scala:122*)
>>>
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>>> *RetryingBlockFetcher.java:141*)
>>>
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>>> *RetryingBlockFetcher.java:121*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>>> *NettyBlockTransferService.scala:143*)
>>>
>>> at
>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(
>>> *BlockTransferService.scala:103*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>>> *BlockManager.scala:1010*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>>> *BlockManager.scala:954*)
>>>
>>> at scala.Option.orElse(*Option.scala:289*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>>> *BlockManager.scala:954*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>>> *BlockManager.scala:1092*)
>>>
>>> at
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
>>> *TaskResultGetter.scala:88*)
>>>
>>> at
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(
>>> *Utils.scala:1932*)
>>>
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
>>> *TaskResultGetter.scala:63*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> *ThreadPoolExecutor.java:1149*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> *ThreadPoolExecutor.java:624*)
>>>
>>> at java.lang.Thread.run(*Thread.java:748*)
>>>
>>> Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
>>> Permission denied: no further information:
>>> del1-lhp-n9.synapse.com/192.168.166.213:51348
>>>
>>> Caused by: *java.net.SocketException*: Permission denied: no further
>>> information
>>>
>>> at sun.nio.ch.SocketChannelImpl.checkConnect(*Native Method*
>>> )
>>>
>>> at sun.nio.ch.SocketChannelImpl.finishConnect(
>>> *SocketChannelImpl.java:715*)
>>>
>>> at
>>> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(
>>> *NioSocketChannel.java:330*)
>>>
>>> at
>>> io.netty.channel.nio.AbstractNioCh

RDD filter in for loop gave strange results

2021-01-20 Thread Marco Wong
Dear Spark users,

I ran the Python code below on a simple RDD, but it gave strange results.
The filtered RDD contains non-existent elements which were filtered away
earlier. Any idea why this happened?
```
rdd = spark.sparkContext.parallelize([0,1,2])
for i in range(3):
print("RDD is ", rdd.collect())
print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
rdd = rdd.filter(lambda x:x!=i)
print("Result is ", rdd.collect())
print()
```
which gave
```
RDD is  [0, 1, 2]
Filtered RDD is  [1, 2]
Result is  [1, 2]

RDD is  [1, 2]
Filtered RDD is  [0, 2]
Result is  [0, 2]

RDD is  [0, 2]
Filtered RDD is  [0, 1]
Result is  [0, 1]
```

Thanks,

Marco


Spark RDD + HBase: adoption trend

2021-01-20 Thread Marco Firrincieli
Hi, my name is Marco and I'm one of the developers behind 
https://github.com/unicredit/hbase-rdd 
a project we are currently reviewing for various reasons.

We were basically wondering if RDD "is still a thing" nowadays (we see lots of 
usage for DataFrames or Datasets) and we're not sure how much of the community 
still works/uses RDDs.

Also, for lack of time, we always mainly worked using Cloudera-flavored 
Hadoop/HBase & Spark versions. We were thinking the community would then help 
us organize the project in a more "generic" way, but that didn't happen. 

So I figured I would ask here what is the gut feeling of the Spark community so 
to better define the future of our little library. 

Thanks

-Marco

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Issue with executer

2021-01-20 Thread Vikas Garg
The issue is resolved. Resolution is little weird but it worked. It was due
to scala version mismatch with projects in my package.

On Wed, 20 Jan 2021 at 18:07, Mich Talebzadeh 
wrote:

> Hi Vikas,
>
> Are you running this on your local laptop etc or using some IDE etc?
>
> What is your available memory for Spark?
>
> Start with minimum set like below
>
> def spark_session_local(appName):
> return SparkSession.builder \
> .master('local[1]') \
> .appName(appName) \
> .enableHiveSupport() \
> .getOrCreate()
>
>
> HTH
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 20 Jan 2021 at 12:32, Vikas Garg  wrote:
>
>> Hi Sachit,
>>
>> I am running it in local. My IP mentioned is the private IP address and
>> therefore it is useless for anyone.
>>
>> On Wed, 20 Jan 2021 at 17:37, Sachit Murarka 
>> wrote:
>>
>>> Hi Vikas
>>>
>>> 1. Are you running in local mode? Master has local[*]
>>> 2. Pls mask the ip or confidential info while sharing logs
>>>
>>> Thanks
>>> Sachit
>>>
>>> On Wed, 20 Jan 2021, 17:35 Vikas Garg,  wrote:
>>>
 Hi,

 I am facing issue with spark executor. I am struggling with this issue
 since last many days and unable to resolve the issue.

 Below is the configuration I have given.

   val spark = SparkSession.builder()
 .appName("Spark Job")
 .master("local[*]")
 .config("spark.dynamicAllocation.enabled", true)
 .config("spark.shuffle.service.enabled", true)
 .config("spark.driver.maxResultSize", "8g")
 .config("spark.driver.memory", "8g")
 .config("spark.executor.memory", "8g")
 .config("spark.network.timeout", "3600s")
 .getOrCreate()

 1/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 1 outstanding blocks

 *java.io.IOException*: Failed to connect to
 del1-lhp-n9.synapse.com/192.168.166.213:51348

 at
 org.apache.spark.network.client.TransportClientFactory.createClient(
 *TransportClientFactory.java:253*)

 at
 org.apache.spark.network.client.TransportClientFactory.createClient(
 *TransportClientFactory.java:195*)

 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
 *NettyBlockTransferService.scala:122*)

 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
 *RetryingBlockFetcher.java:141*)

 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
 *RetryingBlockFetcher.java:121*)

 at
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
 *NettyBlockTransferService.scala:143*)

 at
 org.apache.spark.network.BlockTransferService.fetchBlockSync(
 *BlockTransferService.scala:103*)

 at
 org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
 *BlockManager.scala:1010*)

 at
 org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
 *BlockManager.scala:954*)

 at scala.Option.orElse(*Option.scala:289*)

 at org.apache.spark.storage.BlockManager.getRemoteBlock(
 *BlockManager.scala:954*)

 at org.apache.spark.storage.BlockManager.getRemoteBytes(
 *BlockManager.scala:1092*)

 at
 org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
 *TaskResultGetter.scala:88*)

 at
 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)

 at org.apache.spark.util.Utils$.logUncaughtExceptions(
 *Utils.scala:1932*)

 at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
 *TaskResultGetter.scala:63*)

 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 *ThreadPoolExecutor.java:1149*)

 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 *ThreadPoolExecutor.java:624*)

 at java.lang.Thread.run(*Thread.java:748*)

 Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
 Permission denied: no further information:
 del1-lhp-n9.synapse.com/192.168.166.213:51348

 Caused by: *java.net.SocketException*: Permission denied: no further
 information

   

Re: Spark RDD + HBase: adoption trend

2021-01-20 Thread Jacek Laskowski
Hi Marco,

IMHO RDD is only for very sophisticated use cases that very few Spark devs
would be capable of. I consider RDD API a sort of Spark assembler and most
Spark devs should stick to Dataset API.

Speaking of HBase, see
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/bigtable/spark
where you can find a demo that I worked on last year and made sure that:

"Apache HBase™ Spark Connector implements the DataSource API for Apache
HBase and allows executing relational queries on data stored in Cloud
Bigtable."

That makes hbase-rdd even more obsolete but not necessarily unusable (I am
little skilled in the HBase space to comment on this).

I think you should consider merging the project hbase-rdd of yours with the
official Apache HBase™ Spark Connector at
https://github.com/apache/hbase-connectors/tree/master/spark (as they seem
to lack active development IMHO).

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Wed, Jan 20, 2021 at 2:44 PM Marco Firrincieli 
wrote:

> Hi, my name is Marco and I'm one of the developers behind
> https://github.com/unicredit/hbase-rdd
> a project we are currently reviewing for various reasons.
>
> We were basically wondering if RDD "is still a thing" nowadays (we see
> lots of usage for DataFrames or Datasets) and we're not sure how much of
> the community still works/uses RDDs.
>
> Also, for lack of time, we always mainly worked using Cloudera-flavored
> Hadoop/HBase & Spark versions. We were thinking the community would then
> help us organize the project in a more "generic" way, but that didn't
> happen.
>
> So I figured I would ask here what is the gut feeling of the Spark
> community so to better define the future of our little library.
>
> Thanks
>
> -Marco
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Jacek Laskowski
Hi Marco,

A Scala dev here.

In short: yet another reason against Python :)

Honestly, I've got no idea why the code gives the output. Ran it with
3.1.1-rc1 and got the very same results. Hoping pyspark/python devs will
chime in and shed more light on this.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Wed, Jan 20, 2021 at 2:07 PM Marco Wong  wrote:

> Dear Spark users,
>
> I ran the Python code below on a simple RDD, but it gave strange results.
> The filtered RDD contains non-existent elements which were filtered away
> earlier. Any idea why this happened?
> ```
> rdd = spark.sparkContext.parallelize([0,1,2])
> for i in range(3):
> print("RDD is ", rdd.collect())
> print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
> rdd = rdd.filter(lambda x:x!=i)
> print("Result is ", rdd.collect())
> print()
> ```
> which gave
> ```
> RDD is  [0, 1, 2]
> Filtered RDD is  [1, 2]
> Result is  [1, 2]
>
> RDD is  [1, 2]
> Filtered RDD is  [0, 2]
> Result is  [0, 2]
>
> RDD is  [0, 2]
> Filtered RDD is  [0, 1]
> Result is  [0, 1]
> ```
>
> Thanks,
>
> Marco
>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Sean Owen
That looks very odd indeed. Things like this work as expected:

rdd = spark.sparkContext.parallelize([0,1,2])

def my_filter(data, i):
  return data.filter(lambda x: x != i)

for i in range(3):
  rdd = my_filter(rdd, i)
rdd.collect()

... as does unrolling the loop.

But your example behaves as if only the final filter is applied. Is this is
some really obscure Python scoping thing with lambdas that I don't
understand, like the lambda only binds i once? but then you'd expect to
only filter the first number.

I also keep looking in the code to figure out if these are somehow being
erroneously 'collapsed' as the same function, but the RDD APIs don't do
that kind of thing. They get put into a chain of pipeline_funcs, but, still
shouldn't be an issue. I wonder if this is some strange interaction with
serialization of the lambda and/or scoping?

Really strange! python people?

On Wed, Jan 20, 2021 at 7:14 AM Marco Wong  wrote:

> Dear Spark users,
>
> I ran the Python code below on a simple RDD, but it gave strange results.
> The filtered RDD contains non-existent elements which were filtered away
> earlier. Any idea why this happened?
> ```
> rdd = spark.sparkContext.parallelize([0,1,2])
> for i in range(3):
> print("RDD is ", rdd.collect())
> print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
> rdd = rdd.filter(lambda x:x!=i)
> print("Result is ", rdd.collect())
> print()
> ```
> which gave
> ```
> RDD is  [0, 1, 2]
> Filtered RDD is  [1, 2]
> Result is  [1, 2]
>
> RDD is  [1, 2]
> Filtered RDD is  [0, 2]
> Result is  [0, 2]
>
> RDD is  [0, 2]
> Filtered RDD is  [0, 1]
> Result is  [0, 1]
> ```
>
> Thanks,
>
> Marco
>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread יורי אולייניקוב
A. global scope and global variables are bad habits in Python (this is
about an 'rdd' and 'i' variable used in lambda).
B. lambdas are usually misused and abused in Python especially when they
used in global context: ideally you'd like to use pure functions and use
something like:
```

def my_rdd_filter(value, cur_elem):
return cur_elem != value

rdd = spark.sparkContext.parallelize([0, 1, 2])

for i in range(3):
func_filter = functools.partial(my_rdd_filter, i)
rdd = rdd.filter(func_filter)

```
This is better and testable Pythonic code: if you want to pass a context
for callable -> use partial or create a callable object with context in
__init__ arg (BTW this is what is done in Java).

Unfortunately partials and  callable objects are not supported in PySpark
- though they considered more Pythonic way.

anyway,
following works as you expected

def filter_rdd(j, my_rdd):
# this is a local context
print("RDD is ", my_rdd.collect())
print("Filtered RDD is ", my_rdd.filter(lambda x: x != j).collect())
my_rdd = my_rdd.filter(lambda x: x != j)
print("Result is ", my_rdd.collect())
print()
return my_rdd

# this is global context
rdd = spark.sparkContext.parallelize([0, 1, 2])

for i in range(3):
rdd = filter_rdd(i, rdd)


This is better and testable Pythonic code: if you want to pass a context
for callable -> use partial or create a callable object with context in
__init__ arg (BTW this is what is done in Java).

Unfortunately partials and callable objects are not supported in PySpark  -
though they considered more Pythonic way.

anyway running code other than calling main/seting contstants in global
context is bad practice in Python.

Hope this helps


ср, 20 янв. 2021 г. в 15:08, Marco Wong :

> Dear Spark users,
>
> I ran the Python code below on a simple RDD, but it gave strange results.
> The filtered RDD contains non-existent elements which were filtered away
> earlier. Any idea why this happened?
> ```
> rdd = spark.sparkContext.parallelize([0,1,2])
> for i in range(3):
> print("RDD is ", rdd.collect())
> print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
> rdd = rdd.filter(lambda x:x!=i)
> print("Result is ", rdd.collect())
> print()
> ```
> which gave
> ```
> RDD is  [0, 1, 2]
> Filtered RDD is  [1, 2]
> Result is  [1, 2]
>
> RDD is  [1, 2]
> Filtered RDD is  [0, 2]
> Result is  [0, 2]
>
> RDD is  [0, 2]
> Filtered RDD is  [0, 1]
> Result is  [0, 1]
> ```
>
> Thanks,
>
> Marco
>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Zhu Jingnan
I thought that was right result.

As rdd runs on a lacy basis.  so every time rdd.collect() executed, the i
will be updated to the latest i value, so only one will be filter out.

Regards

Jingnan

On Wed, Jan 20, 2021 at 9:01 AM Sean Owen  wrote:

> That looks very odd indeed. Things like this work as expected:
>
> rdd = spark.sparkContext.parallelize([0,1,2])
>
> def my_filter(data, i):
>   return data.filter(lambda x: x != i)
>
> for i in range(3):
>   rdd = my_filter(rdd, i)
> rdd.collect()
>
> ... as does unrolling the loop.
>
> But your example behaves as if only the final filter is applied. Is this
> is some really obscure Python scoping thing with lambdas that I don't
> understand, like the lambda only binds i once? but then you'd expect to
> only filter the first number.
>
> I also keep looking in the code to figure out if these are somehow being
> erroneously 'collapsed' as the same function, but the RDD APIs don't do
> that kind of thing. They get put into a chain of pipeline_funcs, but, still
> shouldn't be an issue. I wonder if this is some strange interaction with
> serialization of the lambda and/or scoping?
>
> Really strange! python people?
>
> On Wed, Jan 20, 2021 at 7:14 AM Marco Wong  wrote:
>
>> Dear Spark users,
>>
>> I ran the Python code below on a simple RDD, but it gave strange results.
>> The filtered RDD contains non-existent elements which were filtered away
>> earlier. Any idea why this happened?
>> ```
>> rdd = spark.sparkContext.parallelize([0,1,2])
>> for i in range(3):
>> print("RDD is ", rdd.collect())
>> print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
>> rdd = rdd.filter(lambda x:x!=i)
>> print("Result is ", rdd.collect())
>> print()
>> ```
>> which gave
>> ```
>> RDD is  [0, 1, 2]
>> Filtered RDD is  [1, 2]
>> Result is  [1, 2]
>>
>> RDD is  [1, 2]
>> Filtered RDD is  [0, 2]
>> Result is  [0, 2]
>>
>> RDD is  [0, 2]
>> Filtered RDD is  [0, 1]
>> Result is  [0, 1]
>> ```
>>
>> Thanks,
>>
>> Marco
>>
>


Re: Spark RDD + HBase: adoption trend

2021-01-20 Thread Sean Owen
RDDs are still relevant in a few ways - there is no Dataset in Python for
example, so RDD is still the 'typed' API. They still underpin DataFrames.
And of course it's still there because there's probably still a lot of code
out there that uses it. Occasionally it's still useful to drop into that
API for certain operations.

If that's a connector to read data from HBase - you probably do want to
return DataFrames ideally.
Unless you're relying on very specific APIs from very specific versions, I
wouldn't think a distro's Spark or HBase is much different?

On Wed, Jan 20, 2021 at 7:44 AM Marco Firrincieli 
wrote:

> Hi, my name is Marco and I'm one of the developers behind
> https://github.com/unicredit/hbase-rdd
> a project we are currently reviewing for various reasons.
>
> We were basically wondering if RDD "is still a thing" nowadays (we see
> lots of usage for DataFrames or Datasets) and we're not sure how much of
> the community still works/uses RDDs.
>
> Also, for lack of time, we always mainly worked using Cloudera-flavored
> Hadoop/HBase & Spark versions. We were thinking the community would then
> help us organize the project in a more "generic" way, but that didn't
> happen.
>
> So I figured I would ask here what is the gut feeling of the Spark
> community so to better define the future of our little library.
>
> Thanks
>
> -Marco
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Sean Owen
No, because the final rdd is really the result of chaining 3 filter
operations. They should all execute. It _should_ work like
"rdd.filter(...).filter(..).filter(...)"

On Wed, Jan 20, 2021 at 9:46 AM Zhu Jingnan  wrote:

> I thought that was right result.
>
> As rdd runs on a lacy basis.  so every time rdd.collect() executed, the i
> will be updated to the latest i value, so only one will be filter out.
>
> Regards
>
> Jingnan
>
>
>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Marco Wong
Hmm, I think I got what Jingnan means. The lambda function is x != i and i
is not evaluated when the lambda function was defined. So the pipelined rdd
is rdd.filter(lambda x: x != i).filter(lambda x: x != i), rather than
having the values of i substituted. Does that make sense to you, Sean?

On Wed, 20 Jan 2021 at 15:51, Sean Owen  wrote:

> No, because the final rdd is really the result of chaining 3 filter
> operations. They should all execute. It _should_ work like
> "rdd.filter(...).filter(..).filter(...)"
>
> On Wed, Jan 20, 2021 at 9:46 AM Zhu Jingnan 
> wrote:
>
>> I thought that was right result.
>>
>> As rdd runs on a lacy basis.  so every time rdd.collect() executed, the i
>> will be updated to the latest i value, so only one will be filter out.
>>
>> Regards
>>
>> Jingnan
>>
>>
>>


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Sean Owen
Heh that could make sense, but that definitely was not my mental model of
how python binds variables! Definitely is not how Scala works.

On Wed, Jan 20, 2021 at 10:00 AM Marco Wong  wrote:

> Hmm, I think I got what Jingnan means. The lambda function is x != i and i
> is not evaluated when the lambda function was defined. So the pipelined rdd
> is rdd.filter(lambda x: x != i).filter(lambda x: x != i), rather than
> having the values of i substituted. Does that make sense to you, Sean?
>
>>


Structured Streaming Spark 3.0.1

2021-01-20 Thread gshen
Hi all:

I am having a strange issue incorporating `groupBy` statements into a
structured streaming job when trying to write to Kafka or Delta. Weirdly it
only appears to work if I write to console, or to memory...

*I'm running Spark 3.0.1 with the following dependencies:
*
io.delta:delta-core_2.12:0.7.0
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
org.apache.hadoop:hadoop-azure:3.2.1"

*Here's a example of the pyspark job I've been testing with:*

/kafka = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","kafka-broker:29092")\
.option("subscribe","test")\
.option("startingOffsets", "earliest")\
.option("maxOffsetsPerTrigger", 5) \
.load()

rawDF = kafka.selectExpr("CAST(value AS STRING)")

groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))

kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")) AS
value")

kafka_stream_output \
.writeStream \
.format("kafka") \
.outputMode("update") \
.option("kafka.bootstrap.servers", "kafka-broker:29092") \
.option("topic", "sink") \
.option("checkpointLocation", checkpoint_location) \
.start()/
*
If I don't have a groupBy/aggregation, it's able to stream out to Kafka
perfectly fine; but when it's included, it writes a couple of messages to
the sink then throws an abstract error:*

Caused by: org.apache.spark.util.TaskCompletionListenerException:
Self-suppression not permitted
at
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
at
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
at org.apache.spark.scheduler.Task.run(Task.scala:143)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

org.apache.spark.scheduler.TaskSetManager Task 1 in stage 1.0 failed 4
times; aborting
job"[org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec]
Data source write support
org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@7337dcc is
aborting."


org.apache.spark.SparkException: Writing job aborted.
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataS
ourceV2Exec.scala:413)
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToData
SourceV2Exec.scala:361)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(Write
ToDataSourceV2Exec.scala:322)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSo
urceV2Exec.scala:329)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2Command
Exec.scala:39)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:
39)
at
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExe
c.scala:45)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
at
org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution
.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:1
60)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution
.scala:87)
at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(Micro
BatchExecution.scala:576)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution
.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:1
60)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution
.scala:87)
at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(Micro
BatchExecution.scala:571)

*I have tried using w

Re: Structured Streaming Spark 3.0.1

2021-01-20 Thread gshen
This SO post is pretty much the exact same issue:

https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic

The user mentions it's an issue with
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2021-01-20 Thread luby

陆伯鹰
中国投资有限责任公司信息技术部
电话:+86 (0)10 84096521
传真:+86 (0)10 64086851 
北京市东城区朝阳门北大街1号新保利大厦8层 100010
网站:www.china-inv.cn 
 




 
本邮件内容包含保密信息。如阁下并非拟发送的收件人,请您不要阅读、保存、对外
披露或复制本邮件的任何内容,或者打开本邮件的任何附件。请即回复邮件告知发件
人,并立刻将该邮件及其附件从您的电脑系统中全部删除,不胜感激。

 
This email message may contain confidential and/or privileged information. 
If you are not the intended recipient, please do not read, save, forward, 
disclose or copy the contents of this email or open any file attached to 
this email. We will be grateful if you could advise the sender immediately 
by replying this email, and delete this email and any attachment or links 
to this email completely and immediately from your computer system. 





Spark job stuck after read and not starting next stage

2021-01-20 Thread srinivasarao daruna
Hi,
I am running a spark job on a huge dataset. I have allocated 10 R5.16xlarge
machines. (each consists 64cores, 512G).

The source data is json and i need to do some json transformations. So, i
read them as text and then convert to a dataframe.

ds = spark.read.textFile()
updated_dataset = ds.withColumn(applying my transformations).as[String]
df = spark.read.json(updated_dataset)

df.write.save()

Some notes:
The source data is heavy and deeply nested. The printSchema contains a lot
of nested structs.

in the spark ui, json stage is first and after that is completed, it is not
showing any jobs in the UI and it's just hanging there.

All executors were dead and only the driver was active.

Thank You,
Regards,
Srini


Re: Spark job stuck after read and not starting next stage

2021-01-20 Thread German Schiavon
Hi,
 not sure if it is your case, but if the source data is heavy and deeply
nested I'd recommend explicitly providing the schema when reading the json.

df = spark.read.schema(schema).json(updated_dataset)


On Thu, 21 Jan 2021 at 04:15, srinivasarao daruna 
wrote:

> Hi,
> I am running a spark job on a huge dataset. I have allocated 10
> R5.16xlarge machines. (each consists 64cores, 512G).
>
> The source data is json and i need to do some json transformations. So, i
> read them as text and then convert to a dataframe.
>
> ds = spark.read.textFile()
> updated_dataset = ds.withColumn(applying my transformations).as[String]
> df = spark.read.json(updated_dataset)
>
> df.write.save()
>
> Some notes:
> The source data is heavy and deeply nested. The printSchema contains a lot
> of nested structs.
>
> in the spark ui, json stage is first and after that is completed, it is
> not showing any jobs in the UI and it's just hanging there.
>
> All executors were dead and only the driver was active.
>
> Thank You,
> Regards,
> Srini
>


Re: Use case advice

2021-01-20 Thread purav aggarwal
Unsubscribe

On Fri, Jan 15, 2021 at 9:52 AM Dilip Desavali 
wrote:

> Unsubscribe
>


Re: Structured Streaming Spark 3.0.1

2021-01-20 Thread German Schiavon
Hi,

I couldn't reproduce this error :/ I wonder if there is something else
underline causing it...

*Input*
➜  kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server
localhost:9092 --topic test1
{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}

*Output*
➜  kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server
localhost:9092 --topic sink
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}


val rawDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test1")
  .load
  .selectExpr("CAST(value AS STRING)")


val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")

kafka_stream_output
  .writeStream
  .format("kafka")
  .outputMode("update")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sink")
  .option("checkpointLocation", "/tmp/check")
  .start()

spark.streams.awaitAnyTermination()


On Wed, 20 Jan 2021 at 23:22, gshen  wrote:

> This SO post is pretty much the exact same issue:
>
>
> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>
> The user mentions it's an issue with
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Structured Streaming Spark 3.0.1

2021-01-20 Thread Jungtaek Lim
I quickly looked into the attached log in SO post, and the problem doesn't
seem to be related to Kafka. The error stack trace is from checkpointing to
GCS, and the implementation of OutputStream for GCS seems to be provided
with Google.

Could you please elaborate the stack trace or upload the log with redacting
secure texts?

On Thu, Jan 21, 2021 at 2:38 PM German Schiavon 
wrote:

> Hi,
>
> I couldn't reproduce this error :/ I wonder if there is something else
> underline causing it...
>
> *Input*
> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server
> localhost:9092 --topic test1
> {"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
>
> *Output*
> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server
> localhost:9092 --topic sink
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}
>
>
> val rawDF = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test1")
>   .load
>   .selectExpr("CAST(value AS STRING)")
>
>
> val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
> val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")
>
> kafka_stream_output
>   .writeStream
>   .format("kafka")
>   .outputMode("update")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "sink")
>   .option("checkpointLocation", "/tmp/check")
>   .start()
>
> spark.streams.awaitAnyTermination()
>
>
> On Wed, 20 Jan 2021 at 23:22, gshen  wrote:
>
>> This SO post is pretty much the exact same issue:
>>
>>
>> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>>
>> The user mentions it's an issue with
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>