[ 
https://issues.apache.org/jira/browse/BEAM-2778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía reassigned BEAM-2778:
----------------------------------

    Assignee:     (was: Amit Sela)

> Serialization stack error  using spark stream 
> ----------------------------------------------
>
>                 Key: BEAM-2778
>                 URL: https://issues.apache.org/jira/browse/BEAM-2778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.0.0
>            Reporter: li yuntian
>            Priority: Major
>
> options......
> Pipeline pipeline = Pipeline.create(options);
> KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
>                 .withBootstrapServers("10.139.7.xx:9092")
>                 .withTopics(Collections.singletonList("test"))
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class);
> PCollection<String> kafkaJsonPc = pipeline.apply(read.withoutMetadata())
>        
> .apply(Window.<KV<String,String>>into(FixedWindows.of(Duration.standardMinutes(1))))
>                 .apply(Values.<String> create());
>         kafkaJsonPc.apply(WithKeys.<String, String>of("global"))
>                 .apply(GroupByKey.<String, String>create());
> I get errors, IF I DELETE " .apply(GroupByKey.<String, String>create())"   
> everything is fine.
> SO I think is there something wrong with GroupBy Transform in spark streaming?
> I find a jira https://issues.apache.org/jira/browse/BEAM-1624  is these the 
> same? when to fix?
> errors:
> 17/08/18 15:31:37 INFO BlockManagerInfo: Added broadcast_42_piece0 in memory 
> on localhost:56153 (size: 399.0 B, free: 1804.1 MB)
> 17/08/18 15:31:37 INFO SparkContext: Created broadcast 42 from broadcast at 
> GlobalWatermarkHolder.java:135
> 17/08/18 15:31:37 WARN JobGenerator: Timed out while stopping the job 
> generator (timeout = 5000)
> 17/08/18 15:31:37 INFO JobGenerator: Waited for jobs to be processed and 
> checkpoints to be written
> 17/08/18 15:31:47 INFO CheckpointWriter: CheckpointWriter executor terminated 
> ? false, waited for 10001 ms.
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 INFO JobGenerator: Stopped JobGenerator
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to 
> server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 17/08/18 15:31:47 WARN RetryInvocationHandler: Exception while invoking class 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename. 
> Not retrying because failovers (15) exceeded maximum allowed (15)
> java.io.IOException: java.lang.InterruptedException
>       at org.apache.hadoop.ipc.Client.call(Client.java:1326)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       ... 16 more
> 17/08/18 15:31:47 WARN CheckpointWriter: Error in attempt 1 of writing 
> checkpoint to hdfs://bchcluster/tmp/Beam Job 
> Spark0.58879054/spark-checkpoint/checkpoint-1503041490500
> java.io.IOException: java.lang.InterruptedException
>       at org.apache.hadoop.ipc.Client.call(Client.java:1326)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>       at com.sun.proxy.$Proxy38.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396)
>       at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy41.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522)
>       at 
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1320)
>       ... 16 more
> 17/08/18 15:31:47 WARN CheckpointWriter: Could not write checkpoint for time 
> 1503041488000 ms to file hdfs://bchcluster/tmp/Beam Job 
> Spark0.58879054/spark-checkpoint/checkpoint-1503041490500'
> 17/08/18 15:31:47 INFO JobScheduler: Stopped JobScheduler
> 17/08/18 15:31:47 INFO StreamingContext: StreamingContext stopped successfully
> Exception in thread "main" java.lang.RuntimeException: 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 
> in stage 93.0 (TID 30) had a not serializable result: 
> org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
> Serialization stack:
>       - object not serializable (class: 
> org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers,
>  value: 
> org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
>       - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>       - object (class scala.Tuple2, 
> (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
>       - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
>       - object (class scala.Tuple2, 
> (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
>       at 
> com.chinamobile.cmss.example.TwoCountBaseApp.main(TwoCountBaseApp.java:83)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0.0 in stage 93.0 (TID 30) had a not serializable result: 
> org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
> Serialization stack:
>       - object not serializable (class: 
> org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers,
>  value: 
> org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55)
>       - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
>       - object (class scala.Tuple2, 
> (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))
>       - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
>       - object (class scala.Tuple2, 
> (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])))
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>       at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
>       at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>       at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
>       at 
> org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
>       at 
> org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
>       at 
> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:77)
>       at 
> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:74)
>       at 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>       at 
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>       at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>       at scala.util.Try$.apply(Try.scala:161)
>       at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745) 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to