flink sql 消费kafka 消息 写入Hive不提交分区

2021-01-14 Thread 花乞丐
我这边从kafka消费信息,然后写入到Hive中,目前发现不提交分区,不提交分区的原因是watemark是负数的,不清楚这个负数的watermark是怎么出现的?

 
我代码也指定了watermark,但是debug的时候好像没有起作用



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Simplest way to deploy flink job on k8s for e2e testing purposes

2021-01-14 Thread Salva Alcántara
Can anyone explain why I am getting this error?

"Exception in thread "main" java.lang.IllegalStateException: No
ExecutorFactory found to execute the application."

I have tried a slightly different approach by running the jar that `sbt
assembly`produces inside a container that looks like this (Dockerfile):

```
FROM flink:1.11.2-scala_2.12-java11
COPY ./path/to/my.jar my.jar
```

I have tried with different versions of flink (1.10.2 for example) and I am
getting the same error. This should be pretty straightforward but I cannot
make it work. My main looks like this

```
object MyApp extends App {
  ...
  val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment

  ...
  env.execute
}
```

and it fails when it reaches the call to `getExecutionEnvironment`...




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re:Re:flinksql 消费kafka offset问题

2021-01-14 Thread Michael Ran
额,不用checkpoint 会比较麻烦。 以前自定义sink 的时候,会把消息信息到sink 位置进行提交。 上游source 
也得改造,拉取位置也得统一,比如走redis 数据库等等
在 2021-01-15 12:41:25,"air23"  写道:
>我的意思 是不使用checkpoint。
>使用'scan.startup.mode' = 'group-offsets' 去维护offset
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-01-15 11:35:16,"Michael Ran"  写道:
>>下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink 
>>是异步操作,告诉上游你sink 完成了,实际你sink失败了
>>在 2021-01-15 10:29:15,"air23"  写道:
>>>flink消费kafka 只能使用checkpoint去维护offset吗
>>>
>>>我这边使用'scan.startup.mode' = 'group-offsets'
>>>
>>>如果中间报错了 或者停止任务,但是我下游sink还没有完成,
>>>下次启动直接跳过这个报错的数据,会丢数据,谢谢回复


Re: Deterministic rescale for test

2021-01-14 Thread Martin Frank Hansen
Hi Jaffe,

Thanks for your reply, I will try to use a Custom Partioner.

Den tor. 14. jan. 2021 kl. 19.39 skrev Jaffe, Julian <
julianja...@activision.com>:

> Martin,
>
>
>
> You can use `.partitionCustom` and provide a partitioner if you want to
> control explicitly how elements are distributed to downstream tasks.
>
>
>
> *From: *Martin Frank Hansen 
> *Reply-To: *"m...@berlingskemedia.dk" 
> *Date: *Thursday, January 14, 2021 at 1:48 AM
> *To: *user 
> *Subject: *Deterministic rescale for test
>
>
>
> Hi,
>
> I am trying to make a test-suite for our Flink jobs, and are having
> problems making the input-data deterministic.
>
> We are reading a file-input with parallelism 1 and want to rescale to a
> higher parallelism, such that the ordering of the data is the same every
> time.
>
> I have tried using rebalance, rescale but it seems to randomly distribute
> data between partitions. We don't need something optimized, we just need
> the same distribution for every run.
>
> Is this possible?
>
>
> Some code:
>
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.
> *getExecutionEnvironment*env.setStreamTimeCharacteristic(TimeCharacteristic.*EventTime*)
> env.setParallelism(parallelism)
> val rawStream: DataStream[String] = env.readTextFile(s3Path).setParallelism(1)
>
> rawStream.rescale
>
> ...
>
> best regards
>
>
>
> --
>
> *Martin Frank Hansen*
>
>
>


-- 

Martin Frank Hansen


flink sql hop????????????????????

2021-01-14 Thread bigdata
??

flink1.10.1,??=-??
SELECT
|DATE_FORMAT(TIMESTAMPADD(HOUR, 8, HOP_START(proctime, INTERVAL '$slide' 
SECOND, INTERVAL '$size' MINUTE)), '-MM-dd HH:mm:ss') start_time,
|UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, HOP_START(proctime, 
INTERVAL '$slide' SECOND, INTERVAL '$size' MINUTE)), '-MM-dd HH:mm:ss')) * 
1000 AS `time`,
|CAST(COUNT(distinct drive_id) AS INT) num
|  FROM
|${databaseName}.log_stream
|  WHERE
|req_type = '1' and navigation_flag=' '
|  GROUP BY
|HOP(proctime, INTERVAL '$slide' SECOND, INTERVAL '$size' MINUTE)
3 2021-01-15 13:43:20,161068940,1
4 2021-01-15 13:43:30,161068941,2
1 2021-01-15 13:43:40,161068942,3
2 2021-01-15 13:43:50,161068943,3
3 2021-01-15 13:44:00,161068944,3
4 2021-01-15 13:44:10,161068945,3
1 2021-01-15 13:44:20,161068946,3
2 2021-01-15 13:44:30,161068947,3
3 2021-01-15 13:44:40,161068948,3
4 2021-01-15 13:44:50,161068949,3
1 2021-01-15 13:45:00,161068950,3
2 2021-01-15 13:45:10,161068951,3
3 2021-01-15 13:45:20,161068952,3

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-14 Thread Yun Gao
  Hi all,

We have some offline discussion together with @Arvid, @Roman and @Aljoscha and 
I'd 
like to post some points we discussed:

1) For the problem that the "new" root task coincidently finished before 
getting triggered
successfully, we have listed two options in the FLIP-147[1], for the first 
version, now we are not tend
to go with the first option that JM would re-compute and re-trigger new sources 
when it realized
some tasks are not triggered successfully. This option would avoid the 
complexity of adding 
new PRC and duplicating task states, and in average case it would not cause too 
much 
overhead.

2) For how to support operators like Sink Committer to wait for one complete 
checkpoint 
before exit, it would be more an issue of how to use the checkpoints after 
tasks finished instead 
of how to achieve checkpoint after tasks finished, thus we would like to not 
include this part 
first in the current discussion. We would discuss and solve this issue 
separately after FLIP-147 is done.

Best,
 Yun


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
--
From:Yun Gao 
Send Time:2021 Jan. 13 (Wed.) 16:09
To:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

  Hi all,
I updated the FLIP[1] to reflect the major discussed points in the ML thread:

 1) For the "new" root tasks finished before it received trigger message, 
previously we proposed 
to let JM re-compute and re-trigger the descendant tasks, but after the 
discussion we realized that 
it might cause overhead to JobMaster on cascade finish and large parallelism 
cases. Another option
might be let the StreamTask do one synchronization with the 
CheckpointCoordinator before get finished 
to be aware of the missed pending checkpoints, since at then EndOfPartitions 
are not emitted yet, it
could still broadcast barriers to its descendant tasks. I updated the details 
in this section[2] in the
FLIP.

2) For the barrier alignment, now we change to insert faked barriers in the 
input channels to avoid
interference with checkpoint alignment algorithms. One remaining issue is that 
for unaligned checkpoint
mode we could not snapshot the upstream tasks' result partition if it have been 
finished. One option
to address this issue is to make the upstream tasks to wait for buffers get 
flushed before exit, and 
we would include this in the future versions. I updated this part in this 
section[3] in the FLIP.

3) Some operators like Sink Committer need to wait for one complete checkpoint 
before exit. To support
the operators that need to wait for some finalization condition like the Sink 
committer and Async I/O, we 
could introduce a new interface to mark this kind of operators, and let the 
runtime to wait till the operators
reached its condition. I updated this part in this section[4] in the FLIP.

Could you have another look of the FLIP and the pending issues ? Any feedbacks 
are warmly welcomed 
and appreciated. Very thanks!

Best,
 Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish


--
From:Yun Gao 
Send Time:2021 Jan. 12 (Tue.) 10:30
To:Khachatryan Roman 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Roman, 

 Very thanks for the feedbacks and suggestions!

> I think UC will be the common case with multiple sources each with 
DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of 
it's source subtask finishes.

Yes, waiting for EoP would be required for each input channel if we do 
not blocking the upstream
finished task specially. 

   > Yes, but checkpoint completion notification will not be sent until all 
the EOPs are processed.
  The downstream tasked get triggered indeed must wait for received EoPs 
from all the input channels,
I initially compared it with the completely aligned cases and now the remaining 
execution graph after the
trigger task could still taking normal unaligned checkpoint (like if A -> B -> 
C -> D, A get finished and B get 
triggered, then B -> C -> D could still taking normal unaligned checkpoint). 
But still it 

flink sql hop????????????????????

2021-01-14 Thread bigdata
??

flink1.10.1,??=-??

Re: Flink app logs to Elastic Search

2021-01-14 Thread bat man
I was able to make it work with a fresh Elastic installation. Now
taskmanager and jobmanager logs are available in elastic.
Thanks for the pointers.

-Hemant.

On Wed, Jan 13, 2021 at 6:21 PM Aljoscha Krettek 
wrote:

> On 2021/01/11 01:29, bat man wrote:
> >Yes, no entries to the elastic search. No indices were created in elastic.
> >Jar is getting picked up which I can see from yarn logs. Pre-defined text
> >based logging is also available.
>
> Hmm, I can't imagine much that could go wrong. Maybe there is some
> interference from other configuration files. Could you try and make sure
> that you only have the configuration and logging system in the classpath
> that you want to use?
>
> Best,
> Aljoscha
>


Re:Re:flinksql 消费kafka offset问题

2021-01-14 Thread air23
我的意思 是不使用checkpoint。
使用'scan.startup.mode' = 'group-offsets' 去维护offset





















在 2021-01-15 11:35:16,"Michael Ran"  写道:
>下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink 
>是异步操作,告诉上游你sink 完成了,实际你sink失败了
>在 2021-01-15 10:29:15,"air23"  写道:
>>flink消费kafka 只能使用checkpoint去维护offset吗
>>
>>我这边使用'scan.startup.mode' = 'group-offsets'
>>
>>如果中间报错了 或者停止任务,但是我下游sink还没有完成,
>>下次启动直接跳过这个报错的数据,会丢数据,谢谢回复


Gauges generating same graph

2021-01-14 Thread Manish G
Hi All,

I have few RichFlatMapFunction classes, and I have gauge added to each one
of  them. For a particular usecase I am updating these gauges
incrementally. I have a class member variable in each of these classes
which keeps increasing as flapMap function in these classes is called, and
then I update corresponding gauge with this updated value.

I observe that some of these gauges show almost exactly same graph in
grafana. Can it be due to this approach?


Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 Thread Yun Tang
Hi

  1.  目前没有全局的配置
  2.  开启cleanFullSnapshot 并不会物理清除数据,只是确保checkpoint数据中没有相关过期数据

祝好
唐云

From: 孙啸龙 
Sent: Thursday, January 14, 2021 20:43
To: user-zh@flink.apache.org 
Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化

你好:
非常谢谢,
本地的数据是过期了。
不好意思,还有几个疑问想请教下。
1.看文档,开启cleanFullSnapshot是只能对单个状态设置吗,没查到flink sql 
开启cleanFullSnapshot的配置的地方?因为只看到StateTtlConfig是对于单个状态的设置,没有对job或者对table的config设置。
2.cleanFullSnapshot 开启后,从checkpoint恢复才会触发清理,不是在checkpoint过程中触发清理掉过期数据?


> 在 2021年1月14日,下午4:48,Yun Tang  写道:
>
> Hi,
>
> 你本地的数据肯定是过期了,checkpoint 
> size没有变化是因为你的数据总量83MB,且之后没有插入新数据,导致没有触发RocksDB的compaction,所以本地的数据没有物理上清理,而在full
>  snapshot时候,估计你并没有开启cleanFullSnapshot [1],所以导致full snapshot时候并没有删除掉过期数据。
>
> 其实你可以查询一下状态,默认情况下,已经过期的数据是无法再查询到了。
>
> 建议开启增量checkpoint即可,过期数据即使物理不删除,也因为过期而无法再读取到了,没必要过分关注UI上的checkpoint size。
>
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#cleanup-in-full-snapshot
>
> 祝好
> 唐云
> 
> From: 孙啸龙 
> Sent: Thursday, January 14, 2021 16:11
> To: user-zh@flink.apache.org 
> Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化
>
> 你好:
> 使用的state backend是rocksdb,没有开启增量,后续没有再插入过数据。
>
>> 在 2021年1月14日,下午4:07,Yun Tang  写道:
>>
>> 使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?
>>
>> 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?
>>
>>
>> 祝好
>> 唐云
>> 
>> From: 孙啸龙 
>> Sent: Thursday, January 14, 2021 15:52
>> To: user-zh@flink.apache.org 
>> Subject: Flink sql 状态过期后,checkpoint 大小没变化
>>
>> 大家好:
>>   版本:1.12.0
>>   方式:flink sql
>>   测试sql:
>>   select a.id,b.money,b.createTime from test_state_from a
>>   full join test_state_from1 b on a.id=b.id;
>>   问题:
>>  test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
>> ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?
>



Re:flinksql 消费kafka offset问题

2021-01-14 Thread Michael Ran
下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink 
是异步操作,告诉上游你sink 完成了,实际你sink失败了
在 2021-01-15 10:29:15,"air23"  写道:
>flink消费kafka 只能使用checkpoint去维护offset吗
>
>我这边使用'scan.startup.mode' = 'group-offsets'
>
>如果中间报错了 或者停止任务,但是我下游sink还没有完成,
>下次启动直接跳过这个报错的数据,会丢数据,谢谢回复


回复: Re: 请教个Flink checkpoint的问题

2021-01-14 Thread yinghua...@163.com
感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。



yinghua...@163.com
 
发件人: Yun Tang
发送时间: 2021-01-15 11:02
收件人: user-zh
主题: Re: 回复: 请教个Flink checkpoint的问题
Hi
 
这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with 
savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain 
checkpoint的数量为1而被subsume掉了,也就是被删掉了。
 
如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
 
另外说一句,即使是已经deprecated的cancel with 
savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
 
 
[1] https://issues.apache.org/jira/browse/FLINK-10354
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
 
祝好
唐云

From: yinghua...@163.com 
Sent: Thursday, January 14, 2021 19:00
To: user-zh 
Subject: 回复: 回复: 请教个Flink checkpoint的问题
 
好的,感谢您的回复!
 
 
 
yinghua...@163.com
 
发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:
 
If you choose to retain externalized checkpoints on cancellation you have to 
handle checkpoint clean up manually when you cancel the job as well 
(terminating with job status JobStatus#CANCELED).
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
 
如回答有误,请指正。
 
 
 
 
 
发件人: yinghua...@163.com
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
try {
  StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
  streamEnv.setStateBackend(rocksDBStateBackend);
 
 
 
yinghua...@163.com
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
yinghua...@163.com


Re: 回复: 请教个Flink checkpoint的问题

2021-01-14 Thread Yun Tang
Hi

这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with 
savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain 
checkpoint的数量为1而被subsume掉了,也就是被删掉了。

如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。

另外说一句,即使是已经deprecated的cancel with 
savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。


[1] https://issues.apache.org/jira/browse/FLINK-10354
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained

祝好
唐云

From: yinghua...@163.com 
Sent: Thursday, January 14, 2021 19:00
To: user-zh 
Subject: 回复: 回复: 请教个Flink checkpoint的问题

好的,感谢您的回复!



yinghua...@163.com

发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:

If you choose to retain externalized checkpoints on cancellation you have to 
handle checkpoint clean up manually when you cancel the job as well 
(terminating with job status JobStatus#CANCELED).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention

如回答有误,请指正。





发件人: yinghua...@163.com
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
try {
  StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
  streamEnv.setStateBackend(rocksDBStateBackend);



yinghua...@163.com
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
yinghua...@163.com


flinksql 消费kafka offset问题

2021-01-14 Thread air23
flink消费kafka 只能使用checkpoint去维护offset吗

我这边使用'scan.startup.mode' = 'group-offsets'

如果中间报错了 或者停止任务,但是我下游sink还没有完成,
下次启动直接跳过这个报错的数据,会丢数据,谢谢回复

Re: Re:flink-sql字段类型问题

2021-01-14 Thread zhang hao
看了下源码BigInteger 转都会有问题,没有匹配的这种类型:

public boolean isNullAt(int pos) {
   return this.fields[pos] == null;
}

@Override
public boolean getBoolean(int pos) {
   return (boolean) this.fields[pos];
}

@Override
public byte getByte(int pos) {
   return (byte) this.fields[pos];
}

@Override
public short getShort(int pos) {
   return (short) this.fields[pos];
}

@Override
public int getInt(int pos) {
   return (int) this.fields[pos];
}

@Override
public long getLong(int pos) {
   return (long) this.fields[pos];
}

@Override
public float getFloat(int pos) {
   return (float) this.fields[pos];
}

@Override
public double getDouble(int pos) {
   return (double) this.fields[pos];
}


On Thu, Jan 14, 2021 at 6:24 PM yinghua...@163.com 
wrote:

>
> 回复错了,抱歉!
>
>
> yinghua...@163.com
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 18:16
> 收件人: user-zh
> 主题: Re: 转发:flink-sql字段类型问题
>
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
>  // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> //我停止任务后再去查询时,这个目录已经删除了,出错如下
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> ls:
> `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
> No such file or directory//出错信息
>
>
>
> yinghua...@163.com
> 发件人: 郝文强
> 发送时间: 2021-01-14 17:24
> 收件人: user-zh
> 主题: 转发:flink-sql字段类型问题
> | |
> 郝文强
> |
> |
> 18846086...@163.com
> |
> 签名由网易邮箱大师定制
> - 转发邮件信息 -
> 发件人: 郝文强 <18846086...@163.com>
> 发送日期: 2021年01月14日 17:23
> 发送至: d...@flink.apache.org 
> 主题: 转发:flink-sql字段类型问题
> | |
> 郝文强
> |
> |
> 18846086...@163.com
> |
> 签名由网易邮箱大师定制
> - 转发邮件信息 -
> 发件人: 郝文强 <18846086...@163.com>
> 发送日期: 2021年01月14日 17:22
> 发送至: dev-h...@flink.apache.org 
> 主题: flink-sql字段类型问题
> sql-client 创建表 报错java.math.BigInteger cannot be cast to java.lang.Long
> 麻烦各位帮看一下
> 源数据表是 mysql的information_schema.tables 表
> 表结构如下:
> table_catalog varchar(64)
> table_schema  varchar(64)
> table_name  varchar(64)
> table_type  enum('base table','view','system view')
> engine  varchar(64)
> version int
> row_format
> enum('fixed','dynamic','compressed','redundant','compact','paged')
> table_rows  bigint unsigned
> avg_row_length  bigint unsigned
> data_length bigint unsigned
> max_data_length bigint unsigned
> index_length  bigint unsigned
> data_free bigint unsigned
> auto_increment  bigint unsigned
> create_time timestamp
> update_time datetime
> check_time  datetime
> table_collation varchar(64)
> checksum  bigint
> create_options  varchar(256)
> table_comment text
> 我的flink sql 建表语句:
>CREATE TABLE info_table (
>   TABLE_CATALOG STRING,
>   TABLE_SCHEMA STRING,
>   TABLE_NAME STRING,
>   TABLE_TYPE STRING,
>   ENGINE STRING,
>   VERSION INT,
>   ROW_FORMAT STRING,
>   TABLE_ROWS BIGINT,
>   AVG_ROW_LENGTH BIGINT,
>   DATA_LENGTH BIGINT,
>   MAX_DATA_LENGTH BIGINT,
>   INDEX_LENGTH BIGINT,
>   DATA_FREE BIGINT,
>   AUTO_INCREMENT BIGINT,
>   CREATE_TIME TIMESTAMP,
>   UPDATE_TIME TIMESTAMP,
>   CHECK_TIME TIMESTAMP,
>   TABLE_COLLATION STRING,
>   CHECKSUM INTEGER,
>   CREATE_OPTIONS STRING,
>   TABLE_COMMENT STRING,
>   PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/information_schema',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'TABLES'
> );
> 反复改了几次类型都报错:
> java.math.BigInteger cannot be cast to java.lang.Integer
> java.lang.Long cannot be cast to java.math.BigDecimal
> java.lang.Long cannot be cast to java.lang.Integer
> | |
> 郝文强
> |
> |
> 18846086...@163.com
> |
> 签名由网易邮箱大师定制
>


Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Xingbo Huang
Hi meneldor,

I guess Shuiqiang is not using the pyflink 1.12.0 to develop the example.
The signature of the `process_element` method has been changed in the new
version[1]. In pyflink 1.12.0, you can use `collector`.collect to send out
your results.

[1] https://issues.apache.org/jira/browse/FLINK-20647

Best,
Xingbo

meneldor  于2021年1月15日周五 上午1:20写道:

> Thank you for the answer Shuiqiang!
> Im using the last apache-flink version:
>
>> Requirement already up-to-date: apache-flink in
>> ./venv/lib/python3.7/site-packages (1.12.0)
>
> however the method signature is using a collector:
>
> [image: image.png]
>  Im using the *setup-pyflink-virtual-env.sh* shell script from the
> docs(which uses pip).
>
> Regards
>
> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen 
> wrote:
>
>> Hi meneldor,
>>
>> The main cause of the error is that there is a bug in
>> `ctx.timer_service().current_watermark()`. At the beginning the stream,
>> when the first record come into the KeyedProcessFunction.process_element()
>> , the current_watermark will be the Long.MIN_VALUE at Java side, while at
>> the Python side, it becomes LONG.MAX_VALUE which is 9223372036854775807.
>>
>> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>
>> Here, 9223372036854775807 + 1500 is 9223372036854777307 which will be
>> automatically converted to a long interger in python but will cause Long
>> value overflow in Java when deserializing the registered timer value. I
>> will craete a issue to fix the bug.
>>
>> Let’s return to your initial question, at PyFlink you could create a Row
>> Type data as bellow:
>>
>> >>> row_data = Row(id=‘my id’, data=’some data’, timestamp=)
>>
>> And I wonder which release version of flink the code snippet you provided
>> based on? The latest API for KeyedProcessFunction.process_element() and
>> KeyedProcessFunction.on_timer() will not provid a `collector` to collect
>> output data but use `yield` which is a more pythonic approach.
>>
>> Please refer to the following code:
>>
>> def keyed_process_function_example():
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env.get_config().set_auto_watermark_interval(2000)
>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>>(2, 'hi', '1603708224000'),
>>(3, 'hello', '1603708226000'),
>>(4, 'hi', '1603708289000')],
>>   type_info=Types.ROW([Types.INT(), 
>> Types.STRING(), Types.STRING()]))
>>
>> class MyTimestampAssigner(TimestampAssigner):
>>
>> def extract_timestamp(self, value, record_timestamp) -> int:
>> return int(value[2])
>>
>> class MyProcessFunction(KeyedProcessFunction):
>>
>> def process_element(self, value, ctx: 
>> 'KeyedProcessFunction.Context'):
>> yield Row(id=ctx.get_current_key()[1], data='some_string', 
>> timestamp=)
>> # current_watermark = ctx.timer_service().current_watermark()
>> ctx.timer_service().register_event_time_timer(ctx.timestamp() + 
>> 1500)
>>
>> def on_timer(self, timestamp: int, ctx: 
>> 'KeyedProcessFunction.OnTimerContext'):
>> yield Row(id=ctx.get_current_key()[1], data='current on timer 
>> timestamp: ' + str(timestamp),
>>   timestamp=timestamp)
>>
>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], 
>> [Types.STRING(), Types.STRING(), Types.INT()])
>> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>> .with_timestamp_assigner(MyTimestampAssigner())
>> data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>> .key_by(lambda x: (x[0], x[1]), 
>> key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>> .process(MyProcessFunction(), output_type=output_type_info).print()
>> env.execute('test keyed process function')
>>
>>
>> Best,
>> Shuiqiang
>>
>>
>>
>>
>>
>> meneldor  于2021年1月14日周四 下午10:45写道:
>>
>>> Hello,
>>>
>>> What is the correct way to use Python dict's as ROW type in pyflink? Im
>>> trying this:
>>>
>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
>>>  [Types.STRING(), Types.STRING(), 
>>> Types.LONG() ])
>>>
>>> class MyProcessFunction(KeyedProcessFunction):
>>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', 
>>> out: Collector):
>>> result = {"id": ctx.get_current_key()[0], "data": "some_string", 
>>> "timestamp": }
>>> out.collect(result)
>>> current_watermark = ctx.timer_service().current_watermark()
>>> ctx.timer_service().register_event_time_timer(current_watermark + 
>>> 1500)
>>>
>>> def on_timer(self, timestamp, ctx: 
>>> 

Re: flink1.11使用createStatementSet报错 No operators defined in streaming topology

2021-01-14 Thread Evan
我的也是flink 1.11.0版本的,也是使用的stmtSet.execute()方式,是可以正常运行的,你可以debug检查一下你要执行的SQL语句



 
发件人: datayangl
发送时间: 2021-01-14 16:13
收件人: user-zh
主题: flink1.11使用createStatementSet报错 No operators defined in streaming topology
flink版本: 1.11
使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka
代码如下:
  def main(args: Array[String]): Unit = {
FlinkUtils.initTable()
val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv
val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
streamEnv.disableOperatorChaining()
streamEnv.setParallelism(1)
streamEnv.setMaxParallelism(1)
CheckPointUtils.setCheckPoint(streamEnv, 12, 6)
dealWithOdsDataTohive(tableEnv)
val sqls:Map[String,String] = ConfigItem.ODS_SQL
 
val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE",
null).map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet
 
val filledAllSqlsTable = sqls.map(x=>{
  val hiveMapTopic = hiveTableMapTopic
  val topicName = hiveMapTopic.getOrElse(x._1,null)
  val topic = if(ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else
null
  (x._1,topic,x._2)
}).filter(x=>StringUtils.isNotEmpty(x._2)).map(x=>{
  val sql = fillTemplate(x._1,x._2,x._3)
  tableEnv.executeSql(sql)
  x._1
})
HiveUtils.initHiveCatalog("tsgz","catalogName", tableEnv)
val stmtSet = tableEnv.createStatementSet()
val allInsertSqls = filledAllSqlsTable.map(table=>{
  s"insert into tsgz.${table} select * from
default_catalog.default_database.${table}"
}).toList
allInsertSqls.foreach(x=>{
  stmtSet.addInsertSql(x)
})
val insertTaskStatus = stmtSet.execute()
//insertTaskStatus.print()
println(insertTaskStatus.getJobClient.get().getJobStatus())
}
  /**
   * 填充kafka sql映射表的模板内容
   * */
  def fillTemplate(tableName:String, topicName:String, fields:String)={
val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS
val filled = s"create table ${tableName} (${fields}) with ('connector' =
'kafka','topic' = '${topicName}','properties.bootstrap.servers' =
'${kafkaHost}','properties.group.id' = 'OdsDataToHive1','format' =
'json','scan.startup.mode' = 'latest-offset')"
 
filled
  }
 
执行后报错
Exception in thread "main" java.lang.IllegalStateException: No operators
defined in streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
at
com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54)
at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21)
at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala)
 
报错位置为 val insertTaskStatus = stmtSet.execute() 这一行。
 
 
参考资料:https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md
 
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

2021-01-14 Thread Dan Hill
Hey, sorry for the late reply.  I'm using v1.11.1.

Cool.  I did a non-SQL way of using the first row.  I'll try to see if I
can do this in the SQL version.

On Wed, Jan 13, 2021 at 11:26 PM Jark Wu  wrote:

> Hi Dan,
>
> Sorry for the late reply.
>
> I guess you applied a "deduplication with keeping last row" before the
> interval join?
> That will produce an updating stream and interval join only supports
> append-only input.
> You can try to apply "deduplication with keeping *first* row" before the
> interval join.
> That should produce an append-only stream and interval join can consume
> from it.
>
> Best,
> Jark
>
>
>
> On Tue, 5 Jan 2021 at 20:07, Arvid Heise  wrote:
>
>> Hi Dan,
>>
>> Which Flink version are you using? I know that there has been quite a bit
>> of optimization of deduplication in 1.12, which would reduce the required
>> state tremendously.
>> I'm pulling in Jark who knows more.
>>
>> On Thu, Dec 31, 2020 at 6:54 AM Dan Hill  wrote:
>>
>>> Hi!
>>>
>>> I'm using Flink SQL to do an interval join.  Rows in one of the tables
>>> are not unique.  I'm fine using either the first or last row.  When I try
>>> to deduplicate
>>> 
>>>  and
>>> then interval join, I get the following error.
>>>
>>> IntervalJoin doesn't support consuming update and delete changes which
>>> is produced by node Rank(strategy=[UndefinedStrategy],
>>> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>> partitionBy=[log_user_id], orderBy=[ts ASC], select=[platform_id, user_id,
>>> log_user_id, client_log_ts, event_api_ts, ts])
>>>
>>> Is there a way to combine these in this order?  I could do the
>>> deduplication afterwards but this will result in more state.
>>>
>>> - Dan
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Re: Enrich stream with SQL api

2021-01-14 Thread Dawid Wysakowicz
Hi Marek,

I am afraid I don't have a good answer for your question. The problem
indeed is that the JDBC source can work only as a bounded source. As you
correctly pointed out, as of now mixing bounded with unbounded sources
does not work with checkpointing, which we want to address in the
FLIP-147 (that you linked as well).

I agree one solution would be to change the implementation of
JDBCDynamicTableSource so that it produces an UNBOUNDED source.
Unfortunately it is not the most straightforward task.

Another solution would be to actually use a CDC. I think you could use
one of the connectors from here[1], which use the embedded Debezium
engine, therefore you would not need to setup any external tools, but
just embed the CDC in FLINK. Ofc, if I am not mistaken here, as I
haven't tried those connectors myself.

Unfortunately I don't have any other ideas right now. Maybe someone else
can chime in @Timo @Jark

Lastly, I think once you solve the problem of a finishing source you
could also consider using the temporal join[2] instead of an interval join.

Best,

Dawid

[1] https://github.com/ververica/flink-cdc-connectors

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins

On 12/01/2021 16:40, Marek Maj wrote:
> Hello,
> I am trying to use Flink SQL api to join two tables. My stream data
> source is kafka (defined through catalog and schema registry) and my
> enrichment data is located in relational database (JDBC connector). I
> think this setup reflects quite common use case
>
> Enrichment table definition looks like this:
> CREATE TABLE dim (
>   ID BIGINT,
>   ENRICH STRING,
>   FROM_DATE TIMESTAMP(6),
>   TO_DATE TIMESTAMP(6),
>   WATERMARK FOR TO_DATE AS TO_DATE
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = ‘…’,
>    'table-name' = ‘…’,
>    'username' = ‘…’,
>    'password' = ‘…’
> )
>
> And this is join I use against stream coming from kafka (table with
> watermark spec), trunc is udf:
> SELECT TRUNC(T1.START_TIME,'HH24') as `START_TIME`,
> D1.ENRICH as `ENRICH`,
> T1.FIELD as `FIELD`,
> FROM `kafka.topic` T1, dim D1
> WHERE T1.ENRICH_ID = D1.ID 
> AND T1.START_TIME between D1.TO_DATE - INTERVAL ‘1’ DAY AND D1.TO_DATE
> AND T1.START_TIME >= D1.FROM_DATE
>
> Result job graph contains two table source scan operators together
> with interval join operator.
>  
> The problem I am trying to solve is how to change the character of
> enrichment table. Currently, related operator task reads whole data
> from table when the job start and finishes afterwards. Ideally, I
> would like to have continuously updated enrichment table. Is it
> possible to achieve without CDC for example by querying whole database
> periodically or use some kind of cache for keys? We can assume that
> enrichment table is append only, there are no deletes or updates, only
> inserts for new time intervals
>
> If updates are not possible, how can I deal with finished task? Due to
> a known issue [1], all checkpoints are aborted . Maybe I could live
> with restarting job to get new enrichment data as it is not refreshed
> so frequently, but checkpointing is a must.
>
> flink version 1.12
>
> regards
> Marek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished


signature.asc
Description: OpenPGP digital signature


Re: Deterministic rescale for test

2021-01-14 Thread Jaffe, Julian
Martin,

You can use `.partitionCustom` and provide a partitioner if you want to control 
explicitly how elements are distributed to downstream tasks.

From: Martin Frank Hansen 
Reply-To: "m...@berlingskemedia.dk" 
Date: Thursday, January 14, 2021 at 1:48 AM
To: user 
Subject: Deterministic rescale for test

Hi,

I am trying to make a test-suite for our Flink jobs, and are having problems 
making the input-data deterministic.

We are reading a file-input with parallelism 1 and want to rescale to a higher 
parallelism, such that the ordering of the data is the same every time.

I have tried using rebalance, rescale but it seems to randomly distribute data 
between partitions. We don't need something optimized, we just need the same 
distribution for every run.
Is this possible?

Some code:

val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(parallelism)
val rawStream: DataStream[String] = env.readTextFile(s3Path).setParallelism(1)

rawStream.rescale

...
best regards

--

Martin Frank Hansen




Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
Thank you for the answer Shuiqiang!
Im using the last apache-flink version:

> Requirement already up-to-date: apache-flink in
> ./venv/lib/python3.7/site-packages (1.12.0)

however the method signature is using a collector:

[image: image.png]
 Im using the *setup-pyflink-virtual-env.sh* shell script from the
docs(which uses pip).

Regards

On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen  wrote:

> Hi meneldor,
>
> The main cause of the error is that there is a bug in
> `ctx.timer_service().current_watermark()`. At the beginning the stream,
> when the first record come into the KeyedProcessFunction.process_element()
> , the current_watermark will be the Long.MIN_VALUE at Java side, while at
> the Python side, it becomes LONG.MAX_VALUE which is 9223372036854775807.
>
> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>
> Here, 9223372036854775807 + 1500 is 9223372036854777307 which will be
> automatically converted to a long interger in python but will cause Long
> value overflow in Java when deserializing the registered timer value. I
> will craete a issue to fix the bug.
>
> Let’s return to your initial question, at PyFlink you could create a Row
> Type data as bellow:
>
> >>> row_data = Row(id=‘my id’, data=’some data’, timestamp=)
>
> And I wonder which release version of flink the code snippet you provided
> based on? The latest API for KeyedProcessFunction.process_element() and
> KeyedProcessFunction.on_timer() will not provid a `collector` to collect
> output data but use `yield` which is a more pythonic approach.
>
> Please refer to the following code:
>
> def keyed_process_function_example():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.get_config().set_auto_watermark_interval(2000)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>(2, 'hi', '1603708224000'),
>(3, 'hello', '1603708226000'),
>(4, 'hi', '1603708289000')],
>   type_info=Types.ROW([Types.INT(), 
> Types.STRING(), Types.STRING()]))
>
> class MyTimestampAssigner(TimestampAssigner):
>
> def extract_timestamp(self, value, record_timestamp) -> int:
> return int(value[2])
>
> class MyProcessFunction(KeyedProcessFunction):
>
> def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
> yield Row(id=ctx.get_current_key()[1], data='some_string', 
> timestamp=)
> # current_watermark = ctx.timer_service().current_watermark()
> ctx.timer_service().register_event_time_timer(ctx.timestamp() + 
> 1500)
>
> def on_timer(self, timestamp: int, ctx: 
> 'KeyedProcessFunction.OnTimerContext'):
> yield Row(id=ctx.get_current_key()[1], data='current on timer 
> timestamp: ' + str(timestamp),
>   timestamp=timestamp)
>
> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], 
> [Types.STRING(), Types.STRING(), Types.INT()])
> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
> .with_timestamp_assigner(MyTimestampAssigner())
> data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
> .key_by(lambda x: (x[0], x[1]), 
> key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
> .process(MyProcessFunction(), output_type=output_type_info).print()
> env.execute('test keyed process function')
>
>
> Best,
> Shuiqiang
>
>
>
>
>
> meneldor  于2021年1月14日周四 下午10:45写道:
>
>> Hello,
>>
>> What is the correct way to use Python dict's as ROW type in pyflink? Im
>> trying this:
>>
>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
>>  [Types.STRING(), Types.STRING(), 
>> Types.LONG() ])
>>
>> class MyProcessFunction(KeyedProcessFunction):
>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', 
>> out: Collector):
>> result = {"id": ctx.get_current_key()[0], "data": "some_string", 
>> "timestamp": }
>> out.collect(result)
>> current_watermark = ctx.timer_service().current_watermark()
>> ctx.timer_service().register_event_time_timer(current_watermark + 
>> 1500)
>>
>> def on_timer(self, timestamp, ctx: 
>> 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>> logging.info(timestamp)
>> out.collect("On timer timestamp: " + str(timestamp))
>>
>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), 
>> Types.STRING()])) \
>>.process(MyProcessFunction(), output_type=output_type_info)
>>
>>
>> I just hardcoded the values in MyProcessFunction to be sure that the
>> input data doesnt mess the fields. So the data is correct but PyFlink trews
>> an exception:
>>
>> at 

Re: error accessing S3 bucket 1.12

2021-01-14 Thread Dawid Wysakowicz
Hi Billy,

I think you might be hitting the same problem as described in this
thread[1]. Does your bucket meet all the name requirements as described
in here[2] (e.g. have an underscore)?

Best,

Dawid

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unable-to-set-S3-like-object-storage-for-state-backend-td28362.html

[2] https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html

On 13/01/2021 19:20, Billy Bain wrote:
> I'm trying to use readTextFile() to access files in S3. I have
> verified the s3 key and secret are clean and the s3 path is similar to
> com.somepath/somefile. (the names changed to protect the guilty)
>
> Any idea what I'm missing? 
>
> 2021-01-13 12:12:43,836 DEBUG
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
> [] - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://com.somepath/somefile
> 2021-01-13 12:12:43,843 DEBUG
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory [] -
> Creating S3 file system backed by Hadoop s3a file system
> 2021-01-13 12:12:43,844 DEBUG
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory [] - Loading
> Hadoop configuration for Hadoop s3a file system
> 2021-01-13 12:12:43,926 DEBUG
> org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding
> Flink config entry for s3.access-key as fs.s3a.access-key to Hadoop config
> 2021-01-13 12:12:43,926 DEBUG
> org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding
> Flink config entry for s3.secret-key as fs.s3a.secret-key to Hadoop config
> 2021-01-13 12:12:43,944 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking
> Split Reader: Custom File Source -> (Timestamps/Watermarks, Map ->
> Filter -> Sink: Unnamed) (1/1)#0
> 2021-01-13 12:12:43,944 DEBUG
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Creating operator state backend for
> TimestampsAndWatermarksOperator_1cf40e099136da16c66c61032de62905_(1/1)
> with empty state.
> 2021-01-13 12:12:43,946 DEBUG
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Creating operator state backend for
> StreamSink_d91236bbbed306c2379eac4982246f1f_(1/1) with empty state.
> 2021-01-13 12:12:43,955 DEBUG org.apache.hadoop.conf.Configuration []
> - Reloading 1 existing configurations
> 2021-01-13 12:12:43,961 DEBUG
> org.apache.flink.fs.s3hadoop.S3FileSystemFactory [] - Using scheme
> s3://com.somepath/somefile for s3a file system backing the S3 File System
> 2021-01-13 12:12:43,965 DEBUG
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
> [] - Closed File Monitoring Source for path: s3://com.somepath/somefile.
> 2021-01-13 12:12:43,967 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Source: Custom File Source (1/1)#0
> (1d75ae07abbd65f296c55a61a400c59f) switched from RUNNING to FAILED.
> java.io .IOException: null uri host. This can be
> caused by unencoded / in the password string
> at
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:163)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:468)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:196)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> Caused by: java.lang.NullPointerException: null uri host. This can be
> caused by unencoded / in the password string
> at java.util.Objects.requireNonNull(Objects.java:246) ~[?:?]
> at
> org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:69)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> 

Re: Elasticsearch config maxes can not be disabled

2021-01-14 Thread Rex Fenley
Flink 1.11.2

CREATE TABLE sink_es (
...
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '${sys:proxyEnv.ELASTICSEARCH_HOSTS}',
'index' = '${sys:graph.flink.index_name}',
'format' = 'json',
'sink.bulk-flush.max-actions' = '0',
'sink.bulk-flush.max-size' = '0',
'sink.bulk-flush.interval' = '1s',
'sink.bulk-flush.backoff.delay' = '1s',
'sink.bulk-flush.backoff.max-retries' = '4',
'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)

On Thu, Jan 14, 2021 at 4:16 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> First of all, what Flink versions are you using?
>
> You are right it is a mistake in the documentation of the
> sink.bulk-flush.max-actions. It should say: Can be set to '-1' to disable
> it. I created a ticket[1] to track that. And as far as I can tell and I
> quickly checked that it should work. As for the sink.bulk-flush.max-size
> you should be able to disable it with a value of '0'.
>
> Could you share with us how do you use the connector? Could you also share
> the full stack trace for the exception you're getting? Are you creating the
> table with a CREATE statement?
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-20979
> On 13/01/2021 20:10, Rex Fenley wrote:
>
> Hello,
>
> It doesn't seem like we can disable max actions and max size for
> Elasticsearch connector.
>
> Docs:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
> sink.bulk-flush.max-actions optional 1000 Integer Maximum number of
> buffered actions per bulk request. Can be set to '0' to disable it.
> sink.bulk-flush.max-size optional 2mb MemorySize Maximum size in memory
> of buffered actions per bulk request. Must be in MB granularity. Can be set
> to '0' to disable it.
> Reality:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Max number of buffered actions must be larger than
> 0.
>
> ES code looks like -1 is actually the value for disabling, but when I use
> -1:
> Caused by: java.lang.IllegalArgumentException: Could not parse value '-1'
> for key 'sink.bulk-flush.max-size'.
>
> How can I disable these two settings?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Shuiqiang Chen
Hi meneldor,

The main cause of the error is that there is a bug in
`ctx.timer_service().current_watermark()`. At the beginning the stream,
when the first record come into the KeyedProcessFunction.process_element()
, the current_watermark will be the Long.MIN_VALUE at Java side, while at
the Python side, it becomes LONG.MAX_VALUE which is 9223372036854775807.

>>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)

Here, 9223372036854775807 + 1500 is 9223372036854777307 which will be
automatically converted to a long interger in python but will cause Long
value overflow in Java when deserializing the registered timer value. I
will craete a issue to fix the bug.

Let’s return to your initial question, at PyFlink you could create a Row
Type data as bellow:

>>> row_data = Row(id=‘my id’, data=’some data’, timestamp=)

And I wonder which release version of flink the code snippet you provided
based on? The latest API for KeyedProcessFunction.process_element() and
KeyedProcessFunction.on_timer() will not provid a `collector` to collect
output data but use `yield` which is a more pythonic approach.

Please refer to the following code:

def keyed_process_function_example():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.get_config().set_auto_watermark_interval(2000)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
data_stream = env.from_collection([(1, 'hello', '1603708211000'),
   (2, 'hi', '1603708224000'),
   (3, 'hello', '1603708226000'),
   (4, 'hi', '1603708289000')],

type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

class MyTimestampAssigner(TimestampAssigner):

def extract_timestamp(self, value, record_timestamp) -> int:
return int(value[2])

class MyProcessFunction(KeyedProcessFunction):

def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
yield Row(id=ctx.get_current_key()[1], data='some_string',
timestamp=)
# current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(ctx.timestamp()
+ 1500)

def on_timer(self, timestamp: int, ctx:
'KeyedProcessFunction.OnTimerContext'):
yield Row(id=ctx.get_current_key()[1], data='current on
timer timestamp: ' + str(timestamp),
  timestamp=timestamp)

output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
[Types.STRING(), Types.STRING(), Types.INT()])
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(MyTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: (x[0], x[1]),
key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
.process(MyProcessFunction(), output_type=output_type_info).print()
env.execute('test keyed process function')


Best,
Shuiqiang





meneldor  于2021年1月14日周四 下午10:45写道:

> Hello,
>
> What is the correct way to use Python dict's as ROW type in pyflink? Im
> trying this:
>
> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
>  [Types.STRING(), Types.STRING(), 
> Types.LONG() ])
>
> class MyProcessFunction(KeyedProcessFunction):
> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', 
> out: Collector):
> result = {"id": ctx.get_current_key()[0], "data": "some_string", 
> "timestamp": }
> out.collect(result)
> current_watermark = ctx.timer_service().current_watermark()
> ctx.timer_service().register_event_time_timer(current_watermark + 
> 1500)
>
> def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', 
> out: 'Collector'):
> logging.info(timestamp)
> out.collect("On timer timestamp: " + str(timestamp))
>
> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), 
> Types.STRING()])) \
>.process(MyProcessFunction(), output_type=output_type_info)
>
>
> I just hardcoded the values in MyProcessFunction to be sure that the input
> data doesnt mess the fields. So the data is correct but PyFlink trews an
> exception:
>
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>> at
>> org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> 

Pyflink Join with versioned view / table

2021-01-14 Thread Barth, Torben
Dear List,

I have trouble implementing a join between two streaming tables in Python Table 
API.

The left table of my join should be enriched with the information with the last 
value of the right_table. The right_table is updated only rarely (maybe after 
15 minutes). When implementing the join I get only updates when the right table 
changes. I want to trigger the updates for the joined table every time when I 
receive a record on the left side. The record should be enriched with the most 
recent result of the right side. I have not found a way to implement with the 
desired result.

It tried an implementation using a versioned view. Here is a short example:

left_table
root
|-- measurement_time: TIMESTAMP(3) *ROWTIME*
|-- x: DOUBLE
|-- y: DOUBLE
|-- proctime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
|-- WATERMARK FOR measurement_time: TIMESTAMP(3) AS `measurement_time`

right_table
|-- some_value: INT
|-- id: INT
|-- modtime: TIMESTAMP(3) *ROWTIME*
 The "id" is always defined as 1.
 I perform the following operations

t_env.create_temporary_view("left_table", left_table.add_columns("1.cast(INT) 
AS left_artificial_key"))
t_env.create_temporary_view("right_table", right_table)

sql_view = """
-- Define a versioned view
CREATE VIEW versioned_right AS
SELECT id, some_value, modtime
  FROM (
 SELECT *,
 ROW_NUMBER() OVER (PARTITION BY id
ORDER BY modtime DESC) AS rownum
 FROM right_table)
WHERE rownum = 1
"""

view = t_env.execute_sql(sql_view)

sql = """
   SELECT
   left_table.*, versioned_right.some_value
FROM left_table
LEFT JOIN versioned_right FOR SYSTEM_TIME AS OF 
left_table.measurement_time
ON abt.left_artificial_key = versioned_right.id
"""

joined = t_env.sql_query(sql)


I observed the same behavior when using a lateral join.

Does anybody have an idea how the join could be implemented in the correct way?

Any comments or ideas are very welcome.

Thanks
Torben

Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz 
der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, 
Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des 
Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; 
Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. 
Pierre Dominique Prümm, Dr. Matthias Zieschang


Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
Hello,

What is the correct way to use Python dict's as ROW type in pyflink? Im
trying this:

output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
 [Types.STRING(), Types.STRING(),
Types.LONG() ])

class MyProcessFunction(KeyedProcessFunction):
def process_element(self, value, ctx:
'KeyedProcessFunction.Context', out: Collector):
result = {"id": ctx.get_current_key()[0], "data":
"some_string", "timestamp": }
out.collect(result)
current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(current_watermark + 1500)

def on_timer(self, timestamp, ctx:
'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
logging.info(timestamp)
out.collect("On timer timestamp: " + str(timestamp))

ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(),
Types.STRING()])) \
   .process(MyProcessFunction(), output_type=output_type_info)


I just hardcoded the values in MyProcessFunction to be sure that the input
data doesnt mess the fields. So the data is correct but PyFlink trews an
exception:

at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at
> org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
> at
> org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
> ... 10 more

However it works with primitive types like Types.STRING(). According
to the documentation the ROW type corresponds to the python's dict
type.


Regards


Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread sagar
Thanks Yun



On Thu, Jan 14, 2021 at 1:58 PM Yun Gao  wrote:

> Hi Sagar,
>
>   I rechecked and found that the new kafka source is not formally publish
> yet, and a stable method I think may be try adding the FlinkKafkaConsumer
> as a BOUNDED source first. Sorry for the inconvient.
>
> Best,
>  Yun
>
> --
> Sender:Yun Gao
> Date:2021/01/14 15:26:54
> Recipient:Ardhani Narasimha; sagar<
> sagarban...@gmail.com>
> Cc:Flink User Mail List
> Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch
> mode (Flink 1.12)
>
> Hi Sagar,
>
>   I think the problem is that the legacy source implemented by
> extending SourceFunction are all defined as CONTINOUS_UNBOUNDED when use
> env.addSource(). Although there is hacky way to add the legacy sources as
> BOUNDED source [1], I think you may first have a try of new version of
> KafkaSource [2] ? The new version of KafkaSource is implemented with the
> new Source API [3], which provides unfied support for the streaming and
> batch mode.
>
> Best,
>  Yun
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
> [2]
> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>
>
> --Original Mail --
> *Sender:*Ardhani Narasimha 
> *Send Date:*Thu Jan 14 15:11:35 2021
> *Recipients:*sagar 
> *CC:*Flink User Mail List 
> *Subject:*Re: Using Kafka as bounded source with DataStream API in batch
> mode (Flink 1.12)
>
>> Interesting use case.
>>
>> Can you please elaborate more on this.
>> On what criteria do you want to batch? Time? Count? Or Size?
>>
>> On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:
>>
>>> Hi Team,
>>>
>>> I am getting the following error while running DataStream API in with
>>> batch mode with kafka source.
>>> I am using FlinkKafkaConsumer to consume the data.
>>>
>>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
>>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
>>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>>> at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>>> ~[flink-core-1.12.0.jar:1.12.0]
>>>
>>> In my batch program I wanted to work with four to five different stream
>>> in batch mode as data source is bounded
>>>
>>> I don't find any clear example of how to do it with kafka souce with
>>> Flink 1.12
>>>
>>> I don't want to use JDBC source as underlying database table may change.
>>> please give me some example on how to achieve the above use case.
>>>
>>> Also for any large bounded source are there any alternatives to
>>> achieve this?
>>>
>>>
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail ,All Rights are Reserved.If you are not
>>> intended receipiant please ignore this email.
>>>
>>
>>
>> ---
>> *IMPORTANT*: The contents of this email and any attachments are
>> confidential and protected by applicable laws. If you have received this
>> email by mistake, please (i) notify the sender immediately; (ii) delete it
>> from your database; and (iii) do not disclose the contents to anyone or
>> make copies thereof. Razorpay accepts no liability caused due to any
>> inadvertent/ unintentional data transmitted through this email.
>>
>> ---
>>
>
>

-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Re: Getting Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler

2021-01-14 Thread Arvid Heise
Hi Avi,

could you run
gradle dependencies
and report back to me?

Also did you ensure to run
gradle clean
before? The gradle version you are using is ancient, so I'm not sure if
it's picking up the change correctly.

On Thu, Jan 14, 2021 at 10:55 AM Avi Levi  wrote:

> No, I don't.  I actually tried also with 2.12.7 and got the same result
>
> On Thu, Jan 14, 2021 at 11:07 AM Arvid Heise  wrote:
>
>> Hi Avi,
>>
>> apparently the maximum version that Flink supports for scala is 2.12.7
>> [1]. Do you have a specific reason to use a higher version?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12461
>>
>> On Thu, Jan 14, 2021 at 5:11 AM Avi Levi  wrote:
>>
>>> Hi Arvid,
>>> Please find attached full build.gradle file
>>>
>>> On Tue, Jan 12, 2021 at 8:18 PM Arvid Heise  wrote:
>>>
 Can you post the full dependencies of sbt/maven/gradle whatever?

 On Tue, Jan 12, 2021 at 3:54 AM Avi Levi  wrote:

> Hi Arvid,
> using :
>
> flinkVersion = '1.12.0'
> scalaBinaryVersion = '2.12'
>
> I simplified the example to (same exception)  :
>
> object Flinktest extends App {
>   private val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.fromElements("A", "B","c")
> .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
> .process{new ProcessAllWindowFunction[String, List[String], 
> TimeWindow] {
>   override def process(context: Context, elements: Iterable[String], 
> out: Collector[List[String]]): Unit = {
> out.collect(elements.toList)
>   }
> }
> }
> .print()
>
> env.execute("Sample")
> }
>
>
>
>
> On Tue, Jan 5, 2021 at 1:53 PM Arvid Heise 
> wrote:
>
>> Hi Avi,
>>
>> without being a scala-guy, I'm guessing that you are mixing scala
>> versions. Could you check that your user code uses the same scala version
>> as Flink (1.11 or 1.12)? I have also heard of issues with different minor
>> versions of scala, so make sure to use the exact same version (e.g.
>> 2.11.12).
>>
>> On Mon, Dec 28, 2020 at 3:54 PM Avi Levi  wrote:
>>
>>> I am trying to aggregate all records in a time window. This is my
>>> ProcessAllWindowFunction :
>>>
>>> case class SimpleAggregate(elms: List[String])
>>>
>>> class AggregateLogs extends ProcessAllWindowFunction[String, 
>>> SimpleAggregate, TimeWindow ] {
>>>   override def process(context: Context, elements: Iterable[String], 
>>> out: Collector[SimpleAggregate]): Unit = {
>>> val es: List[String] = elements.toList
>>> val record = SimpleAggregate(es)
>>> out.collect(record)
>>>   }
>>> }
>>>
>>> But I am getting this exception why ?
>>>
>>> Exception in thread "main" java.util.concurrent.ExecutionException:
>>> scala.tools.reflect.ToolBoxError: reflective compilation has failed: 
>>> cannot
>>> initialize the compiler due to java.lang.BootstrapMethodError:
>>> java.lang.NoSuchMethodError:
>>> scala.collection.immutable.List.$anonfun$flatMap$1$adapted(Lscala/runtime/BooleanRef;Lscala/runtime/ObjectRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)Ljava/lang/Object;
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>>> at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
>>> at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
>>> at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.(TraversableSerializer.scala:41)
>>> at
>>> 

Re: Dead code in ES Sink

2021-01-14 Thread Aljoscha Krettek

On 2021/01/13 07:50, Rex Fenley wrote:

Are you saying that this option does get passed along to Elasticsearch
still or that it's just arbitrarily validated? According to [1] it's been
deprecated in ES 6 and removed in ES 7.

[1] https://github.com/elastic/elasticsearch/pull/38085


Sorry, I wasn't being very clear. I meant that we just pass it on to ES.  
In light of it being deprecated, I think it makes sense to keep it as 
long as we support ES 6. What do you think?


Side note: we still have an ES 5 connector...  There was a discussion 
about dropping it, but it wasn't conclusive. [1]


[1] 
https://lists.apache.org/thread.html/rb957e7d7d5fb9bbe25e5fbc56662749ee1bc551d36e26c58644f60d4%40%3Cdev.flink.apache.org%3E


Best,
Aljoscha


Re: Flink[Python] questions

2021-01-14 Thread Shuiqiang Chen
Hi Dc,

Thank you for your feedback.

1. Currently, only built-in types are supported in Python DataStream API,
however, you can apply a Row type to represent a  custom Python class as a
workaround that field names stand for the name of member variables and
field types stand for the type of member variables.

2. Could you please provide the full executed command line and which kind
of cluster you are running (standalone/yarn/k8s)? Various command lines to
submit a Pylink job are shown in
https://ci.apache.org/projects/flink/flink-docs-master/deployment/cli.html#submitting-pyflink-jobs
.

The attachment is an example code for a Python DataStream API job, for your
information.

Best,
Shuiqiang

Dc Zhao (BLOOMBERG/ 120 PARK)  于2021年1月14日周四
下午1:00写道:

> Hi Flink Community:
> We are using the pyflink to develop a POC for our project. We encountered
> some questions while using the flink.
>
> We are using the flink version 1.2, python3.7, data stream API
>
> 1. Do you have examples of providing a python customized class as a
> `result type`? Based on the documentation research, we found out only
> built-in types are supported in Python. Also, what is the payload size
> limitation inside the flink, do we have a recommendation for that?
>
> 2. Do you have examples of `flink run --python` data stream API codes to
> the cluster? We tried to do that, however the process hangs on a `socket
> read from the java gateway`, due to the lack of the missing logs, we are
> not sure what is missing while submitting the job.
>

>

>

>
> Regards
> Dc
>
>
> << {CH} {TS} Anything that can possibly go wrong, it does. >>
>
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, ProcessFunction


def datastream_processfunction_example():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.get_config().set_auto_watermark_interval(2000)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
data_stream = env.from_collection([(1, '1603708211000'),
   (2, '1603708224000'),
   (3, '1603708226000'),
   (4, '1603708289000')],
  type_info=Types.ROW([Types.INT(), Types.STRING()]))

class MyTimestampAssigner(TimestampAssigner):

def extract_timestamp(self, value, record_timestamp) -> int:
return int(value[1])

class MyProcessFunction(ProcessFunction):

def process_element(self, value, ctx):
current_timestamp = ctx.timestamp()
current_watermark = ctx.timer_service().current_watermark()
yield "current timestamp: {}, current watermark: {}, current_value: {}" \
.format(str(current_timestamp), str(current_watermark), str(value))

def on_timer(self, timestamp, ctx, out):
pass

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(MyTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.process(MyProcessFunction(), output_type=Types.STRING()).print()
env.execute("Python DataStream Example")


if __name__ == '__main__':
datastream_processfunction_example()


Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 Thread 孙啸龙
你好:
非常谢谢,
本地的数据是过期了。
不好意思,还有几个疑问想请教下。
1.看文档,开启cleanFullSnapshot是只能对单个状态设置吗,没查到flink sql 
开启cleanFullSnapshot的配置的地方?因为只看到StateTtlConfig是对于单个状态的设置,没有对job或者对table的config设置。
2.cleanFullSnapshot 开启后,从checkpoint恢复才会触发清理,不是在checkpoint过程中触发清理掉过期数据?


> 在 2021年1月14日,下午4:48,Yun Tang  写道:
> 
> Hi,
> 
> 你本地的数据肯定是过期了,checkpoint 
> size没有变化是因为你的数据总量83MB,且之后没有插入新数据,导致没有触发RocksDB的compaction,所以本地的数据没有物理上清理,而在full
>  snapshot时候,估计你并没有开启cleanFullSnapshot [1],所以导致full snapshot时候并没有删除掉过期数据。
> 
> 其实你可以查询一下状态,默认情况下,已经过期的数据是无法再查询到了。
> 
> 建议开启增量checkpoint即可,过期数据即使物理不删除,也因为过期而无法再读取到了,没必要过分关注UI上的checkpoint size。
> 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#cleanup-in-full-snapshot
> 
> 祝好
> 唐云
> 
> From: 孙啸龙 
> Sent: Thursday, January 14, 2021 16:11
> To: user-zh@flink.apache.org 
> Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化
> 
> 你好:
> 使用的state backend是rocksdb,没有开启增量,后续没有再插入过数据。
> 
>> 在 2021年1月14日,下午4:07,Yun Tang  写道:
>> 
>> 使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?
>> 
>> 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?
>> 
>> 
>> 祝好
>> 唐云
>> 
>> From: 孙啸龙 
>> Sent: Thursday, January 14, 2021 15:52
>> To: user-zh@flink.apache.org 
>> Subject: Flink sql 状态过期后,checkpoint 大小没变化
>> 
>> 大家好:
>>   版本:1.12.0
>>   方式:flink sql
>>   测试sql:
>>   select a.id,b.money,b.createTime from test_state_from a
>>   full join test_state_from1 b on a.id=b.id;
>>   问题:
>>  test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
>> ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?
> 



Fwd: Error querying flink state

2021-01-14 Thread Falak Kansal
Hi,

I have set up a flink cluster on my local machine. I created a flink job (
TrackMaximumTemperature) and made the state queryable. I am using
*github/streamingwithflink/chapter7/QueryableState.scala* example from
*https://github.com/streaming-with-flink
* repository. Please find the
file attached.

Now i have the running job id and when i go and try to access the state, it
throws an exception. I see the job is running and I am using the correct
jobId. Also checkpointing is enabled in the original job and i have set the
properties related to checkpointing in flink-conf.yaml. Am I
missing something? Any leads will be appreciated. Thank you :)


*Exception stack trace:*
Caused by:
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
not retrieve location of state=maxTemperature of
job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state
is not ready, or ii) the job does not exist.
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
package io.github.streamingwithflink.chapter7

import java.util.concurrent.CompletableFuture

import io.github.streamingwithflink.util.{SensorReading, SensorSource, 
SensorTimeAssigner}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object TrackMaximumTemperature {

  /** main() defines and executes the DataStream program */
  def main(args: Array[String]) {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

// checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1000L)

// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
  // SensorSource generates random temperature readings
  .addSource(new SensorSource)
  // assign timestamps and watermarks which are required for event time
  .assignTimestampsAndWatermarks(new SensorTimeAssigner)

val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData
  // project to sensor id and temperature
  .map(r => (r.id, r.temperature))
  // compute every 10 seconds the max temperature per sensor
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .max(1)

// store latest value for each sensor in a queryable state
tenSecsMaxTemps
  .keyBy(_._1)
  .asQueryableState("maxTemperature")

// execute application
env.execute("Track max temperature")
  }
}

object TemperatureDashboard {

  // queryable state proxy connection information.
  // can be looked up in logs of running QueryableStateJob
  val proxyHost = "localhost"
  val proxyPort = 9069
  // jobId of running QueryableStateJob.
  // can be looked up in logs of running job or the web UI
  val jobId = "9a528bf3e1b650aed7e0b1e26d038ad5"

  // how many sensors to query
  val numSensors = 5
  // how often to query
  val refreshInterval = 1

  def main(args: Array[String]): Unit = {

// configure client with host and port of queryable state proxy
val client = new QueryableStateClient(proxyHost, proxyPort)

val futures = new Array[CompletableFuture[ValueState[(String, 
Double)]]](numSensors)
val results = new Array[Double](numSensors)

// print header line of dashboard table
val header = (for (i <- 0 until numSensors) yield "sensor_" + (i + 
1)).mkString("\t| ")
println(header)

// loop forever
while (true) {

  // send out async queries
  for (i <- 0 until numSensors) {
futures(i) = queryState("sensor_" + (i + 1), client)
  }
  // wait for results
  for (i <- 0 until numSensors) {

Re: Elasticsearch config maxes can not be disabled

2021-01-14 Thread Dawid Wysakowicz
Hi,

First of all, what Flink versions are you using?

You are right it is a mistake in the documentation of the
sink.bulk-flush.max-actions. It should say: Can be set to |'-1'| to
disable it. I created a ticket[1] to track that. And as far as I can
tell and I quickly checked that it should work. As for the
sink.bulk-flush.max-size you should be able to disable it with a value
of '0'.

Could you share with us how do you use the connector? Could you also
share the full stack trace for the exception you're getting? Are you
creating the table with a CREATE statement?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-20979

On 13/01/2021 20:10, Rex Fenley wrote:
> Hello,
>
> It doesn't seem like we can disable max actions and max size for
> Elasticsearch connector.
>
> Docs:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
>
>
>   sink.bulk-flush.max-actions
>
>   optional1000Integer Maximum number of buffered 
> actions per bulk
> request. Can be set to |'0'| to disable it.
>
>
>   sink.bulk-flush.max-size
>
>   optional2mb MemorySize  Maximum size in memory of 
> buffered actions
> per bulk request. Must be in MB granularity. Can be set to |'0'| to
> disable it.
>
> Reality:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Max number of buffered actions must be larger
> than 0.
>
> ES code looks like -1 is actually the value for disabling, but when I
> use -1:
> Caused by: java.lang.IllegalArgumentException: Could not parse value
> '-1' for key 'sink.bulk-flush.max-size'.
>
> How can I disable these two settings?
>
> Thanks!
>
> -- 
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
>
> Remind.com |  BLOG
>  |  FOLLOW US
>  |  LIKE US
> 
>


signature.asc
Description: OpenPGP digital signature


Re: StreamingFileSink with ParquetAvroWriters

2021-01-14 Thread Dawid Wysakowicz
Hi Jan

Could you make sure you are packaging that dependency with your job jar?
There are instructions how to configure your build setup[1]. Especially
the part how to build a jar with dependencies might come in handy[2].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies

On 13/01/2021 17:49, Jan Oelschlegel wrote:
>
> Hi,
>
>  
>
> i’m using Flink (v.1.11.2) and would like to use the StreamingFileSink
> for writing into HDFS in Parquet format.
>
>  
>
> As it says in the documentation I have added the dependencies:
>
>  
>
> 
>    org.apache.flink
>    flink-parquet_${scala.binary.version}
>    ${flink.version}
> 
>
>  
>
> And this is my file sink definition:
>
>  
>
> val sink: StreamingFileSink[Event] = StreamingFileSink  
> ./forBulkFormat/(     new
> Path("hdfs://namenode.local:8020/user/datastream/"),    
> ParquetAvroWriters./forReflectRecord/(/classOf/[Event])   )   .build()
>
>  
>
>  
>
> If I execute this in cluster I get the following error:
>
>  
>
> java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
>
> at
> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
>
> at
> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
>
> at
> org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
>
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
>  
>
>  
>
> Looks like there are some dependencies missing. How can I fix this?
>
>  
>
>  
>
> Jan O.
>
> HINWEIS: Dies ist eine vertrauliche Nachricht und nur für den
> Adressaten bestimmt. Es ist nicht erlaubt, diese Nachricht zu kopieren
> oder Dritten zugänglich zu machen. Sollten Sie diese Nachricht
> irrtümlich erhalten haben, bitte ich um Ihre Mitteilung per E-Mail
> oder unter der oben angegebenen Telefonnummer. 


signature.asc
Description: OpenPGP digital signature


回复: 回复: 请教个Flink checkpoint的问题

2021-01-14 Thread yinghua...@163.com
好的,感谢您的回复!



yinghua...@163.com
 
发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:
 
If you choose to retain externalized checkpoints on cancellation you have to 
handle checkpoint clean up manually when you cancel the job as well 
(terminating with job status JobStatus#CANCELED).
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
 
 
如回答有误,请指正。
 
 
 
 
 
发件人: yinghua...@163.com
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
try {
  StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
  streamEnv.setStateBackend(rocksDBStateBackend);
 
 
 
yinghua...@163.com
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
yinghua...@163.com


Re: 回复: 请教个Flink checkpoint的问题

2021-01-14 Thread 赵一旦
Evan说的这个是一个设置,但也仅影响cancel那个命令,stop还是会删除。这个点其实做的不是很好,不清楚为啥,之前Q过,没人鸟。。。
所以按照我的经验,如果是需要停止并基于保存点重启,那还好。如果计划基于检查点重启,无比提前备份检查点,然后停任务,然后复制备份回去。
在或者,直接cancel,不用stop。

Evan  于2021年1月14日周四 下午6:49写道:

> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>
>  If you choose to retain externalized checkpoints on cancellation you have
> to handle checkpoint clean up manually when you cancel the job as well
> (terminating with job status JobStatus#CANCELED).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
>
> 如回答有误,请指正。
>
>
>
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 18:02
> 收件人: user-zh
> 主题: 回复: 回复: 请教个Flink checkpoint的问题
> 代码如下:
> streamEnv.enableCheckpointing(5 * 60 * 1000);
> CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
> checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
> checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
> checkPointConfig.setMaxConcurrentCheckpoints(1);
> checkPointConfig.setTolerableCheckpointFailureNumber(3);
> checkPointConfig
>
> .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
> try {
>   StateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointPath);
>   streamEnv.setStateBackend(rocksDBStateBackend);
>
>
>
> yinghua...@163.com
>
> 发件人: Evan
> 发送时间: 2021-01-14 17:55
> 收件人: user-zh
> 主题: 回复: 请教个Flink checkpoint的问题
> 代码图挂掉了,看不到代码
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> yinghua...@163.com
>


回复: 回复: 请教个Flink checkpoint的问题

2021-01-14 Thread Evan
是的,应该是机制问题,链接[1]打开有这样一句解释:

 If you choose to retain externalized checkpoints on cancellation you have to 
handle checkpoint clean up manually when you cancel the job as well 
(terminating with job status JobStatus#CANCELED).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
 

如回答有误,请指正。





 
发件人: yinghua...@163.com
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
try {
  StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
  streamEnv.setStateBackend(rocksDBStateBackend);



yinghua...@163.com
 
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
 
 
 
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
 
 
yinghua...@163.com


Re: Re: 请教个Flink checkpoint的问题

2021-01-14 Thread yinghua...@163.com
感谢您的答复!



yinghua...@163.com
 
发件人: 赵一旦
发送时间: 2021-01-14 18:43
收件人: user-zh
主题: Re: Re: 请教个Flink checkpoint的问题
机制就是这样的。如下是我之前做过的测试。
启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明
WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。
通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK
通过命令取消任务:flink cancel ${jobId} 保留 OK
通过命令取消任务并生成保存点:flink cancel -s ${savepointDir} ${jobId} 保留 OK
通过命令停止任务(基于默认保存点目录):flink stop ${jobId} *不*保留 *注意别被特点坑*
通过命令停止任务并生成保存点:flink stop -p ${savepointDir} ${jobId} *不*保留 *注意别被特点坑 *
 
yinghua...@163.com  于2021年1月14日周四 下午6:23写道:
 
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
>  // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> //我停止任务后再去查询时,这个目录已经删除了,出错如下
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> ls:
> `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
> No such file or directory
> //出错信息,checkpoint信息被删除了,这个目录我是专门用来存放checkpoint信息,排除其他主动删除该文件的可能
>
>
>
>
>
> yinghua...@163.com
>
> 发件人: tison
> 发送时间: 2021-01-14 18:04
> 收件人: user-zh
> 主题: Re: 请教个Flink checkpoint的问题
> 没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
>
> Best,
> tison.
>
>
> Evan  于2021年1月14日周四 下午5:56写道:
>
> > 代码图挂掉了,看不到代码
> >
> >
> >
> >
> > 发件人: yinghua...@163.com
> > 发送时间: 2021-01-14 17:26
> > 收件人: user-zh
> > 主题: 请教个Flink checkpoint的问题
> >
> >
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
> >
> >
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> >
> >
> > yinghua...@163.com
> >
>


Re: Re: 请教个Flink checkpoint的问题

2021-01-14 Thread 赵一旦
机制就是这样的。如下是我之前做过的测试。
启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明
WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。
通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK
通过命令取消任务:flink cancel ${jobId} 保留 OK
通过命令取消任务并生成保存点:flink cancel -s ${savepointDir} ${jobId} 保留 OK
通过命令停止任务(基于默认保存点目录):flink stop ${jobId} *不*保留 *注意别被特点坑*
通过命令停止任务并生成保存点:flink stop -p ${savepointDir} ${jobId} *不*保留 *注意别被特点坑 *

yinghua...@163.com  于2021年1月14日周四 下午6:23写道:

> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
>  // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls
> hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> //我停止任务后再去查询时,这个目录已经删除了,出错如下
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
> MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in
> org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop
> library
> ls:
> `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
> No such file or directory
> //出错信息,checkpoint信息被删除了,这个目录我是专门用来存放checkpoint信息,排除其他主动删除该文件的可能
>
>
>
>
>
> yinghua...@163.com
>
> 发件人: tison
> 发送时间: 2021-01-14 18:04
> 收件人: user-zh
> 主题: Re: 请教个Flink checkpoint的问题
> 没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
>
> Best,
> tison.
>
>
> Evan  于2021年1月14日周四 下午5:56写道:
>
> > 代码图挂掉了,看不到代码
> >
> >
> >
> >
> > 发件人: yinghua...@163.com
> > 发送时间: 2021-01-14 17:26
> > 收件人: user-zh
> > 主题: 请教个Flink checkpoint的问题
> >
> >
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
> >
> >
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
> >
> >
> > yinghua...@163.com
> >
>


Re:Re: 请教个Flink checkpoint的问题

2021-01-14 Thread 邮件帮助中心
刚才又操作了一次,我重新截图了放在附件里了,
开始在18:29:29时没有看到chk-8生成,就是在18:29:29时checkpoint没有生成,
然后18:29:34查看时,checkpoint生成了
然后18:29:51查看时,checkpoint还在,此时我停止了那个任务
18:30:11去查看时,checkpoint的chk-8不见了














在 2021-01-14 18:04:27,"tison"  写道:
>没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
>
>Best,
>tison.
>
>
>Evan  于2021年1月14日周四 下午5:56写道:
>
>> 代码图挂掉了,看不到代码
>>
>>
>>
>>
>> 发件人: yinghua...@163.com
>> 发送时间: 2021-01-14 17:26
>> 收件人: user-zh
>> 主题: 请教个Flink checkpoint的问题
>>
>> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>>
>> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>>
>>
>> yinghua...@163.com
>>


Re: Re:flink-sql字段类型问题

2021-01-14 Thread yinghua...@163.com

回复错了,抱歉!


yinghua...@163.com
 
发件人: yinghua...@163.com
发送时间: 2021-01-14 18:16
收件人: user-zh
主题: Re: 转发:flink-sql字段类型问题
 
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
Found 1 items
-rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
   // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
  //我停止任务后再去查询时,这个目录已经删除了,出错如下
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
ls: 
`hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
 No such file or directory//出错信息
 
 
 
yinghua...@163.com
发件人: 郝文强
发送时间: 2021-01-14 17:24
收件人: user-zh
主题: 转发:flink-sql字段类型问题
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
- 转发邮件信息 -
发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:23
发送至: d...@flink.apache.org 
主题: 转发:flink-sql字段类型问题
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
- 转发邮件信息 -
发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:22
发送至: dev-h...@flink.apache.org 
主题: flink-sql字段类型问题
sql-client 创建表 报错java.math.BigInteger cannot be cast to java.lang.Long
麻烦各位帮看一下
源数据表是 mysql的information_schema.tables 表
表结构如下:
table_catalog varchar(64)
table_schema  varchar(64)
table_name  varchar(64)
table_type  enum('base table','view','system view')
engine  varchar(64)
version int
row_format  enum('fixed','dynamic','compressed','redundant','compact','paged')
table_rows  bigint unsigned
avg_row_length  bigint unsigned
data_length bigint unsigned
max_data_length bigint unsigned
index_length  bigint unsigned
data_free bigint unsigned
auto_increment  bigint unsigned
create_time timestamp
update_time datetime
check_time  datetime
table_collation varchar(64)
checksum  bigint
create_options  varchar(256)
table_comment text
我的flink sql 建表语句:
   CREATE TABLE info_table (
  TABLE_CATALOG STRING,
  TABLE_SCHEMA STRING,
  TABLE_NAME STRING,
  TABLE_TYPE STRING,
  ENGINE STRING,
  VERSION INT,
  ROW_FORMAT STRING,
  TABLE_ROWS BIGINT,
  AVG_ROW_LENGTH BIGINT,
  DATA_LENGTH BIGINT,
  MAX_DATA_LENGTH BIGINT,
  INDEX_LENGTH BIGINT,
  DATA_FREE BIGINT,
  AUTO_INCREMENT BIGINT,
  CREATE_TIME TIMESTAMP,
  UPDATE_TIME TIMESTAMP,
  CHECK_TIME TIMESTAMP,
  TABLE_COLLATION STRING,
  CHECKSUM INTEGER,
  CREATE_OPTIONS STRING,
  TABLE_COMMENT STRING,
  PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/information_schema',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'TABLES'
);
反复改了几次类型都报错: 
java.math.BigInteger cannot be cast to java.lang.Integer
java.lang.Long cannot be cast to java.math.BigDecimal
java.lang.Long cannot be cast to java.lang.Integer
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制


Re: Re: 请教个Flink checkpoint的问题

2021-01-14 Thread yinghua...@163.com
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
Found 1 items
-rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
   // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
  //我停止任务后再去查询时,这个目录已经删除了,出错如下
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
ls: 
`hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
 No such file or directory
//出错信息,checkpoint信息被删除了,这个目录我是专门用来存放checkpoint信息,排除其他主动删除该文件的可能





yinghua...@163.com
 
发件人: tison
发送时间: 2021-01-14 18:04
收件人: user-zh
主题: Re: 请教个Flink checkpoint的问题
没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。
 
Best,
tison.
 
 
Evan  于2021年1月14日周四 下午5:56写道:
 
> 代码图挂掉了,看不到代码
>
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> yinghua...@163.com
>


Re: 转发:flink-sql字段类型问题

2021-01-14 Thread yinghua...@163.com

[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
Found 1 items
-rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
   // 这个是通过JobManger看到已经checkpoing完成后去查询出来的记录,的确是生成了,里面已经包含了_metadata文件
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls 
hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
  //我停止任务后再去查询时,这个目录已经删除了,出错如下
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; 
support was removed in 8.0
log4j:WARN No such property [datePattern] in 
org.apache.log4j.RollingFileAppender.
21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
ls: 
`hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6':
 No such file or directory//出错信息



yinghua...@163.com
 
发件人: 郝文强
发送时间: 2021-01-14 17:24
收件人: user-zh
主题: 转发:flink-sql字段类型问题
 
 
 
 
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
 
 
 
- 转发邮件信息 -
 
发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:23
发送至: d...@flink.apache.org 
主题: 转发:flink-sql字段类型问题
 
 
 
 
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
 
 
 
- 转发邮件信息 -
 
发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:22
发送至: dev-h...@flink.apache.org 
主题: flink-sql字段类型问题
sql-client 创建表 报错java.math.BigInteger cannot be cast to java.lang.Long
麻烦各位帮看一下
 
 
源数据表是 mysql的information_schema.tables 表
表结构如下:
table_catalog varchar(64)
table_schema  varchar(64)
table_name  varchar(64)
table_type  enum('base table','view','system view')
engine  varchar(64)
version int
row_format  enum('fixed','dynamic','compressed','redundant','compact','paged')
table_rows  bigint unsigned
avg_row_length  bigint unsigned
data_length bigint unsigned
max_data_length bigint unsigned
index_length  bigint unsigned
data_free bigint unsigned
auto_increment  bigint unsigned
create_time timestamp
update_time datetime
check_time  datetime
table_collation varchar(64)
checksum  bigint
create_options  varchar(256)
table_comment text
我的flink sql 建表语句:
   CREATE TABLE info_table (
  TABLE_CATALOG STRING,
  TABLE_SCHEMA STRING,
  TABLE_NAME STRING,
  TABLE_TYPE STRING,
  ENGINE STRING,
  VERSION INT,
  ROW_FORMAT STRING,
  TABLE_ROWS BIGINT,
  AVG_ROW_LENGTH BIGINT,
  DATA_LENGTH BIGINT,
  MAX_DATA_LENGTH BIGINT,
  INDEX_LENGTH BIGINT,
  DATA_FREE BIGINT,
  AUTO_INCREMENT BIGINT,
  CREATE_TIME TIMESTAMP,
  UPDATE_TIME TIMESTAMP,
  CHECK_TIME TIMESTAMP,
  TABLE_COLLATION STRING,
  CHECKSUM INTEGER,
  CREATE_OPTIONS STRING,
  TABLE_COMMENT STRING,
  PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/information_schema',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'TABLES'
);
 
 
反复改了几次类型都报错: 
 
java.math.BigInteger cannot be cast to java.lang.Integer
 
java.lang.Long cannot be cast to java.math.BigDecimal
 
java.lang.Long cannot be cast to java.lang.Integer
 
| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制
 


Re: 请教个Flink checkpoint的问题

2021-01-14 Thread tison
没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。

Best,
tison.


Evan  于2021年1月14日周四 下午5:56写道:

> 代码图挂掉了,看不到代码
>
>
>
>
> 发件人: yinghua...@163.com
> 发送时间: 2021-01-14 17:26
> 收件人: user-zh
> 主题: 请教个Flink checkpoint的问题
>
> 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
>
> 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
>
>
> yinghua...@163.com
>


回复: 回复: 请教个Flink checkpoint的问题

2021-01-14 Thread yinghua...@163.com
代码如下:
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
try {
  StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
  streamEnv.setStateBackend(rocksDBStateBackend);



yinghua...@163.com
 
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
 
 
 
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
 
 
yinghua...@163.com


回复: 请教个Flink checkpoint的问题

2021-01-14 Thread Evan
代码图挂掉了,看不到代码



 
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?


yinghua...@163.com


回复: 请教个Flink checkpoint的问题

2021-01-14 Thread yinghua...@163.com
刚才代码截图没发出去,再贴下代码
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);



yinghua...@163.com
 
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?


yinghua...@163.com


Deterministic rescale for test

2021-01-14 Thread Martin Frank Hansen
Hi,

I am trying to make a test-suite for our Flink jobs, and are having
problems making the input-data deterministic.

We are reading a file-input with parallelism 1 and want to rescale to a
higher parallelism, such that the ordering of the data is the same every
time.

I have tried using rebalance, rescale but it seems to randomly distribute
data between partitions. We don't need something optimized, we just need
the same distribution for every run.
Is this possible?

Some code:

val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(parallelism)
val rawStream: DataStream[String] = env.readTextFile(s3Path).setParallelism(1)

rawStream.rescale
...

best regards

-- 

Martin Frank Hansen


请教个Flink checkpoint的问题

2021-01-14 Thread yinghua...@163.com
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?


yinghua...@163.com


回复:flink1.12 k8s session部署,TM无法启动

2021-01-14 Thread superainbower
大佬,可否提供一下你那边flink native 方式 k8s部署的测试文档地址

在2021年01月14日 15:12,Yang Wang 写道:
这个问题是在1.12.1中修复的,1.12.0里面还不能支持给TM设置ServiceAccount
具体可以看下这个ticket,https://issues.apache.org/jira/browse/FLINK-20664

另外,1.12.1正在投票,最近就会发布

Best,
Yang

1120344670 <1120344...@qq.com> 于2021年1月13日周三 下午5:17写道:

> *flink版本: 1.12*
> *kubernetes:  1.17*
> TM无法启动,  报错如下:
>
>
> 从报错来看,TM尝试访问统一namespace下的configmap出现了权限问题, 使用的是 system:serviceaccount:
> flink-test:default 这个角色。   在启动flink的时候我已经设置了 "taskmanager.service-account" ,
> "jobmanager.service-account", "kubernetes.service-account"
> 这三个参数都是我们自定的service account. 看起来是没有生效或者可用。
>
> 处理: 为default账号创建一个可以查看的角色即可。
>
> kubectl create clusterrolebinding flink-role-binding-flink-defalut 
> --clusterrole=edit --serviceaccount=namespace:service-account.
>
>
>
>


转发:flink-sql字段类型问题

2021-01-14 Thread 郝文强




| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制



- 转发邮件信息 -

发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:23
发送至: d...@flink.apache.org 
主题: 转发:flink-sql字段类型问题




| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制



- 转发邮件信息 -

发件人: 郝文强 <18846086...@163.com>
发送日期: 2021年01月14日 17:22
发送至: dev-h...@flink.apache.org 
主题: flink-sql字段类型问题
sql-client 创建表 报错java.math.BigInteger cannot be cast to java.lang.Long
麻烦各位帮看一下


源数据表是 mysql的information_schema.tables 表
表结构如下:
table_catalog varchar(64)
table_schema  varchar(64)
table_name  varchar(64)
table_type  enum('base table','view','system view')
engine  varchar(64)
version int
row_format  enum('fixed','dynamic','compressed','redundant','compact','paged')
table_rows  bigint unsigned
avg_row_length  bigint unsigned
data_length bigint unsigned
max_data_length bigint unsigned
index_length  bigint unsigned
data_free bigint unsigned
auto_increment  bigint unsigned
create_time timestamp
update_time datetime
check_time  datetime
table_collation varchar(64)
checksum  bigint
create_options  varchar(256)
table_comment text
我的flink sql 建表语句:
   CREATE TABLE info_table (
  TABLE_CATALOG STRING,
  TABLE_SCHEMA STRING,
  TABLE_NAME STRING,
  TABLE_TYPE STRING,
  ENGINE STRING,
  VERSION INT,
  ROW_FORMAT STRING,
  TABLE_ROWS BIGINT,
  AVG_ROW_LENGTH BIGINT,
  DATA_LENGTH BIGINT,
  MAX_DATA_LENGTH BIGINT,
  INDEX_LENGTH BIGINT,
  DATA_FREE BIGINT,
  AUTO_INCREMENT BIGINT,
  CREATE_TIME TIMESTAMP,
  UPDATE_TIME TIMESTAMP,
  CHECK_TIME TIMESTAMP,
  TABLE_COLLATION STRING,
  CHECKSUM INTEGER,
  CREATE_OPTIONS STRING,
  TABLE_COMMENT STRING,
  PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/information_schema',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'TABLES'
);


反复改了几次类型都报错: 

java.math.BigInteger cannot be cast to java.lang.Integer

java.lang.Long cannot be cast to java.math.BigDecimal

java.lang.Long cannot be cast to java.lang.Integer

| |
郝文强
|
|
18846086...@163.com
|
签名由网易邮箱大师定制



Re: Getting Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler

2021-01-14 Thread Arvid Heise
Hi Avi,

apparently the maximum version that Flink supports for scala is 2.12.7 [1].
Do you have a specific reason to use a higher version?

[1] https://issues.apache.org/jira/browse/FLINK-12461

On Thu, Jan 14, 2021 at 5:11 AM Avi Levi  wrote:

> Hi Arvid,
> Please find attached full build.gradle file
>
> On Tue, Jan 12, 2021 at 8:18 PM Arvid Heise  wrote:
>
>> Can you post the full dependencies of sbt/maven/gradle whatever?
>>
>> On Tue, Jan 12, 2021 at 3:54 AM Avi Levi  wrote:
>>
>>> Hi Arvid,
>>> using :
>>>
>>> flinkVersion = '1.12.0'
>>> scalaBinaryVersion = '2.12'
>>>
>>> I simplified the example to (same exception)  :
>>>
>>> object Flinktest extends App {
>>>   private val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   env.fromElements("A", "B","c")
>>> .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
>>> .process{new ProcessAllWindowFunction[String, List[String], TimeWindow] 
>>> {
>>>   override def process(context: Context, elements: Iterable[String], 
>>> out: Collector[List[String]]): Unit = {
>>> out.collect(elements.toList)
>>>   }
>>> }
>>> }
>>> .print()
>>>
>>> env.execute("Sample")
>>> }
>>>
>>>
>>>
>>>
>>> On Tue, Jan 5, 2021 at 1:53 PM Arvid Heise  wrote:
>>>
 Hi Avi,

 without being a scala-guy, I'm guessing that you are mixing scala
 versions. Could you check that your user code uses the same scala version
 as Flink (1.11 or 1.12)? I have also heard of issues with different minor
 versions of scala, so make sure to use the exact same version (e.g.
 2.11.12).

 On Mon, Dec 28, 2020 at 3:54 PM Avi Levi  wrote:

> I am trying to aggregate all records in a time window. This is my
> ProcessAllWindowFunction :
>
> case class SimpleAggregate(elms: List[String])
>
> class AggregateLogs extends ProcessAllWindowFunction[String, 
> SimpleAggregate, TimeWindow ] {
>   override def process(context: Context, elements: Iterable[String], out: 
> Collector[SimpleAggregate]): Unit = {
> val es: List[String] = elements.toList
> val record = SimpleAggregate(es)
> out.collect(record)
>   }
> }
>
> But I am getting this exception why ?
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> scala.tools.reflect.ToolBoxError: reflective compilation has failed: 
> cannot
> initialize the compiler due to java.lang.BootstrapMethodError:
> java.lang.NoSuchMethodError:
> scala.collection.immutable.List.$anonfun$flatMap$1$adapted(Lscala/runtime/BooleanRef;Lscala/runtime/ObjectRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)Ljava/lang/Object;
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.(TraversableSerializer.scala:41)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2$$anon$3.(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1.$anonfun$createSerializer$1(HandleFinancialJob.scala:52)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1.createSerializer(HandleFinancialJob.scala:52)
> at
> 

Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 Thread Yun Tang
Hi,

你本地的数据肯定是过期了,checkpoint 
size没有变化是因为你的数据总量83MB,且之后没有插入新数据,导致没有触发RocksDB的compaction,所以本地的数据没有物理上清理,而在full 
snapshot时候,估计你并没有开启cleanFullSnapshot [1],所以导致full snapshot时候并没有删除掉过期数据。

其实你可以查询一下状态,默认情况下,已经过期的数据是无法再查询到了。

建议开启增量checkpoint即可,过期数据即使物理不删除,也因为过期而无法再读取到了,没必要过分关注UI上的checkpoint size。


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#cleanup-in-full-snapshot

祝好
唐云

From: 孙啸龙 
Sent: Thursday, January 14, 2021 16:11
To: user-zh@flink.apache.org 
Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化

你好:
 使用的state backend是rocksdb,没有开启增量,后续没有再插入过数据。

> 在 2021年1月14日,下午4:07,Yun Tang  写道:
>
> 使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?
>
> 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?
>
>
> 祝好
> 唐云
> 
> From: 孙啸龙 
> Sent: Thursday, January 14, 2021 15:52
> To: user-zh@flink.apache.org 
> Subject: Flink sql 状态过期后,checkpoint 大小没变化
>
> 大家好:
>版本:1.12.0
>方式:flink sql
>测试sql:
>select a.id,b.money,b.createTime from test_state_from a
>full join test_state_from1 b on a.id=b.id;
>问题:
>   test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
> ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?



Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread Yun Gao
Hi Sagar,

  I rechecked and found that the new kafka source is not formally publish yet, 
and a stable method I think may be try adding the FlinkKafkaConsumer as a 
BOUNDED source first. Sorry for the inconvient. 

Best,
 Yun

--
Sender:Yun Gao
Date:2021/01/14 15:26:54
Recipient:Ardhani Narasimha; 
sagar
Cc:Flink User Mail List
Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Hi Sagar,

  I think the problem is that the legacy source implemented by extending 
SourceFunction are all defined as CONTINOUS_UNBOUNDED when use env.addSource(). 
Although there is hacky way to add the legacy sources as BOUNDED source [1], I 
think you may first have a try of new version of KafkaSource [2] ? The new 
version of KafkaSource is implemented with the new Source API [3], which 
provides unfied support for the streaming and batch mode.

Best,
 Yun




[1] 
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2]  
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface




 --Original Mail --
Sender:Ardhani Narasimha 
Send Date:Thu Jan 14 15:11:35 2021
Recipients:sagar 
CC:Flink User Mail List 
Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

Hi Team,

I am getting the following error while running DataStream API in with batch 
mode with kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with 
the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, 
please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I wanted to work with four to five different stream in 
batch mode as data source is bounded

I don't find any clear example of how to do it with kafka souce with Flink 1.12

I don't want to use JDBC source as underlying database table may change. please 
give me some example on how to achieve the above use case.

Also for any large bounded source are there any alternatives to achieve this?



-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended 
receipiant please ignore this email.
---
IMPORTANT: The contents of this email and any attachments are confidential and 
protected by applicable laws. If you have received this email by mistake, 
please (i) notify the sender immediately; (ii) delete it from your database; 
and (iii) do not disclose the contents to anyone or make copies thereof. 
Razorpay accepts no liability caused due to any inadvertent/ unintentional data 
transmitted through this email.
---


flink1.11使用createStatementSet报错 No operators defined in streaming topology

2021-01-14 Thread datayangl
flink版本: 1.11
使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka
代码如下:
  def main(args: Array[String]): Unit = {
FlinkUtils.initTable()
val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv
val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
streamEnv.disableOperatorChaining()
streamEnv.setParallelism(1)
streamEnv.setMaxParallelism(1)
CheckPointUtils.setCheckPoint(streamEnv, 12, 6)
dealWithOdsDataTohive(tableEnv)
 val sqls:Map[String,String] = ConfigItem.ODS_SQL

val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE",
null).map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet

val filledAllSqlsTable = sqls.map(x=>{
  val hiveMapTopic = hiveTableMapTopic
  val topicName = hiveMapTopic.getOrElse(x._1,null)
  val topic = if(ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else
null
  (x._1,topic,x._2)
}).filter(x=>StringUtils.isNotEmpty(x._2)).map(x=>{
  val sql = fillTemplate(x._1,x._2,x._3)
  tableEnv.executeSql(sql)
  x._1
})
HiveUtils.initHiveCatalog("tsgz","catalogName", tableEnv)
val stmtSet = tableEnv.createStatementSet()
val allInsertSqls = filledAllSqlsTable.map(table=>{
  s"insert into tsgz.${table} select * from
default_catalog.default_database.${table}"
}).toList
allInsertSqls.foreach(x=>{
  stmtSet.addInsertSql(x)
})
val insertTaskStatus = stmtSet.execute()
//insertTaskStatus.print()
println(insertTaskStatus.getJobClient.get().getJobStatus())
}
  /**
   * 填充kafka sql映射表的模板内容
   * */
  def fillTemplate(tableName:String, topicName:String, fields:String)={
val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS
val filled = s"create table ${tableName} (${fields}) with ('connector' =
'kafka','topic' = '${topicName}','properties.bootstrap.servers' =
'${kafkaHost}','properties.group.id' = 'OdsDataToHive1','format' =
'json','scan.startup.mode' = 'latest-offset')"

filled
  }

执行后报错
Exception in thread "main" java.lang.IllegalStateException: No operators
defined in streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
at
com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54)
at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21)
at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala)

报错位置为 val insertTaskStatus = stmtSet.execute() 这一行。


参考资料:https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 Thread 孙啸龙
你好:
 使用的state backend是rocksdb,没有开启增量,后续没有再插入过数据。

> 在 2021年1月14日,下午4:07,Yun Tang  写道:
> 
> 使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?
> 
> 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?
> 
> 
> 祝好
> 唐云
> 
> From: 孙啸龙 
> Sent: Thursday, January 14, 2021 15:52
> To: user-zh@flink.apache.org 
> Subject: Flink sql 状态过期后,checkpoint 大小没变化
> 
> 大家好:
>版本:1.12.0
>方式:flink sql
>测试sql:
>select a.id,b.money,b.createTime from test_state_from a
>full join test_state_from1 b on a.id=b.id;
>问题:
>   test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
> ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?



回复: flink sql读kafka元数据问题

2021-01-14 Thread 酷酷的浑蛋
官网没说在哪里读key啊




在2021年01月14日 14:52,Jark Wu 写道:
kafka 读 key fields:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#key-fields

On Wed, 13 Jan 2021 at 15:18, JasonLee <17610775...@163.com> wrote:

hi

你写入数据的时候设置 headers 了吗 没设置的话当然是空的了



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 Thread Yun Tang
使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?

在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?


祝好
唐云

From: 孙啸龙 
Sent: Thursday, January 14, 2021 15:52
To: user-zh@flink.apache.org 
Subject: Flink sql 状态过期后,checkpoint 大小没变化

大家好:
版本:1.12.0
方式:flink sql
测试sql:
select a.id,b.money,b.createTime from test_state_from a
full join test_state_from1 b on a.id=b.id;
问题:
   test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?


回复: flink sql读kafka元数据问题

2021-01-14 Thread 酷酷的浑蛋






在2021年01月14日 16:03,酷酷的浑蛋 写道:
你意思是说,topic不是flink写入的,用flink sql就不能读到key?




在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道:
hi

你写入数据的时候设置 headers 了吗 没设置的话当然是空的了



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: flink sql读kafka元数据问题

2021-01-14 Thread 酷酷的浑蛋
你意思是说,topic不是flink写入的,用flink sql就不能读到key?




在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道:
hi

你写入数据的时候设置 headers 了吗 没设置的话当然是空的了



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/