[ https://issues.apache.org/jira/browse/BEAM-2778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía reassigned BEAM-2778: ---------------------------------- Assignee: (was: Amit Sela) > Serialization stack error using spark stream > ---------------------------------------------- > > Key: BEAM-2778 > URL: https://issues.apache.org/jira/browse/BEAM-2778 > Project: Beam > Issue Type: Bug > Components: runner-spark > Affects Versions: 2.0.0 > Reporter: li yuntian > Priority: Major > > options...... > Pipeline pipeline = Pipeline.create(options); > KafkaIO.Read<String, String> read = KafkaIO.<String, String>read() > .withBootstrapServers("10.139.7.xx:9092") > .withTopics(Collections.singletonList("test")) > .withKeyDeserializer(StringDeserializer.class) > .withValueDeserializer(StringDeserializer.class); > PCollection<String> kafkaJsonPc = pipeline.apply(read.withoutMetadata()) > > .apply(Window.<KV<String,String>>into(FixedWindows.of(Duration.standardMinutes(1)))) > .apply(Values.<String> create()); > kafkaJsonPc.apply(WithKeys.<String, String>of("global")) > .apply(GroupByKey.<String, String>create()); > I get errors, IF I DELETE " .apply(GroupByKey.<String, String>create())" > everything is fine. > SO I think is there something wrong with GroupBy Transform in spark streaming? > I find a jira https://issues.apache.org/jira/browse/BEAM-1624 is these the > same? when to fix? > errors: > 17/08/18 15:31:37 INFO BlockManagerInfo: Added broadcast_42_piece0 in memory > on localhost:56153 (size: 399.0 B, free: 1804.1 MB) > 17/08/18 15:31:37 INFO SparkContext: Created broadcast 42 from broadcast at > GlobalWatermarkHolder.java:135 > 17/08/18 15:31:37 WARN JobGenerator: Timed out while stopping the job > generator (timeout = 5000) > 17/08/18 15:31:37 INFO JobGenerator: Waited for jobs to be processed and > checkpoints to be written > 17/08/18 15:31:47 INFO CheckpointWriter: CheckpointWriter executor terminated > ? false, waited for 10001 ms. > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 INFO JobGenerator: Stopped JobGenerator > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN Client: interrupted waiting to send rpc request to > server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 17/08/18 15:31:47 WARN RetryInvocationHandler: Exception while invoking class > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename. > Not retrying because failovers (15) exceeded maximum allowed (15) > java.io.IOException: java.lang.InterruptedException > at org.apache.hadoop.ipc.Client.call(Client.java:1326) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > ... 16 more > 17/08/18 15:31:47 WARN CheckpointWriter: Error in attempt 1 of writing > checkpoint to hdfs://bchcluster/tmp/Beam Job > Spark0.58879054/spark-checkpoint/checkpoint-1503041490500 > java.io.IOException: java.lang.InterruptedException > at org.apache.hadoop.ipc.Client.call(Client.java:1326) > at org.apache.hadoop.ipc.Client.call(Client.java:1300) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) > at com.sun.proxy.$Proxy38.rename(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:396) > at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy41.rename(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1555) > at > org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:522) > at > org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:238) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:970) > at org.apache.hadoop.ipc.Client.call(Client.java:1320) > ... 16 more > 17/08/18 15:31:47 WARN CheckpointWriter: Could not write checkpoint for time > 1503041488000 ms to file hdfs://bchcluster/tmp/Beam Job > Spark0.58879054/spark-checkpoint/checkpoint-1503041490500' > 17/08/18 15:31:47 INFO JobScheduler: Stopped JobScheduler > 17/08/18 15:31:47 INFO StreamingContext: StreamingContext stopped successfully > Exception in thread "main" java.lang.RuntimeException: > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 > in stage 93.0 (TID 30) had a not serializable result: > org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers > Serialization stack: > - object not serializable (class: > org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, > value: > org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55) > - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) > - object (class scala.Tuple2, > (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])) > - field (class: scala.Tuple2, name: _2, type: class java.lang.Object) > - object (class scala.Tuple2, > (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))) > at > org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:55) > at > org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72) > at > org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:41) > at > org.apache.beam.runners.spark.SparkPipelineResult$StreamingMode.stop(SparkPipelineResult.java:163) > at > org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:198) > at > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101) > at > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87) > at > com.chinamobile.cmss.example.TwoCountBaseApp.main(TwoCountBaseApp.java:83) > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0.0 in stage 93.0 (TID 30) had a not serializable result: > org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers > Serialization stack: > - object not serializable (class: > org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, > value: > org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55) > - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) > - object (class scala.Tuple2, > (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[])) > - field (class: scala.Tuple2, name: _2, type: class java.lang.Object) > - object (class scala.Tuple2, > (org.apache.beam.runners.spark.util.ByteArray@eacf3ee4,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@14b2cd55,[]))) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) > at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912) > at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) > at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) > at > org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332) > at > org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46) > at > org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:77) > at > org.apache.beam.runners.spark.translation.streaming.UnboundedDataset$1.call(UnboundedDataset.java:74) > at > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335) > at > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) > at > org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) > at scala.util.Try$.apply(Try.scala:161) > at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian Jira (v8.3.4#803005)