Re: Flink throws a NullPointerException
Attached is the TaskManager log:

2019-07-22 05:39:03,987 WARN org.apache.hadoop.ipc.Client - Failed to connect to server: master/10.0.2.11:9000: try once and fail.
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
    at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
    at org.apache.hadoop.ipc.Client.call(Client.java:1381)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy17.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:583)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy18.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2540)
    at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2515)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1156)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1153)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1153)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1145)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1914)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
    at org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage.<init>(MemoryBackendCheckpointStorage.java:87)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createCheckpointStorage(MemoryStateBackend.java:295)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)

2019-07-22 05:39:02.960 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for task Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2) (54be512ff5fc97d934ca9fbb96d66fe6) [DEPLOYING]
2019-07-22 05:39:02.960 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2) (54be512ff5fc97d934ca9fbb96d66fe6) [DEPLOYING].
2019-07-22 05:39:02.963 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2) (54be512ff5fc97d934ca9fbb96d66fe6) [DEPLOYING].
2019-07-22 05:39:02.963 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (1/2)] INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map -> Filter -> Timestamps/Watermarks ->
Re: Flink throws a NullPointerException
Hi 九思,

From the log you provided I cannot tell what caused the NullPointerException. Could you share more detailed logs, or your code? You could also take a quick look at a blog post I wrote earlier about writing data to RabbitMQ: http://1t.click/uh6 Hope it helps!

九思 <1048095...@qq.com> wrote on Mon, Jul 22, 2019 at 1:32 PM:

> Hi teachers: with Flink on YARN, both writing data to RabbitMQ and consuming data from RabbitMQ throw a NullPointerException. What does that mean? Everything runs fine locally in IDEA. The Flink dependency version is 1.8; the Flink environment was set up earlier with 1.7.
>
> 2019-07-22 11:32:12.309 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302) switched from DEPLOYING to FAILED.
> java.lang.NullPointerException: null
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
>     at java.lang.Thread.run(Thread.java:745)
> 2019-07-22 11:32:12.310 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302).
> 2019-07-22 11:32:12.321 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302) [FAILED]
> 2019-07-22 11:32:12.331 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source 85cfdb83f536b26e07ca2aa4a1b66302.
Flink throws a NullPointerException
Hi teachers: with Flink on YARN, both writing data to RabbitMQ and consuming data from RabbitMQ throw a NullPointerException. What does that mean? Everything runs fine locally in IDEA. The Flink dependency version is 1.8; the Flink environment was set up earlier with 1.7.

2019-07-22 11:32:12.309 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302) switched from DEPLOYING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
    at java.lang.Thread.run(Thread.java:745)
2019-07-22 11:32:12.310 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302).
2019-07-22 11:32:12.321 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302) [FAILED]
2019-07-22 11:32:12.331 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source 85cfdb83f536b26e07ca2aa4a1b66302.
Re: Question about the RocksDB local storage path [on-yarn]
Try rocksDBStateBackend.setDbStoragePaths(ROCKSDB_LOCAL_DIRS: _*)

-- Original message --
From: "zq wang"
Date: Mon, Jul 22, 2019, 11:21
To: "user-zh"
Subject: Question about the RocksDB local storage path [on-yarn]

> Hi all, I want to change the location where RocksDB stores its local data. I found that the io.tmp.dirs path needs to be modified, but actually adding the parameter -yD io.tmp.dirs prevents the job from starting; removing the parameter lets it start normally. I later read that in on-yarn mode the LOCAL_DIRS configuration from yarn-site should take precedence. Without modifying yarn-site.yml, how can I correctly configure the local RocksDB path for flink-on-yarn?
Question about the RocksDB local storage path [on-yarn]
Hi all, I want to change the location where RocksDB stores its local data. I found that the io.tmp.dirs path needs to be modified, but actually adding the parameter -yD io.tmp.dirs prevents the job from starting; removing the parameter lets it start normally. I later read that in on-yarn mode the LOCAL_DIRS configuration from yarn-site should take precedence. Without modifying yarn-site.yml, how can I correctly configure the local RocksDB path for flink-on-yarn?
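Besides the programmatic setDbStoragePaths call suggested in the reply, Flink also exposes a RocksDB-specific configuration key, state.backend.rocksdb.localdir, which can be set in flink-conf.yaml. This is a hedged suggestion based on Flink's configuration reference rather than something confirmed in the thread, and the paths below are placeholders:

```yaml
# Comma-separated list of local directories for RocksDB's working files.
# If unset, RocksDB falls back to the TaskManager temp directories
# (on YARN, the LOCAL_DIRS provided by the NodeManager).
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb
```

Note that the configured directories must exist and be writable on every TaskManager host.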
Re:Re: Re: Re: Re: Flink's log folder has grown to 44 GB of logs
I see it now, thanks!

On 2019-07-21 19:16:36, "Caizhi Weng" wrote:
>Hi Henry,
>
>You may have misread it. Look carefully at your run function: there is a try/catch that contains running = true...
>
>Henry wrote on Sat, Jul 20, 2019 at 9:32 PM:
>
>> Ah, now I remember! For some reason I no longer recall, I changed running = false; to running = true; to make debugging
>> easier. Many thanks! But why does that matter? That running = true; is written in cancel, and the job was running and
>> never cancelled, so how could execution ever jump there?
>>
>> On 2019-07-20 03:23:28, "Caizhi Weng" wrote:
>> >Hi Henry,
>> >
>> >LOG.error(e.getLocalizedMessage());
>> >running = true;
>> >
>> >This looks like a mistake; it should be running = false;
>> >
>> >Henry wrote on Fri, Jul 19, 2019 at 4:04 PM:
>> >
>> >> Thanks for your help! I also suspect the problem is in the source, but I have not found where the error is. Below is
>> >> my custom source code; it nowhere logs the error message that shows up in the log.
>> >>
>> >> package com.JavaCustoms;
>> >> import org.apache.activemq.ActiveMQConnectionFactory;
>> >> import org.apache.flink.configuration.Configuration;
>> >> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> >> import org.slf4j.Logger;
>> >> import org.slf4j.LoggerFactory;
>> >>
>> >> import javax.jms.*;
>> >>
>> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
>> >>   private static final long serialVersionUID = 1L;
>> >>   private static final Logger LOG = LoggerFactory.getLogger(FlinkJMSStreamSource.class);
>> >>   private transient volatile boolean running;
>> >>   private transient MessageConsumer consumer;
>> >>   private transient Connection connection;
>> >>
>> >>   // topic name
>> >>   private static final String topicName = "flink_mypay";
>> >>   // tcp str
>> >>   private static final String tcpStr = "tcp://server.mn:61616";
>> >>   // client id for the durable subscription
>> >>   private static final String clientId = "flink_hz";
>> >>   // Subscription name
>> >>   private static final String subscriptionName = "flink_topic_mypay";
>> >>
>> >>   private void init() throws JMSException {
>> >>     // Create a ConnectionFactory
>> >>     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcpStr);
>> >>
>> >>     // Create a Connection
>> >>     connection = connectionFactory.createConnection();
>> >>     connection.setClientID(clientId);
>> >>     //connection.start();
>> >>
>> >>     // Create a Session
>> >>     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>> >>
>> >>     // Create a MessageConsumer from the Session to the Topic or Queue
>> >>     Topic topic = session.createTopic(topicName);
>> >>     consumer = session.createDurableSubscriber(topic, subscriptionName);
>> >>     connection.start();
>> >>   }
>> >>
>> >>   @Override
>> >>   public void open(Configuration parameters) throws Exception {
>> >>     super.open(parameters);
>> >>     running = true;
>> >>     init();
>> >>   }
>> >>
>> >>   @Override
>> >>   public void run(SourceContext<String> ctx) {
>> >>     // this source never completes
>> >>     while (running) {
>> >>       try {
>> >>         Message message = consumer.receive();
>> >>         BytesMessage bytesMessage = (BytesMessage) message;
>> >>         byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>> >>         bytesMessage.readBytes(bytes);
>> >>
>> >>         String text = new String(bytes);
>> >>         ctx.collect(text);
>> >>
>> >>       } catch (JMSException e) {
>> >>         LOG.error(e.getLocalizedMessage());
>> >>         running = true;
>> >>       }
>> >>     }
>> >>     try {
>> >>       close();
>> >>     } catch (Exception e) {
>> >>       LOG.error(e.getMessage(), e);
>> >>     }
>> >>   }
>> >>
>> >>   @Override
>> >>   public void cancel() {
>> >>     running = false;
>> >>   }
>> >>
>> >>   @Override
>> >>   public void close() throws Exception {
>> >>     LOG.info("Closing");
>> >>     try {
>> >>       connection.close();
>> >>     } catch (JMSException e) {
>> >>       throw new RuntimeException("Error while closing ActiveMQ connection ", e);
>> >>     }
>> >>   }
>> >> }
>> >>
>> >> On 2019-07-19 14:43:17, "Caizhi Weng" wrote:
>> >> >Hi Henry,
>> >> >
>> >> >Do you mean you don't want Flink to write logs at all? Then the only options are to turn logging off with
>> >> >`log4j.rootLogger=OFF` (log4j) or `<root level="OFF">` (logback), or to raise the log level to FATAL... But I still
>> >> >think the problem is that the custom source ends up in an infinite loop that keeps writing the log...
>> >> >
>> >> >Henry wrote on Fri, Jul 19, 2019 at 2:20 PM:
>> >> >
>> >> >> Hello, thanks! Yes, this source is a custom source implemented with JMS. I am still looking for the cause, but how
>> >> >> can I keep Flink from writing logs this explosively? It fills the disk within 20 minutes, more than 40 GB written.
>> >> >>
>> >> >> On 2019-07-19 11:11:37, "Caizhi Weng" wrote:
>> >> >> >Hi Henry,
>> >> >> >
>> >> >> >This source does not look like one shipped with Flink, so the problem is probably in the source implementation
>> >> >> >itself. You may need to change the source code so that it shuts down or handles the error in some other way after
>> >> >> >it occurs...
>> >> >> >
>> >> >> >Henry wrote on Fri, Jul 19, 2019 at 9:31 AM:
>> >> >> >
>> >> >> >> Hi everyone, the error screenshots did not come through last time, so here they are again.
>> >> >> >> Screenshot links:
>> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >> >> >>
>> >> >> >> Looking at the errors: my source here uses ActiveMQ. The Flink job started receiving messages at 9 a.m. yesterday
>> >> >> >> and ran fine until 8 a.m. this morning. Then at 8:04 a.m. Flink started writing errors to the log folder at full
>> >> >> >> speed. The second screenshot shows the start of the errors: ActiveMQ appears to have timed out, after which the
>> >> >> >> consumer closed and the log kept being written nonstop.
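The bug the thread converges on (writing running = true; inside the catch block, so a permanently failing consumer spins forever and floods the log) can be reduced to a minimal, self-contained sketch. The PollingLoopSketch class and its always-failing receive() are illustrative assumptions, not part of the original source:

```java
// Minimal sketch of the fixed polling loop: on an unrecoverable error
// the flag must be set to false so the loop terminates instead of
// spinning and logging the same error forever.
public class PollingLoopSketch {
    private volatile boolean running = true;
    private int attempts = 0;

    // Stand-in for consumer.receive(): fails permanently, like a
    // consumer whose broker connection has timed out and closed.
    private String receive() throws Exception {
        attempts++;
        throw new Exception("connection lost");
    }

    /** Runs the loop; returns how many receive attempts were made. */
    public int run() {
        while (running) {
            try {
                receive();
            } catch (Exception e) {
                // Fix: stop the loop on error. The original code had
                // `running = true;` here, which kept the loop alive
                // and filled 40+ GB of logs within hours.
                running = false;
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        int attempts = new PollingLoopSketch().run();
        // With the fix, the loop exits after the first failed attempt.
        System.out.println("attempts=" + attempts);
    }
}
```

A production version would likely log the error once and rethrow (or count retries with a backoff) rather than silently stopping, but the key point is that the loop condition must actually become false.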