Re: Does sinking a flink DataStream multiple times re-drive the DAG?

2021-03-07 Posted by Evan
My understanding is that the upstream DAG is not driven again for each sink; you can test it yourself to confirm. I can't explain the underlying mechanism in detail, though.
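For reference, a minimal runnable sketch (the local source and names below are my own, not from the job above) showing one stream fanned out to several print sinks; the source belongs to a single DAG and is executed once, with each record forwarded to all three sinks:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A small bounded source standing in for the Kafka source in the question.
        DataStream<String> source = env.fromElements("a", "b", "c");

        // Three sinks attached to the same stream; the upstream source is not re-run per sink.
        source.printToErr("1");
        source.printToErr("2");
        source.printToErr("3");

        env.execute("multi-sink sketch");
    }
}

Each element should show up once under every sink prefix, which is consistent with the sinks sharing one upstream execution rather than driving it again.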
 
From: lp
Sent: 2021-03-05 17:31
To: user-zh
Subject: Does sinking a flink DataStream multiple times re-drive the DAG?
 
I have a question about the following code snippet:
 
--
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddr);
properties.setProperty("group.id", kafkaOdsGroup);
properties.setProperty("auto.offset.reset", kafkaOdsAutoOffsetReset);
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, kafkaOdsPartitionDiscoverInterval);
properties.setProperty("transaction.timeout.ms", KafkaOdsTransactionTimeout); // Kafka transaction timeout

FlinkKafkaConsumer<String> flinkKafkaConsumer =
        new FlinkKafkaConsumer<>(kafkaOdsTopic, new SimpleStringSchema(), properties);
DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);
dataStreamSource.printToErr("1");
dataStreamSource.printToErr("2");
dataStreamSource.printToErr("3");

 
 
I sink the same datastream multiple times with identical operations. Will that cause the whole upstream DAG to be driven repeatedly? Coming from a Spark mindset, I assumed the upstream DAG would be re-executed for each sink.
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Re: Unsubscribe

2021-03-07 Posted by Evan
Hi,

To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

For details see [1]

[1] https://flink.apache.org/zh/community.html#section-1



 
From: zenglong chen
Sent: 2021-03-08 10:00
To: user-zh
Subject: Unsubscribe
Unsubscribe


Re: Re: Has anyone in the community implemented a MongoDB source for Flink?

2021-02-24 Posted by Evan
Great, thanks a lot, I'll look into it. The material I found online before could only do batch reads — the program stopped once the read finished and could not keep reading incremental changes in real time.



 
From: Paul Lam
Sent: 2021-02-24 17:03
To: user-zh
Subject: Re: Has anyone in the community implemented a MongoDB source for Flink?
Hi,
 
Debezium supports MongoDB CDC [1]; it might be worth a look.
 
[1] https://debezium.io/documentation/reference/connectors/mongodb.html
 
Best,
Paul Lam
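As a rough, unverified sketch of that route: Debezium streams the MongoDB change events into a Kafka topic, and Flink then reads that topic. Everything below (topic name, schema, servers) is made up for illustration, and note that the Debezium MongoDB envelope differs from the relational one, so Flink's debezium-json format may need extra handling for it:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoCdcViaDebeziumSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical topic populated by the Debezium MongoDB connector.
        tEnv.executeSql(
                "CREATE TABLE mongo_orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'dbserver1.inventory.orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'debezium-json'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM mongo_orders").print();
    }
}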
 
> On 2021-02-24 at 16:23, Evan wrote:
> 
> 
> Has anyone fully implemented a MongoDB source for Flink?
> As the subject says, I have a requirement for Flink to read data changes from MongoDB in real time.
> 
> 
 


Has anyone in the community implemented a MongoDB source for Flink?

2021-02-24 Posted by Evan

Has anyone fully implemented a MongoDB source for Flink?
As the subject says, I have a requirement for Flink to read data changes from MongoDB in real time.




Re: Problem reading a Kerberos-authenticated Kafka topic with PyFlink

2021-01-30 文章 Evan
??  ??


 ??
Sent: 2021-01-30 17:53
To: user-zh
Subject: Problem reading a Kerberos-authenticated Kafka topic with PyFlink
??
??pyflinkkafka??

flink-conf.yamlkerberos 


jaas.conf??

??


Re: Is there a way to get partition and offset information with the Flink SQL kafka connector?

2021-01-20 Posted by Evan
Hi, yes you can:
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

You can find the details in the official docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#available-metadata

Hope this helps.
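A small follow-up sketch (assuming a TableEnvironment in which the KafkaTable DDL above has already been executed) that reads the metadata columns:

// `tEnv` is assumed to be the TableEnvironment where the KafkaTable DDL above was run.
tEnv.executeSql(
        "SELECT `partition`, `offset`, `event_time`, user_id, behavior FROM KafkaTable")
    .print();

The backticks are needed because partition and offset collide with SQL keywords.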



 
From: gimlee
Sent: 2021-01-21 11:20
To: user-zh
Subject: Is there a way to get partition and offset information with the Flink SQL kafka connector?
As the subject says, I need the Kafka partition and offset for further processing.
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Re: flink sql hop window UDAF: no matching merge method found even though merge is implemented

2021-01-17 文章 Evan
??

??


 
From: bigdata
Sent: 2021-01-18 14:52
To: user-zh
Subject: flink sql hop window UDAF: no matching merge method found
Hi,
On flink 1.10.1, a hop-window SQL job that uses a custom UDAF fails because no matching merge method is found, even though a merge method is implemented:
org.apache.flink.table.planner.codegen.CodeGenException: No matching merge 
method found for AggregateFunction com.autoai.cns.udaf.PercentileUDAF'
The merge method is implemented. Tracing the source:
1. ImperativeAggCodeGen.checkNeededMethods calls getUserDefinedMethod when needMerge is set;
2. UserDefinedFunctionUtils.getUserDefinedMethod fails to match the merge method — the check below evaluates to false. Is this a flink bug?
parameterClassEquals(methodSignature(i), clazz) ||
parameterDataTypeEquals(internalTypes(i), dataTypes(i))


The merge method:
def merge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
  its.foreach(i => accumulator ++ i)
}


Re: flink sql hop window UDAF error

2021-01-17 Posted by Evan
You wrote the merge method as marge — it has to be named merge.
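For reference, a minimal Java sketch (not the poster's Scala class) of an AggregateFunction whose merge method has the expected name and shape — the second parameter is an Iterable of accumulators:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.functions.AggregateFunction;

// Only meant to show the accumulate/merge contract, not the percentile logic.
public class CollectListUdaf extends AggregateFunction<String, List<Float>> {

    @Override
    public List<Float> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public String getValue(List<Float> acc) {
        return acc.toString();
    }

    public void accumulate(List<Float> acc, Float value) {
        acc.add(value);
    }

    // Must be named "merge" and accept an Iterable of accumulators so the planner can find it.
    public void merge(List<Float> acc, Iterable<List<Float>> others) {
        for (List<Float> other : others) {
            acc.addAll(other);
        }
    }
}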


Evan Cheng
2021-01-18 09:00:07



 
From: bigdata
Sent: 2021-01-17 22:31
To: user-zh
Subject: flink sql hop window UDAF error
Hi,
On flink 1.10.1, a hop-window SQL job that uses a custom UDAF fails with the error below (the UDAF implements a marge method):
org.apache.flink.table.api.ValidationException: Function class 
'com.autoai.cns.udaf.PercentileUDAF' does not implement at least one method 
named 'merge' which is public, not abstract and (in case of table functions) 
not static
The UDAF code:

import scala.collection.mutable.ListBuffer
import org.apache.flink.table.functions.AggregateFunction

class PercentileUDAF extends AggregateFunction[String, ListBuffer[Float]] {
  // percentiles to report
  val percentile1 = 0.5
  val percentile2 = 0.75
  val percentile3 = 0.98
  val percentile4 = 0.99

  /**
   * @param accumulator the collected values
   * @return the percentile values as a comma-separated string
   */
  override def getValue(accumulator: ListBuffer[Float]): String = {
    // index of each percentile in the sorted values
    val length = accumulator.size
    var i1 = Math.round(length * percentile1).toInt
    if (i1 == 0) i1 = 1
    var i2 = Math.round(length * percentile2).toInt
    if (i2 == 0) i2 = 1
    var i3 = Math.round(length * percentile3).toInt
    if (i3 == 0) i3 = 1
    var i4 = Math.round(length * percentile4).toInt
    if (i4 == 0) i4 = 1
    val seq = accumulator.sorted
    seq(i1 - 1).toInt + "," + seq(i2 - 1).toInt + "," + seq(i3 - 1).toInt + "," + seq(i4 - 1).toInt
  }

  override def createAccumulator(): ListBuffer[Float] = new ListBuffer[Float]()

  def accumulate(accumulator: ListBuffer[Float], i: Float): Unit = {
    accumulator.append(i)
  }

  def marge(accumulator: ListBuffer[Float], its: Iterable[ListBuffer[Float]]): Unit = {
    its.foreach(i => accumulator ++ i)
  }
}


Re: Re: Reading kafka metadata in flink sql

2021-01-15 Posted by Evan
I see what 酷酷 means: a Kafka message is a key/value pair (the key is null by default), and he wants to read the message key in the select statement, right?

I searched the documentation too, and indeed there is nothing about this.



 
From: 酷酷的浑蛋
Sent: 2021-01-15 16:35
To: user-zh@flink.apache.org
Subject: Re: Reading kafka metadata in flink sql
I'm reading the topic directly and headers are empty. I just want to read the key, no matter who wrote to the topic.
 
 
 
 
On 2021-01-14 at 16:03, 酷酷的浑蛋 wrote:
Do you mean that if the topic was not written by flink, flink sql cannot read the key?
 
 
 
 
On 2021-01-13 at 15:18, JasonLee <17610775...@163.com> wrote:
hi

Did you set headers when writing the data? If not, of course they are empty.
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 createStatementSet error: No operators defined in streaming topology

2021-01-14 Posted by Evan
Mine is also flink 1.11.0 and also goes through stmtSet.execute(), and it runs fine. You could debug and check the SQL statements you are actually executing.
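For comparison, a minimal sketch of the createStatementSet flow that runs for me (table names and the datagen/print connectors are placeholders, not the poster's hive/kafka setup):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql("CREATE TABLE src (id INT, name STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE dst (id INT, name STRING) WITH ('connector' = 'print')");

        // All INSERTs are collected and submitted as one job by execute().
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO dst SELECT id, name FROM src");
        stmtSet.execute();
    }
}

One thing worth checking (my assumption, not something confirmed in the thread): if filledAllSqlsTable ends up empty, no addInsertSql call is ever made, and executing an empty StatementSet leaves no operators from which to build a stream graph.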



 
From: datayangl
Sent: 2021-01-14 16:13
To: user-zh
Subject: flink1.11 createStatementSet error: No operators defined in streaming topology
Flink version: 1.11
I use createStatementSet to write multiple datasets from hive into different kafka topics within a single job.
The code:
  def main(args: Array[String]): Unit = {
    FlinkUtils.initTable()
    val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv
    val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
    streamEnv.disableOperatorChaining()
    streamEnv.setParallelism(1)
    streamEnv.setMaxParallelism(1)
    CheckPointUtils.setCheckPoint(streamEnv, 12, 6)
    dealWithOdsDataTohive(tableEnv)
    val sqls: Map[String, String] = ConfigItem.ODS_SQL

    val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE", null)
      .map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet

    val filledAllSqlsTable = sqls.map(x => {
      val hiveMapTopic = hiveTableMapTopic
      val topicName = hiveMapTopic.getOrElse(x._1, null)
      val topic = if (ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else null
      (x._1, topic, x._2)
    }).filter(x => StringUtils.isNotEmpty(x._2)).map(x => {
      val sql = fillTemplate(x._1, x._2, x._3)
      tableEnv.executeSql(sql)
      x._1
    })
    HiveUtils.initHiveCatalog("tsgz", "catalogName", tableEnv)
    val stmtSet = tableEnv.createStatementSet()
    val allInsertSqls = filledAllSqlsTable.map(table => {
      s"insert into tsgz.${table} select * from default_catalog.default_database.${table}"
    }).toList
    allInsertSqls.foreach(x => {
      stmtSet.addInsertSql(x)
    })
    val insertTaskStatus = stmtSet.execute()
    // insertTaskStatus.print()
    println(insertTaskStatus.getJobClient.get().getJobStatus())
  }

  /**
   * Fill in the kafka source table DDL template.
   */
  def fillTemplate(tableName: String, topicName: String, fields: String) = {
    val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS
    val filled = s"create table ${tableName} (${fields}) with ('connector' = 'kafka','topic' = '${topicName}','properties.bootstrap.servers' = '${kafkaHost}','properties.group.id' = 'OdsDataToHive1','format' = 'json','scan.startup.mode' = 'latest-offset')"
    filled
  }
 
After running, the following error is thrown:
Exception in thread "main" java.lang.IllegalStateException: No operators
defined in streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
at
com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54)
at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21)
at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala)
 
The error is reported at the line `val insertTaskStatus = stmtSet.execute()`.


Reference: https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md
 
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Re: Re: A question about Flink checkpoints

2021-01-14 Posted by Evan
Yes, this is by design. Link [1] contains this explanation:

 If you choose to retain externalized checkpoints on cancellation you have to 
handle checkpoint clean up manually when you cancel the job as well 
(terminating with job status JobStatus#CANCELED).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
 

Please correct me if this is wrong.





 
From: yinghua...@163.com
Sent: 2021-01-14 18:02
To: user-zh
Subject: Re: Re: A question about Flink checkpoints
The code is as follows:
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
try {
  StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
  streamEnv.setStateBackend(rocksDBStateBackend);



yinghua...@163.com
 
From: Evan
Sent: 2021-01-14 17:55
To: user-zh
Subject: Re: A question about Flink checkpoints
The code screenshot didn't come through, I can't see the code.
 
 
 
From: yinghua...@163.com
Sent: 2021-01-14 17:26
To: user-zh
Subject: A question about Flink checkpoints
When submitting the job on YARN I configure Flink to checkpoint every 5 minutes, use RocksDBStateBackend to store checkpoints on HDFS, and retain checkpoints when the job is cancelled; the code is below.
I now find that when I stop the job with stopWithSavepoint, the most recent checkpoint is deleted after the job stops. Is that the intended behavior, or am I using it wrong? If I stop the job with cancelWithSavepoint instead, will the most recent checkpoint be kept?
 
 
yinghua...@163.com


Re: A question about Flink checkpoints

2021-01-14 Posted by Evan
The code screenshot didn't come through, I can't see the code.



 
From: yinghua...@163.com
Sent: 2021-01-14 17:26
To: user-zh
Subject: A question about Flink checkpoints
When submitting the job on YARN I configure Flink to checkpoint every 5 minutes, use RocksDBStateBackend to store checkpoints on HDFS, and retain checkpoints when the job is cancelled; the code is below.
I now find that when I stop the job with stopWithSavepoint, the most recent checkpoint is deleted after the job stops. Is that the intended behavior, or am I using it wrong? If I stop the job with cancelWithSavepoint instead, will the most recent checkpoint be kept?


yinghua...@163.com


Re: Re: Flink webui returns error 500 when querying job information

2021-01-13 Posted by Evan
You can get some background from the akka architecture documentation.
See: https://akka.io 

I've seen similar explanations from other bloggers; the gist is that it is the maximum size of a message sent between the JobManager and the TaskManagers.



 
From: 赵一旦
Sent: 2021-01-14 14:19
To: user-zh
Subject: Re: Flink webui returns error 500 when querying job information
OK, I found that parameter. Do you know what it actually means, though? 10MB doesn't seem like a small number to me.
 
Evan wrote on Thu, 2021-01-14 at 13:54:
 
> There is a parameter "akka.framesize" that you can set when starting flink, or configure in conf/flink-conf.yaml:
>
> akka.framesize
> Default: "10485760b" (String). Maximum size of messages which are sent between the
> JobManager and the TaskManagers. If Flink fails because messages exceed
> this limit, then you should increase it. The message size requires a
> size-unit specifier.
>
> Reference:
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
>
>
>
>
>
> From: 赵一旦
> Sent: 2021-01-14 11:38
> To: user-zh
> Subject: Flink webui returns error 500 when querying job information
> The request fails with a 500 error; the exception shown in the browser developer tools is below.
>  side:↵org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException:
> The method requestJob's result size 19811407 exceeds the maximum size
> 10485760 .↵ at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:363)↵
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:337)↵
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)↵
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)↵
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)↵
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)↵
> at
>
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)↵
> at
>
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)↵
> at
>
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)↵
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)↵
> at
>
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)↵
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)↵
> at akka.dispatch.OnComplete.internal(Future.scala:264)↵ at
> akka.dispatch.OnComplete.internal(Future.scala:261)↵ at
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)↵ at
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)↵ at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)↵ at
>
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)↵
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)↵
> at
>
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)↵
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)↵ at
>
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)↵
> at
>
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)↵
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)↵ at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)↵ at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)↵ at
>
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)↵
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)↵
> at
>
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)↵
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)↵ at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)↵
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)↵ at
>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)↵
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)↵
> at
>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)↵↵End
> of exception on server side>"
>
> I'd like to know what is going on here and which parameter needs to be adjusted.
>


Re: Flink webui returns error 500 when querying job information

2021-01-13 Posted by Evan
There is a parameter "akka.framesize" that you can set when starting flink, or configure in conf/flink-conf.yaml:

akka.framesize
Default: "10485760b" (String). Maximum size of messages which are sent between the JobManager
and the TaskManagers. If Flink fails because messages exceed this limit, then
you should increase it. The message size requires a size-unit specifier.

Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
 




 
From: 赵一旦
Sent: 2021-01-14 11:38
To: user-zh
Subject: Flink webui returns error 500 when querying job information
The request fails with a 500 error; the exception shown in the browser developer tools is below.
"
 
I'd like to know what is going on here and which parameter needs to be adjusted.


Re: Flink webui returns error 500 when querying job information

2021-01-13 Posted by Evan
The error is reported by Flink's Akka layer; the relevant source code is below. You could look into how this maximumFramesize can be changed.

https://github.com/apache/flink/blob/d093611b5dfab95fe62e4f861879762ca2e43437/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java:
 


private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize(
        Object result, String methodName) {
    try {
        SerializedValue<?> serializedResult = new SerializedValue<>(result);

        long resultSize =
                serializedResult.getByteArray() == null
                        ? 0
                        : serializedResult.getByteArray().length;
        if (resultSize > maximumFramesize) {
            return Either.Right(
                    new AkkaRpcException(
                            "The method "
                                    + methodName
                                    + "'s result size "
                                    + resultSize
                                    + " exceeds the maximum size "
                                    + maximumFramesize
                                    + " ."));
        } else {
            return Either.Left(serializedResult);
        }
    } catch (IOException e) {
        return Either.Right(
                new AkkaRpcException(
                        "Failed to serialize the result for RPC call : " + methodName + '.',
                        e));
    }
}


 
From: 赵一旦
Sent: 2021-01-14 11:38
To: user-zh
Subject: Flink webui returns error 500 when querying job information
The request fails with a 500 error; the exception shown in the browser developer tools is below.
"
 
I'd like to know what is going on here and which parameter needs to be adjusted.


Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Posted by Evan
Hi, in a database a Float stores an approximate value, so comparison predicates like = or != are unreliable on it, and the IN operation is not supported either.
Hope this helps.
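If the filter itself is still needed, one workaround that might be worth trying (an untested assumption on my part, not something confirmed in this thread) is to avoid FLOAT literals in the predicate by casting the column, for example to DOUBLE:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FloatInWorkaround {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());

        DataStream<Tuple3<Float, Float, Float>> source = env.fromElements(
                Tuple3.of(1.0f, 11.0f, 12.0f),
                Tuple3.of(2.0f, 21.0f, 22.0f),
                Tuple3.of(3.0f, 31.0f, 32.0f));

        tEnv.createTemporaryView("myTable", source, $("f0").as("id"), $("f1"), $("f2"));

        // Cast so the IN list is not evaluated against FLOAT literals.
        tEnv.executeSql("SELECT * FROM myTable WHERE CAST(id AS DOUBLE) IN (1.0, 2.0, 3.0)").print();
    }
}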



 
From: jy l
Date: 2021-01-12 18:04
To: user-zh
Subject: FlinkSQL Filter Error With Float Column on flink-1.12.0
Hi:
Flink SQL throws an exception when filtering data.
Code:
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromElements(
    (1.0f, 11.0f, 12.0f),
    (2.0f, 21.0f, 22.0f),
    (3.0f, 31.0f, 32.0f),
    (4.0f, 41.0f, 42.0f),
    (5.0f, 51.0f, 52.0f)
  )
  val settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .useBlinkPlanner()
    .build()
  val tEnv = StreamTableEnvironment.create(env, settings)
  tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
  val query =
    """
      |select * from myTable where id in (1.0, 2.0, 3.0)
      |""".stripMargin
  tEnv.executeSql(query).print()
}
 
exception:
Exception in thread "main" java.lang.UnsupportedOperationException: class
org.apache.calcite.sql.type.SqlTypeName: FLOAT
at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
at org.apache.calcite.rex.RexLiteral.<init>(RexLiteral.java:223)
at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
at org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
at org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
at
org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)
 
Why is that? How do I solve it?
 
thanks.


Re: flink sql consuming kafka and sinking to mysql

2021-01-05 Posted by Evan
It seems flinksql currently cannot do what you describe.



 
From: air23
Sent: 2021-01-06 12:29
To: user-zh
Subject: flink sql consuming kafka and sinking to mysql
Hello. While running a job I noticed that if a flink sql job consuming kafka fails with an error
and is then restarted, the record that caused the error is lost.
I am using 'scan.startup.mode' = 'group-offsets'.
Shouldn't it resume from the failed record and re-consume it?
How should this be configured so that no data is lost?
 
 
CREATE TABLE source1 (
  id BIGINT,
  username STRING,
  password STRING,
  AddTime TIMESTAMP,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'plink_canal',
  'properties.bootstrap.servers' = '***',
  'properties.group.id' = 'canal1',
  'scan.startup.mode' = 'group-offsets',
  'canal-json.table.include' = 'test.*',
  -- 'canal-json.ignore-parse-errors' = 'true', -- whether to skip the field/row on a parse error or to fail (default false, i.e. fail); if parse errors are ignored, the field is set to null
  'format' = 'canal-json'
);


Re: flinksql1.11 lookup join against a phoenix dimension table throws Caused by: org.apache.calcite.avatica.NoSuchStatementException

2021-01-05 文章 Evan
??bug



 
 
Sent: 2021-01-05 20:20
To: user-zh
Subject: flinksql1.11 lookup join against a phoenix dimension table throws Caused by: org.apache.calcite.avatica.NoSuchStatementException
Hi,
Versions: flink 1.11, phoenix 1.14.1.

CREATE TABLE pe_login_kafka (
  id INT,
  region_id INT,
  ts TIMESTAMP(3),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 't-region',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'gid');

CREATE TABLE region_dim (
  id INT,
  region_name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:phoenix:thin:url=http://172.168.1.15:8765;serialization=PROTOBUF',
  'table-name' = 'ph_region',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '600s',
  'lookup.max-retries' = '3');

-- sink
INSERT INTO 
FROM pe_login_kafka k LEFT JOIN region_dim FOR SYSTEM_TIME AS OF k.proc_time AS u
ON k.region_id = u.id;

The error:
Caused by: java.sql.SQLException
at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
 ~[flink-connector-jdbc_2.11-1.11.1.jar:1.11.1]
... 18 more
 
 
 
Caused by: org.apache.calcite.avatica.NoSuchStatementException
at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:349) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.remote.RemoteMeta$15.call(RemoteMeta.java:343) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.invokeWithRetries(AvaticaConnection.java:793)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.remote.RemoteMeta.execute(RemoteMeta.java:342) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:548)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
... 20 more
2021-01-05 19:40:07,469 ERROR 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction [] - JDBC 
executeBatch error, retry times = 2
java.sql.SQLException: null
at org.apache.calcite.avatica.Helper.createException(Helper.java:56) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.apache.calcite.avatica.Helper.createException(Helper.java:41) 
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaConnection.executeQueryInternal(AvaticaConnection.java:557)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.calcite.avatica.AvaticaPreparedStatement.executeQuery(AvaticaPreparedStatement.java:137)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:152)
 [flink-connector-jdbc_2.11-1.11.1.jar:1.11.1]
 
 
 
??jdbc??org.apache.calcite.avatica.AvaticaConnection??connection??Statement
??
 
 



Re: Email unsubscribe

2020-12-15 Posted by Evan
Hi, to unsubscribe you need to send an email to user-zh-unsubscr...@flink.apache.org. For more details see [1]
https://flink.apache.org/community.html#mailing-lists



 
From: 谢治平
Sent: 2020-12-16 09:08
To: user-zh
Subject: Email unsubscribe
Hello, please unsubscribe me from this mailing list.


Re: flink cannot write data into ES

2020-12-13 Posted by Evan
Your SQL statement has a syntax problem; please refer to:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html


Hope this helps!
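As a sketch of what the 1.11 connector page describes (reusing the index and document type from the question; `tEnv` and `source_view` are placeholders I made up, and I have not run this against the poster's cluster), the new-style options look roughly like this:

// Assumes a TableEnvironment `tEnv`; options follow the Flink 1.11 elasticsearch-6 SQL connector docs.
tEnv.executeSql(
        "CREATE TABLE es_sink (\n"
                + "  uid INT,\n"
                + "  page_name STRING,\n"
                + "  stat_time BIGINT,\n"
                + "  dt DATE,\n"
                + "  PRIMARY KEY (uid) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'elasticsearch-6',\n"
                + "  'hosts' = 'http://localhost:9200',\n"
                + "  'index' = 'mytest',\n"
                + "  'document-type' = 'user_action',\n"
                + "  'format' = 'json'\n"
                + ")");

// Data only reaches the sink through an INSERT INTO; a plain SELECT does not populate it.
tEnv.executeSql("INSERT INTO es_sink SELECT uid, page_name, stat_time, dt FROM source_view");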

 
From: 小墨鱼
Sent: 2020-12-11 14:46
To: user-zh
Subject: flink cannot write data into ES
I am using Flink to write data into ES. The program runs successfully but no data shows up in ES, and there is no error message at all. I first created an ES sink table:

CREATE TABLE es_sink (
  uid INT,
  appid INT,
  prepage_id INT,
  page_id INT,
  action_id STRING,
  page_name STRING,
  action_name STRING,
  prepage_name STRING,
  stat_time BIGINT,
  dt DATE,
  PRIMARY KEY (uid) NOT ENFORCED
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://localhost:9200',
  'connector.index' = 'mytest',
  'connector.document-type' = 'user_action',
  'update-mode' = 'append',
  'connector.key-null-literal' = 'n/a',
  'connector.bulk-flush.max-actions' = '1',
  'format.type' = 'json'
);

and the data comes from this query:

select 1 as uid, 2 as appid, 3 as prepage_id, 4 as page_id, '5' as action_id, '6' as page_name, '7' as action_name, '8' as prepage_name, cast(9 as bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit 1

My flink version is 1.11.1 and my es version is 6.2.2. If anyone has run into this, please help me take a look.
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: The underlying mechanism of the cluster.evenly-spread-out-slots option

2020-11-06 Posted by Evan



 
From: Shawn Huang
Sent: 2020-11-06 16:56
To: user-zh
Subject: Re: The underlying mechanism of the cluster.evenly-spread-out-slots option
Here is my understanding after reading the source code (1.11.2); it is not necessarily accurate, for reference only.

Once set, cluster.evenly-spread-out-slots takes effect in two places:
1. the Scheduler component of the JobMaster
2. the SlotManager component of the ResourceManager

For the Scheduler in the JobMaster:
it assigns slots to execution vertices one by one in topological order.
The scheduling strategy prefers to place an execution vertex into the slots its upstream nodes were assigned to,
so when assigning a slot to a concrete execution vertex it first computes the set of TaskManagers that this vertex prefers,
and then scores each candidate slot along three dimensions:
1. how many entries in the preferred TaskManager set share the candidate slot's TaskManager ResourceID
(this dimension can be ignored in standalone mode)
2. how many entries in the preferred TaskManager set share the candidate slot's TaskManager fully-qualified host name
3. the current resource utilization of the candidate slot's TaskManager
The third dimension is only considered when cluster.evenly-spread-out-slots is configured; otherwise only the first two dimensions are used for scoring.
After scoring, the slot with the highest score is assigned to the current execution vertex.
Note that the resource utilization here is computed only from how many slots on the candidate TaskManager could still host this execution vertex
(because Flink requires that parallel subtasks of the same job vertex not be placed in the same slot): the more such slots remain, the lower the utilization, and vice versa.
It does not mean actual CPU or memory utilization.

For the SlotManager component in the ResourceManager (everything here refers to the ResourceManager in standalone mode):
since all of the JobMaster's slots have to be requested from the resource manager, the JobMaster asks the ResourceManager's SlotManager whenever it needs a new slot.
If cluster.evenly-spread-out-slots is not configured, the SlotManager returns a random available slot.
If it is configured, the SlotManager returns the slot with the lowest resource utilization.
Utilization here is computed as how many slots on that TaskManager are still unassigned: the more free slots, the lower the utilization, and vice versa.

Finally, I'm not sure I fully understand the kind of balance you asked about. Parallel subtasks of one operator are never assigned to the same slot,
but if you want those subtasks spread evenly across different machines, the current scheduling algorithm cannot guarantee that.
 
Best,
Shawn Huang
 
 
赵一旦 wrote on Thu, 2020-11-05 at 22:18:
 
> Is there anyone with a deep understanding of the cluster.evenly-spread-out-slots option who could explain it?
>
> What I mainly want to know is: after setting this option to true, by what rule does Flink spread slots out as evenly as possible?
> In a standalone cluster where every machine has the same specs and the same flink slot count, is there, given *this allocation rule*, a way to make Flink achieve
> *complete* balance rather than *best-effort* balance?
>
> Also, by "balance" I always mean balance at the operator level. I don't mean the case where 5 machines have 5 slots in total, the job has 5 operators, each with parallelism 1, and each occupies its own slot through a different share
> group. I mean every operator being balanced across the machines (*assuming the parallelism is set reasonably*).
>


Re: Re: pyflink where filter condition problem

2020-11-01 文章 Evan
?? ?? 
where??where??pyflink??api??where??



 
 
?? 2020-11-02 10:15
 user-zh
?? ?? pyflink??where??
??
??where
??
---
Py4JJavaError                  
           Traceback (most recent call last)


?????? flink1.11????????

2020-07-20 文章 Evan
Hi,??jar??1??45jar




--  --
??: 
   "user-zh"



FlinkSQL job name after submission

2020-07-18 Posted by Evan
The code is roughly: a kafka source table and an es sink table, executed via tableEnv.executeSql("insert into esSinkTable select ... from kafkaSourceTable").
After submission the job name is "insert-into_someCatalog_someDatabase.someTable".


That is not very friendly — can I specify the job name myself?

Re: ddl es error

2020-07-09 Posted by Evan
Hello,


I hit this error too on the latest flink 1.11, doing the same thing you did.
The real cause is that this ddl defines a flink sink table, i.e. the write side, so you cannot print data from it.
tableEnv.toRetractStream(table, Row.class).print();
only works for a flink source table, i.e. the input side — for example a kafka table works fine with it.


2020-07-09 15:31:56
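A small sketch of the write path (assuming `tEnv` is the TableEnvironment where the sink DDL was registered; `some_source_view` is a placeholder for whatever query produces the data): rows reach the ES sink through an INSERT INTO, not by printing the sink table:

// Assumes the sink table buy_cnt_per_hour from the original DDL is already registered on `tEnv`.
tEnv.executeSql("INSERT INTO buy_cnt_per_hour SELECT hour_of_day, buy_cnt FROM some_source_view");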




-- Original message --
From: "出发" <573693...@qq.com>
Sent: Monday, 2020-03-23 23:30
To: "user-zh"
Sink table properties: connector.hosts=http://localhost:9200, connector.index=buy_cnt_per_hour,
connector.type=elasticsearch, connector.version=6, format.type=json, schema.0.data-type=BIGINT,
schema.0.name=hour_of_day, schema.1.data-type=BIGINT, schema.1.name=buy_cnt, update-mode=append

How to cancel a running Flink Streaming job from code

2020-07-08 Posted by Evan
I've seen someone ask this before, but I never saw an answer. So let me ask: does the Flink Streaming API provide an interface that can be called to stop a running streaming job?

Re: kafka catches up from the beginning when new computation logic is added to flink

2020-04-07 Posted by Evan
The code in my previous mail apparently came through garbled, so I'm resending it. I suggest that after obtaining the consumer you also call consumer.setStartFromLatest(); this follows the official documentation — here is a translation of it I did earlier, see the part about "Kafka Consumers start position configuration": https://www.jianshu.com/p/b753527b91a6



 /**
      * @param env
      * @param topic
      * @param time  the time to start subscribing from
      * @return
      * @throws IllegalAccessException
      */
     public static DataStreamSource
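A minimal runnable sketch of that suggestion (topic, group id and bootstrap servers below are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StartFromLatestSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Ignore any committed offsets and start from the latest records,
        // so a newly added job does not replay the whole topic from the beginning.
        consumer.setStartFromLatest();

        DataStreamSource<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("start-from-latest sketch");
    }
}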

Re: Re: kafka catches up from the beginning when new computation logic is added to flink

2020-04-07 Posted by Evan
Hi 苟刚, I just looked at your kafka consumer code. I suggest adding one line after you obtain the consumer — "consumer.setStartFromLatest();" — and then testing again.


/**
      * @param env
      * @param topic
      * @param time  the time to start subscribing from
      * @return
      * @throws IllegalAccessException
      */
     public static DataStreamSource

Re: kafka catches up from the beginning when new computation logic is added to flink

2020-04-06 文章 Evan
??kafkaoffset






--  --
??: ""

Re: Re: How to check how much kafka data flink has consumed and whether it is lagging

2020-01-12 Posted by Evan
You can check the consumer group's Kafka offsets with:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zkhost:2181 --group ${group.id} --topic ${topic_name}
where zkhost, group.id and topic_name are replaced with your own zookeeper host, consumer group and topic.

Group   Topic      Pid   Offset    logSize    Lag       Owner
test    dy_event   0     8115733   10658588   2542855   none
test    dy_event   1     8114221   10658585   2544364   none
test    dy_event   2     8115173   10658587   2543414   none
test    dy_event   3     8115127   10658585   2543458   none
test    dy_event   4     8115160   10658587   2543427   none

The Offset column shows how far each pid (partition) has been consumed.




--  --
??: "Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

wqpapa https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
> >
> >wqpapa 

Re: flink cannot consume data from Kafka

2020-01-09 Posted by Evan
Check Kafka's advertised.host.name and advertised.port settings.
 

 host??
 Kafka 
 Kafka 
?? 0.8.2.x?? 
0.10.x
 Kafka ??
??,?? hosts 
?? zookeeper ?? 
hostnamehostname1:2181,hostname2:2181,hostname3:2181

??kafka??
??


--  --
??: "sunfulin"https://manual.sensorsdata.cn/sa/latest/page-1573828.html??FlinkKafka??kafka
 console consumer??
flinkjob??

Re: flink cannot consume data from Kafka

2020-01-09 Posted by Evan
Check Kafka's advertised.host.name setting.




--  --
??: "sunfulin"https://manual.sensorsdata.cn/sa/latest/page-1573828.html??FlinkKafka??kafka
 console consumer??
flinkjob??

Flink job submission error

2019-09-09 Posted by Evan
Version: flink 1.7.1
Environment: centos 7
The cluster is started with start-cluster.sh and the job is submitted to test04.


Submitting the flink job fails with the error below:


$ bin/flink run -m test04:8081 -c 
org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass 
flink-scala-project1.jar
Starting execution of program


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: f9ac0c76e0e44cac6d6c3b1c41afa161)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
at 
org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass$.main(FlinkConsumerKafkaSinkToKuduMainClass.scala:16)
at 
org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass.main(FlinkConsumerKafkaSinkToKuduMainClass.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:380)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:216)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:515)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHan
