Re: Flink throws a NullPointerException

2019-07-21 Post by 九思
Here is the TaskManager log:
2019-07-22 05:39:03,987 WARN  org.apache.hadoop.ipc.Client - Failed to connect to server: master/10.0.2.11:9000: try once and fail.
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
    at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
    at org.apache.hadoop.ipc.Client.call(Client.java:1381)
    at org.apache.hadoop.ipc.Client.call(Client.java:1345)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy17.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:583)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy18.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2540)
    at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2515)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1156)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1153)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1153)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1145)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1914)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
    at org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage.<init>(MemoryBackendCheckpointStorage.java:87)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createCheckpointStorage(MemoryStateBackend.java:295)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
2019-07-22 05:39:02.960 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2)] INFO  org.apache.flink.runtime.taskmanager.Task  - Creating FileSystem stream leak safety net for task Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2) (54be512ff5fc97d934ca9fbb96d66fe6) [DEPLOYING]
2019-07-22 05:39:02.960 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2)] INFO  org.apache.flink.runtime.taskmanager.Task  - Loading JAR files for task Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2) (54be512ff5fc97d934ca9fbb96d66fe6) [DEPLOYING].
2019-07-22 05:39:02.963 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2)] INFO  org.apache.flink.runtime.taskmanager.Task  - Registering task at network: Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (2/2) (54be512ff5fc97d934ca9fbb96d66fe6) [DEPLOYING].
2019-07-22 05:39:02.963 [Map -> Filter -> Timestamps/Watermarks -> Filter -> Sink: Unnamed (1/2)] INFO  org.apache.flink.runtime.taskmanager.Task  - Registering task at network: Map -> Filter -> Timestamps/Watermarks ->

Re: Flink throws a NullPointerException

2019-07-21 Post by zhisheng
Hi 九思,
From the log you provided, I can't tell what is causing the NullPointerException. Could you share more detailed logs, or your code? You could also take a quick look at a blog post I wrote earlier about writing data to RabbitMQ: http://1t.click/uh6
Hope this helps!
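
For reference, here is a minimal, self-contained Java sketch of a RabbitMQ sink. It assumes the flink-connector-rabbitmq dependency is on the classpath; the host, credentials, and queue name are placeholders, not values from this thread:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class RabbitMqSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder connection settings; adjust to your broker.
    RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setPort(5672)
        .setUserName("guest")
        .setPassword("guest")
        .setVirtualHost("/")
        .build();

    DataStream<String> stream = env.fromElements("hello", "rabbitmq");

    // RMQSink serializes each record with the given schema and writes it to the named queue.
    stream.addSink(new RMQSink<>(connectionConfig, "flink-out", new SimpleStringSchema()));

    env.execute("rabbitmq-sink-sketch");
  }
}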

九思 <1048095...@qq.com> wrote on Monday, July 22, 2019, at 1:32 PM:

> Could someone advise? With Flink on YARN, both writing data to RabbitMQ and
> consuming data from RabbitMQ throw a NullPointerException. What does this mean?
> Everything runs fine locally in IDEA. The Flink dependency is version 1.8, but
> the cluster environment was set up earlier with Flink 1.7.
>
> 2019-07-22 11:32:12.309 [Source: Custom Source (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source (1/1)
> (85cfdb83f536b26e07ca2aa4a1b66302) switched from DEPLOYING to FAILED.
> java.lang.NullPointerException: null
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
>     at java.lang.Thread.run(Thread.java:745)
> 2019-07-22 11:32:12.310 [Source: Custom Source (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
> Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302).
> 2019-07-22 11:32:12.321 [Source: Custom Source (1/1)] INFO
> org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
> streams are closed for task Source: Custom Source (1/1)
> (85cfdb83f536b26e07ca2aa4a1b66302) [FAILED]
> 2019-07-22 11:32:12.331 [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
> and sending final execution state FAILED to JobManager for task
> Source: Custom Source 85cfdb83f536b26e07ca2aa4a1b66302.


Flink throws a NullPointerException

2019-07-21 Post by 九思
Could someone advise? With Flink on YARN, both writing data to RabbitMQ and consuming data from RabbitMQ throw a NullPointerException. What does this mean? Everything runs fine locally in IDEA. The Flink dependency is version 1.8, but the cluster environment was set up earlier with Flink 1.7.


2019-07-22 11:32:12.309 [Source: Custom Source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302) switched from DEPLOYING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1175)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
    at java.lang.Thread.run(Thread.java:745)
2019-07-22 11:32:12.310 [Source: Custom Source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302).
2019-07-22 11:32:12.321 [Source: Custom Source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom Source (1/1) (85cfdb83f536b26e07ca2aa4a1b66302) [FAILED]
2019-07-22 11:32:12.331 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source 85cfdb83f536b26e07ca2aa4a1b66302.

Re: Question about the RocksDB local storage path [on-yarn]

2019-07-21 Post
Try:
rocksDBStateBackend.setDbStoragePaths(ROCKSDB_LOCAL_DIRS: _*)
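
For reference, the same idea as a minimal Java sketch. The checkpoint URI and the local directories below are placeholders, and the flink-statebackend-rocksdb dependency is assumed:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirsSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoints still go to the distributed filesystem; only RocksDB's
    // local working files are redirected to the given TaskManager-local dirs.
    RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
    backend.setDbStoragePaths("/data1/flink/rocksdb", "/data2/flink/rocksdb");
    env.setStateBackend(backend);

    // ... define and execute the job as usual ...
  }
}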





------------------ Original message ------------------
From: "zq wang";
Date: Monday, July 22, 2019, 11:21 AM;
To: "user-zh";
Subject: Question about the RocksDB local storage path [on-yarn]




Hi all,
I want to change where RocksDB stores its local data. I found that this requires changing the io.tmp.dirs path, but adding the parameter -yD io.tmp.dirs keeps the job from starting; removing it lets the job start normally. I later read that in on-yarn mode this should follow the LOCAL_DIRS configuration from yarn-site. Without changing yarn-site.yml, how do I configure the local RocksDB path correctly for flink-on-yarn?

Question about the RocksDB local storage path [on-yarn]

2019-07-21 Post by zq wang
Hi all,
I want to change where RocksDB stores its local data. I found that this requires changing the io.tmp.dirs path, but adding the parameter -yD io.tmp.dirs keeps the job from starting; removing it lets the job start normally. I later read that in on-yarn mode this should follow the LOCAL_DIRS configuration from yarn-site. Without changing yarn-site.yml, how do I configure the local RocksDB path correctly for flink-on-yarn?


Re: Re: Re: Re: Re: Flink produced 44 GB of logs in the log folder

2019-07-21 Post by Henry
I see it now, thank you!





On 2019-07-21 19:16:36, "Caizhi Weng" wrote:
>Hi Henry,
>
>You may have misread it. Look carefully at your run function: the try-catch inside it contains running = true...
>
>Henry wrote on Saturday, July 20, 2019, at 9:32 PM:
>
>>
>>
>> Ah! Now I remember. I forget exactly why, but to make debugging easier I
>> changed running = false; to running = true;. Thanks a lot! But why would that
>> cause this? I thought that running = true; was written in cancel, and the job
>> was running and never cancelled, so how would execution ever jump there?
>>
>>
>>
>>
>>
>> On 2019-07-20 03:23:28, "Caizhi Weng" wrote:
>> >Hi Henry,
>> >
>> >LOG.error(e.getLocalizedMessage());
>> >running = true;
>> >
>> >This looks like a mistake; it should be running = false;
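
For reference, a minimal sketch of the receive loop with that fix applied; the rest of the source, quoted further below, stays unchanged:

while (running) {
  try {
    Message message = consumer.receive();
    // ... convert the message and ctx.collect(...) as in the source below ...
  } catch (JMSException e) {
    // Stop the loop on error. With running = true the next receive() fails
    // again immediately, which is what floods the log.
    LOG.error(e.getLocalizedMessage(), e);
    running = false;
  }
}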
>> >
>> >Henry wrote on Friday, July 19, 2019, at 4:04 PM:
>> >
>> >>
>> >>
>> >> Thanks for your help! I also think the problem is in the source, but I
>> >> haven't found where it goes wrong. Below is my custom source code; note
>> >> that it nowhere logs the error message that appears in the log.
>> >> package com.JavaCustoms;
>> >> import org.apache.activemq.ActiveMQConnectionFactory;
>> >> import org.apache.flink.configuration.Configuration;
>> >> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> >> import org.slf4j.Logger;
>> >> import org.slf4j.LoggerFactory;
>> >>
>> >> import javax.jms.*;
>> >>
>> >> public class FlinkJMSStreamSource extends RichSourceFunction<String> {
>> >>   private static final long serialVersionUID = 1L;
>> >>   private static final Logger LOG =
>> >>       LoggerFactory.getLogger(FlinkJMSStreamSource.class);
>> >>   private transient volatile boolean running;
>> >>   private transient MessageConsumer consumer;
>> >>   private transient Connection connection;
>> >>
>> >>   // topic name
>> >>   private static final String topicName = "flink_mypay";
>> >>   // broker URL
>> >>   private static final String tcpStr = "tcp://server.mn:61616";
>> >>   // client id for the durable subscription
>> >>   private static final String clientId = "flink_hz";
>> >>   // subscription name
>> >>   private static final String subscriptionName = "flink_topic_mypay";
>> >>
>> >>   private void init() throws JMSException {
>> >>     // Create a ConnectionFactory
>> >>     ActiveMQConnectionFactory connectionFactory = new
>> >>         ActiveMQConnectionFactory(tcpStr);
>> >>
>> >>     // Create a Connection
>> >>     connection = connectionFactory.createConnection();
>> >>     connection.setClientID(clientId);
>> >>     //connection.start();
>> >>
>> >>     // Create a Session
>> >>     Session session = connection.createSession(false,
>> >>         Session.AUTO_ACKNOWLEDGE);
>> >>
>> >>     // Create a MessageConsumer from the Session to the Topic or Queue
>> >>     Topic topic = session.createTopic(topicName);
>> >>     consumer = session.createDurableSubscriber(topic, subscriptionName);
>> >>     connection.start();
>> >>   }
>> >>
>> >>   @Override
>> >>   public void open(Configuration parameters) throws Exception {
>> >>     super.open(parameters);
>> >>     running = true;
>> >>     init();
>> >>   }
>> >>
>> >>   @Override
>> >>   public void run(SourceContext<String> ctx) {
>> >>     // this source never completes
>> >>     while (running) {
>> >>       try {
>> >>         Message message = consumer.receive();
>> >>         BytesMessage bytesMessage = (BytesMessage) message;
>> >>         byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
>> >>         bytesMessage.readBytes(bytes);
>> >>
>> >>         String text = new String(bytes);
>> >>         ctx.collect(text);
>> >>
>> >>       } catch (JMSException e) {
>> >>         LOG.error(e.getLocalizedMessage());
>> >>         running = true;
>> >>       }
>> >>     }
>> >>     try {
>> >>       close();
>> >>     } catch (Exception e) {
>> >>       LOG.error(e.getMessage(), e);
>> >>     }
>> >>   }
>> >>
>> >>   @Override
>> >>   public void cancel() {
>> >>     running = false;
>> >>   }
>> >>
>> >>   @Override
>> >>   public void close() throws Exception {
>> >>     LOG.info("Closing");
>> >>     try {
>> >>       connection.close();
>> >>     } catch (JMSException e) {
>> >>       throw new RuntimeException("Error while closing ActiveMQ connection ", e);
>> >>     }
>> >>   }
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On 2019-07-19 14:43:17, "Caizhi Weng" wrote:
>> >> >Hi Henry,
>> >> >
>> >> >Do you mean you don't want Flink to write logs at all? Then the only option
>> >> >is to turn logging off with `log4j.rootLogger=OFF` (log4j) or `<root
>> >> >level="OFF"></root>` (logback), or to raise the log level to FATAL... But I
>> >> >still think the problem is that the custom source dead-loops while writing
>> >> >logs...
>> >> >
>> >> >Henry wrote on Friday, July 19, 2019, at 2:20 PM:
>> >> >
>> >> >>
>> >> >> Hello, thanks! Yes, this source is a custom source implemented with JMS.
>> >> >> I'm still looking for the cause, but how can I stop Flink from writing
>> >> >> logs so explosively? It fills the disk within 20 minutes, writing more
>> >> >> than 40 GB.
>> >> >>
>> >> >> On 2019-07-19 11:11:37, "Caizhi Weng" wrote:
>> >> >> >Hi Henry,
>> >> >> >
>> >> >> >This source doesn't look like one shipped with Flink, so the problem is
>> >> >> >likely in the source's own implementation. You may need to modify the
>> >> >> >source code so that it shuts down after an error, or handles it in some
>> >> >> >other way...
>> >> >> >
>> >> >> >Henry wrote on Friday, July 19, 2019, at 9:31 AM:
>> >> >> >
>> >> >> >> Hi all, the error screenshots didn't come through last time, so here
>> >> >> >> they are again. Screenshot links:
>> >> >> >> https://img-blog.csdnimg.cn/20190719092540880.png
>> >> >> >> https://img-blog.csdnimg.cn/20190719092848500.png
>> >> >> >>
>> >> >> >> From the errors, my source uses ActiveMQ. The Flink job started
>> >> >> >> receiving messages at 9 AM yesterday and ran fine until 8 AM this
>> >> >> >> morning. Then at 8:04 AM it suddenly started throwing errors, and
>> >> >> >> Flink wrote logs into the log folder nonstop. The second screenshot
>> >> >> >> shows the start of the errors: ActiveMQ seems to have timed out, then
>> >> >> >> the consumer closed and the log writing went wild.
>> >> >> >>
>> >> >>
>> >>
>>