Re: Issues with Flink Batch and Hadoop dependency

2020-08-29 Thread Dan Hill
I was able to get a basic version working by bundling a number of Hadoop and S3
dependencies in the job jar and hacking in some Hadoop config values.  It's
probably not optimal, but it looks like I'm unblocked.
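
In case it helps anyone else, the workaround was roughly along these lines (only a
sketch; the exact dependency set and config values depend on the setup, and I'm not
claiming this is the right way): bundle the Hadoop client/mapred artifacts plus
hadoop-aws and its AWS SDK in the job jar, then point the "s3" scheme at the S3A
filesystem through a JobConf passed to the Hadoop input format (env and INPUT_FILE
as in the original snippet quoted below):

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

// "Hack in" the filesystem schemes via the Hadoop config carried by the input format.
JobConf jobConf = new JobConf();
jobConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
jobConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
// Credential/endpoint settings (fs.s3a.access.key, fs.s3a.secret.key, ...) would go here as well.
FileInputFormat.addInputPath(jobConf, new Path(INPUT_FILE));

// Same read as before, but with an explicit JobConf instead of the default one.
env.createInput(HadoopInputs.createHadoopInput(
        new SequenceFileInputFormat<Text, ByteWritable>(),
        Text.class, ByteWritable.class, jobConf));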

On Fri, Aug 28, 2020 at 12:11 PM Dan Hill  wrote:

> I'm assuming I have a simple, common setup problem.  I've spent 6 hours
> debugging and haven't been able to figure it out.  Any help would be
> greatly appreciated.
>
>
> *Problem*
> I have a Flink Streaming job setup that writes SequenceFiles in S3.  When
> I try to create a Flink Batch job to read these Sequence files, I get the
> following error:
>
> NoClassDefFoundError: org/apache/hadoop/mapred/FileInputFormat
>
> It fails on this readSequenceFile.
>
> env.createInput(HadoopInputs.readSequenceFile(Text.class,
> ByteWritable.class, INPUT_FILE))
>
> If I directly depend on org-apache-hadoop/hadoop-mapred when building the
> job, I get the following error when trying to run the job:
>
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "s3"
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
> at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:209)
> at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:48)
> at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:254)
> at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
> at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
>
>
> *Extra context*
> I'm using this Helm chart 
> for creating Flink.  I'm using v1.10.1.
>
>
> *Questions*
> Are there any existing projects that read batch Hadoop file formats from
> S3?
>
> I've looked at these instructions for Hadoop Integration
> .
> I'm assuming my configuration is wrong.  I'm also assuming I need the
> Hadoop dependency properly set up in the jobmanager and taskmanager (not in
> the job itself).  If I use this Helm chart, do I need to download a Hadoop
> common jar into the Flink images for the jobmanager and taskmanager?  Are there
> pre-built images I can use that already have the dependencies set up?
>
>
> - Dan
>


Re: Flink not outputting windows before all data is seen

2020-08-29 Thread David Anderson
Teodor,

This is happening because of the way readTextFile works when it executes in
parallel: it divides the input file into a number of splits, which are consumed
in parallel. As a result, the watermark can't move forward until much, or
perhaps all, of the file has been read. If you change the parallelism of the
source to 1, like this

final DataStream<String> linesIn =
env.readTextFile(fileNameInput).setParallelism(1);

then you should see the job make steady forward progress with windows
closing on a regular basis.

Regards,
David

On Sat, Aug 29, 2020 at 4:59 PM Teodor Spæren 
wrote:

> Hey!
>
> Second time posting to a mailing list, let's hope I'm doing this
> correctly :)
>
> My use case is to take data from the mediawiki dumps and stream it into
> Flink via the `readTextFile` method. The dumps are TSV files with one
> event per line; each event has a timestamp and a type. I want to use
> event-time processing and simply print out how many events of each type
> there are per hour. The data can be out of order, so I allow a 1-hour
> tolerance.
>
> What I expect to happen is that, as the job goes through a month of data,
> it will print out each hourly window as the watermark passes the end of that
> hour, so I'll get output continuously until the end.
>
> What really happens is that the program outputs nothing until it is done,
> and then it outputs everything. The timestamp is also stuck at
> 9223372036854776000 in the web UI. If I switch to using
> count windows instead of time windows, the job outputs continuously as I
> would expect it to, so it seems to be watermark related.
>
> I'm running Flink version 1.11.1 on JVM version:
>
> OpenJDK 64-Bit Server VM - GraalVM Community - 11/11.0.7+10-jvmci-20.1-b02
>
> The parallel setting is 1 and it's running on my laptop.
>
>
> I don't know how much code I'm allowed to attach here, so I've created a
> GitHub repo with a complete self-contained example [1]. To get the data
> used, run the following commands:
>
> $ wget
> https://dumps.wikimedia.org/other/mediawiki_history/2020-07/enwiki/2020-07.enwiki.2016-04.tsv.bz2
> $ pv -cN source < 2020-07.enwiki.2016-04.tsv.bz2 | bzcat  | pv -cN bzcat
> |  sort -k4 > 2020-07.enwiki.2016-04.sorted.tsv
>
> If you don't have pv installed, just remove that part, I just like to
> have an overview.
>
>
> The main code part is this:
>
> package org.example.prow;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.example.prow.wikimedia.Event;
>
> import java.time.Duration;
>
> public class App {
>  public static void main(String[] args) throws Exception {
>  final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>  final String fileNameInput =
> "file:///home/rhermes/madsci/thesis/data/mediawiki_history/2020-07.enwiki.2016-04.sorted.tsv";
>  final DataStream<String> linesIn =
> env.readTextFile(fileNameInput);
>
>
>  final SingleOutputStreamOperator<Event> jj = linesIn.map(value ->
> new Event(value));
>
>  final WatermarkStrategy<Event> mew =
> WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofHours(1)).withTimestampAssigner((element,
> recordTimestamp) -> element.eventTimestamp.toEpochSecond() * 1000);
>
>  final DataStream<Event> props =
> jj.assignTimestampsAndWatermarks(mew);
>
>  final KeyedStream<Event, String> praps = props.keyBy(e ->
> e.eventEntity.toString());
>
>
>  
> praps.window(TumblingEventTimeWindows.of(Time.hours(1))).sum("something").print("JAJ!");
>
>  env.execute("FlinkWikipediaHistoryTopEditors");
>  }
> }
>
> If you see any errors here, please tell me, this is sort of driving me
> mad >_<.
>
> Best regards,
> Teodor Spæren
>
> [1] https://github.com/rHermes/flink-question-001
>


Re: FileSystemHaServices and BlobStore

2020-08-29 Thread Alexey Trenikhun
I tested a streaming job with FileSystemHaService using VoidBlobStore (no HA blob
store), and the job was able to recover from both JM restart and TM restart.
Any idea in which use cases the HA blob store is needed?

Thanks,
Alexey

From: Alexey Trenikhun 
Sent: Friday, August 28, 2020 11:31 AM
To: Khachatryan Roman 
Cc: Flink User Mail List 
Subject: Re: FileSystemHaServices and BlobStore

The motivation is to have a k8s HA setup without an extra component (ZooKeeper), see [1].

The purpose of the BlobStore is vague to me: what kind of BLOBs are stored? It looks like
if we start the job from a savepoint, persistence of the BlobStore is not necessary,
but is it needed if we recover from a checkpoint?

Thanks,
Alexey

[1]. https://issues.apache.org/jira/browse/FLINK-17598



From: Khachatryan Roman 
Sent: Friday, August 28, 2020 9:24 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: FileSystemHaServices and BlobStore

Hello Alexey,

I think you need FileSystemBlobStore as you are implementing HA Services, and 
BLOBs should be highly available too.
However, I'm a bit concerned about the direction in general: it essentially 
means re-implementing ZK functionality on top of FS.
What are the motivation and the use case?

Regards,
Roman


On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun wrote:
Hello,
I'm thinking about implementing FileSystemHaServices: a single leader, but
persistent RunningJobRegistry, CheckpointIDCounter, CompletedCheckpointStore
and JobGraphStore. I'm not sure whether FileSystemBlobStore is needed or whether
VoidBlobStore is enough; I can't figure out whether the BlobStore should survive a
JobManager crash. I see that ZooKeeperHaServices uses FileSystemBlobStore, but it's
not clear whether that is due to having multiple JobManagers (leader + follower) or
the necessity to preserve BLOBs on restart.

Thanks,
Alexey


Re: flink 1.11 time functions

2020-08-29 Thread Leonard Xu
One more note: this may be a misunderstanding caused by how the word "function" was
translated. There is no notion of a "feature" being deterministic or non-deterministic;
every "function" in that document should be read as a function, and the note refers to
whether the function's return value is deterministic or not.

Best,
Leonard

> On Aug 29, 2020, at 18:22, Dream-底限 wrote:
> 
> I see, OK. When I used NOW() yesterday it threw an error telling me this was a bug and asking me to file an issue, so I assumed functions marked this way were broken.
> 
> Benchao Li wrote on Fri, Aug 28, 2020 at 8:01 PM:
> 
>> "Non-deterministic" means the function's return value is dynamic: each call may return a different result.
>> The counterpart is a deterministic function; for example, CONCAT is deterministic: as long as the input is the same, its return value is always the same.
>> Whether a function is deterministic affects the planning process, e.g. whether expression reduction can be applied and whether expression results can be reused.
>> 
>> Dream-底限 wrote on Fri, Aug 28, 2020 at 2:50 PM:
>> 
>>> hi
>>> 
>>> UNIX_TIMESTAMP()
>>> 
>>> NOW()
>>> 
>>> I want to use Flink's timestamp functions, but the official docs add "this function is not deterministic" to the descriptions of these two functions. Does that mean these two time functions cannot be used?
>>> 
>> 
>> 
>> --
>> 
>> Best,
>> Benchao Li
>> 



Re: flink sql computed columns do not support COMMENT

2020-08-29 Thread Leonard Xu
Hi, sllence

This is a bug. It looks like parsing of COMMENT was missed when computed-column support was added; I've opened an issue to fix it [1].

Best,
Leonard

[1] https://issues.apache.org/jira/browse/FLINK-19092 


> On Aug 29, 2020, at 13:37,  wrote:
> 
> Flink version: 1.11.1
> 
> 
> 
> The official documentation defines it as follows:
> 
> <computed_column_definition>:
> 
>  column_name AS computed_column_expression [COMMENT column_comment]
> 
> The docs show that a computed column can specify a column_comment, but when I tried it I got an error.
> 
> 
> 
> The usage and the error message are as follows:
> 
> create table t1( 
> 
>   data_time STRING, 
> 
>   row1_time AS to_timestamp(data_time) COMMENT 'test'
> 
>   WATERMARK FOR row1_time AS row1_time - INTERVAL '5' SECOND
> 
> ) with(...)
> 
> 
> 
> org.apache.flink.sql.parser.impl.ParseException: Encountered "COMMENT" at
> line 1, column 74.
> 
> Was expecting one of:
>    "FILTER" ...
>    "OVER" ...
>    "WITHIN" ...
>    ")" ...
>    "," ...
>    "." ...
>    "NOT" ...
>    "IN" ...
>    "<" ...
>    "<=" ...
>    ">" ...
>    ">=" ...
>    "=" ...
>    "<>" ...
>    "!=" ...
>    "BETWEEN" ...
>    "LIKE" ...
>    "SIMILAR" ...
>    "+" ...
>    "-" ...
>    "*" ...
>    "/" ...
>    "%" ...
>    "||" ...
>    "AND" ...
>    "OR" ...
>    "IS" ...
>    "MEMBER" ...
>    "SUBMULTISET" ...
>    "CONTAINS" ...
>    "OVERLAPS" ...
>    "EQUALS" ...
>    "PRECEDES" ...
>    "SUCCEEDS" ...
>    "IMMEDIATELY" ...
>    "MULTISET" ...
>    "[" ...
>    "FORMAT" ...
>    "IGNORE" ...
>    "RESPECT" ...
> 
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5271)
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:2775)
>   at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:252)
>   at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:201)
> 



Flink not outputting windows before all data is seen

2020-08-29 Thread Teodor Spæren

Hey!

Second time posting to a mailing list, let's hope I'm doing this 
correctly :)


My use case is to take data from the mediawiki dumps and stream it into 
Flink via the `readTextFile` method. The dumps are TSV files with one 
event per line; each event has a timestamp and a type. I want to use 
event-time processing and simply print out how many events of each type 
there are per hour. The data can be out of order, so I allow a 1-hour 
tolerance.


What I expect to happen is that, as the job goes through a month of data, 
it will print out each hourly window as the watermark passes the end of that 
hour, so I'll get output continuously until the end.


What really happens is that the program outputs nothing until it is done, 
and then it outputs everything. The timestamp is also stuck at 
9223372036854776000 in the web UI. If I switch to using 
count windows instead of time windows, the job outputs continuously as I 
would expect it to, so it seems to be watermark related.


I'm running Flink version 1.11.1 on JVM version:

OpenJDK 64-Bit Server VM - GraalVM Community - 11/11.0.7+10-jvmci-20.1-b02

The parallel setting is 1 and it's running on my laptop.


I don't know how much code I'm allowed to attach here, so I've created a 
GitHub repo with a complete self-contained example [1]. To get the data 
used, run the following commands:


$ wget 
https://dumps.wikimedia.org/other/mediawiki_history/2020-07/enwiki/2020-07.enwiki.2016-04.tsv.bz2
$ pv -cN source < 2020-07.enwiki.2016-04.tsv.bz2 | bzcat  | pv -cN bzcat |  sort 
-k4 > 2020-07.enwiki.2016-04.sorted.tsv

If you don't have pv installed, just remove that part, I just like to 
have an overview.



The main code part is this:

package org.example.prow;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.example.prow.wikimedia.Event;

import java.time.Duration;

public class App {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final String fileNameInput = 
"file:///home/rhermes/madsci/thesis/data/mediawiki_history/2020-07.enwiki.2016-04.sorted.tsv";
final DataStream<String> linesIn = env.readTextFile(fileNameInput);


final SingleOutputStreamOperator<Event> jj = linesIn.map(value -> new 
Event(value));

final WatermarkStrategy<Event> mew = 
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofHours(1)).withTimestampAssigner((element,
 recordTimestamp) -> element.eventTimestamp.toEpochSecond() * 1000);

final DataStream<Event> props = jj.assignTimestampsAndWatermarks(mew);

final KeyedStream<Event, String> praps = props.keyBy(e -> 
e.eventEntity.toString());


praps.window(TumblingEventTimeWindows.of(Time.hours(1))).sum("something").print("JAJ!");

env.execute("FlinkWikipediaHistoryTopEditors");
}
}

If you see any errors here, please tell me, this is sort of driving me 
mad >_<.


Best regards,
Teodor Spæren

[1] https://github.com/rHermes/flink-question-001


Re: PyFlink cluster runtime issue

2020-08-29 Thread Manas Kale
Ok, thank you!

On Sat, 29 Aug, 2020, 4:07 pm Xingbo Huang,  wrote:

> Hi Manas,
>
> We can't submit a PyFlink job through the Flink web UI currently. The only
> way to submit a PyFlink job at the moment is through the command line.
>
> Best,
> Xingbo
>
> Manas Kale wrote on Sat, Aug 29, 2020 at 12:51 PM:
>
>> Hi Xingbo,
>> Thanks, that worked. Just to make sure, the only way currently to submit
>> a PyFlink job is through the command line, right? Can I do that through the
>> GUI?
>>
>> On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang  wrote:
>>
>>> Hi Manas,
>>>
>>> I think you forgot to add the kafka jar [1] as a dependency. You can use the
>>> -j argument of the command line [2] or the Python Table API to specify the
>>> jar. For details about the APIs for adding Java dependencies, you can refer to
>>> the relevant documentation [3].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale wrote on Fri, Aug 28, 2020 at 9:06 PM:
>>>
 Hi,
 I am trying to deploy a pyFlink application on a local cluster. I am
 able to run my application without any problems if I execute it as a normal
 python program using the command :
 python myApplication.py
 My pyFlink version is __version__ = "1.11.0".
 I had installed this pyFlink through conda/pip (don't remember which).

 Per instructions given in [1] I have ensured that running the command
 "python" gets me to a python 3.7 shell with pyFlink installed.
 I have also ensured my local Flink cluster version is 1.11.0 (same as
 above).
 However, if I execute the application using the command:
 bin/flink run -py myApplication.py

 I get the error:

 Traceback (most recent call last):
  File "basic_streaming_job.py", line 65, in 
main()
  File "basic_streaming_job.py", line 43, in main
""")
  File
 "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
 table_environment.py", line 543, in execute_sql
  File
 "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
 /java_gateway.py", line 1286, in __call__
  File
 "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
 xceptions.py", line 147, in deco
  File
 "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
 /protocol.py", line 328, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o5.executeSql.
 : org.apache.flink.table.api.ValidationException: Unable to create a
 source for reading table
 'default_catalog.default_database.raw_message'.

 Table options are:

 'connector'='kafka'
 'format'='json'
 'properties.bootstrap.servers'='localhost:9092'
 'topic'='basic_features_normalized'
at
 org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
 5)
at
 org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
 ogSourceTable.scala:135)
 .

 The offending table schema in question :

 CREATE TABLE {INPUT_TABLE} (
 monitorId STRING,
 deviceId STRING,
 state INT,
 feature_1 DOUBLE,
 feature_2 DOUBLE,
 feature_3 DOUBLE,
 feature_4 DOUBLE,
 feature_5 DOUBLE,
 feature_6 DOUBLE,
 time_str TIMESTAMP(3),
 WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' 
 SECOND
 ) WITH (
 'connector' = 'kafka',
 'topic' = '{INPUT_TOPIC}',
 'properties.bootstrap.servers' = '{KAFKA}',
 'format' = 'json'
 )

 Clearly, even though my standalone pyFlink version and cluster Flink
 versions are the same, something is different with the cluster runtime.
 What could that be?


 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples

>>>


Re: Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-29 Thread Andrey Zagrebin
Hi Vijay,

I would apply the same judgement. It is latency vs. throughput vs. spent
resources vs. practical need.

The more concurrent checkpoints your system is capable of handling, the
better end-to-end latency you will observe, and you will see computation
results more frequently.
On the other hand, your system needs to provide more resources (maybe higher
parallelism) to process more concurrent checkpoints.

Again, the fewer the checkpoints, the more records are batched together and the
better the throughput.

It usually does not make sense to have a big number of concurrent checkpoints
which process only a handful of records in between if you do not observe
any practical decrease in latency.
The system will just waste resources processing the checkpoints.
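
As a very rough starting sketch of those knobs (Java DataStream API, Flink 1.11;
the values are placeholders to tune per job, not recommendations):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// With OnCheckpointRollingPolicy the checkpoint interval is also the file roll interval.
env.enableCheckpointing(60_000L);

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// One checkpoint at a time is usually enough; raise it only if you actually observe a latency win.
checkpointConfig.setMaxConcurrentCheckpoints(1);
// Leave the job some room to make progress between checkpoints.
checkpointConfig.setMinPauseBetweenCheckpoints(30_000L);
checkpointConfig.setCheckpointTimeout(10 * 60_000L);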

Best,
Andrey

On Fri, Aug 28, 2020 at 9:52 PM Vijayendra Yadav 
wrote:

> Hi Andrey,
>
> Thanks,
> what is the recommendation for:
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)?
>
> 1 or higher, based on what factor?
>
>
> Regards,
> Vijay
>
>
> On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin 
> wrote:
>
>> Hi Vijay,
>>
>> I think it depends on your job requirements, in particular how many
>> records are processed per second and how much resources you have to process
>> them.
>>
>> If the checkpointing interval is short, then the checkpointing overhead
>> can be too high and you need more resources to efficiently keep up with the
>> incoming stream.
>>
>> If the checkpointing interval is long, more records are batched together
>> and the throughput is better.
>> On the other hand, the observed latency is higher, because the batched
>> results only get flushed into the files and become visible in the external
>> system when a checkpoint occurs, in order to provide the exactly-once guarantee.
>>
>> Best,
>> Andrey
>>
>> On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Bulk formats can only use `OnCheckpointRollingPolicy`, which rolls
>>> (ONLY) on every checkpoint:
>>>
>>> .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>
>>> Question: What are the recommended values related to checkpointing to
>>> fs state: should checkpoints be more frequent or intervals longer, how
>>> many concurrent checkpoints need to be allowed, and how long should the
>>> pause between checkpoints ideally be?
>>> Is there a way to control batch size here other than time? Any
>>> recommendations for the parameters listed below?
>>> Note: I am trying to improve sink throughput.
>>>
>>>
>>> env.enableCheckpointing(chckptintervalmilli)
>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
>>>
>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
>>> env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
>>>
>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
>>>
>>>  
>>> env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)
>>>
>>> Thanks,
>>> Vijay
>>>
>>


Re: PyFlink cluster runtime issue

2020-08-29 Thread Xingbo Huang
Hi Manas,

We can't submit a PyFlink job through the Flink web UI currently. The only way
to submit a PyFlink job at the moment is through the command line.
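
For example (the jar name and paths are illustrative; use the kafka connector jar
matching your Flink/Scala version):

bin/flink run -py myApplication.py -j /path/to/flink-sql-connector-kafka_2.11-1.11.0.jar

Alternatively, the jar can be declared from inside the job via the configuration key
described in the dependency management docs (path again illustrative, and t_env is
assumed to be the job's TableEnvironment):

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")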

Best,
Xingbo

Manas Kale wrote on Sat, Aug 29, 2020 at 12:51 PM:

> Hi Xingbo,
> Thanks, that worked. Just to make sure, the only way currently to submit a
> PyFlink job is through the command line, right? Can I do that through the
> GUI?
>
> On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang  wrote:
>
>> Hi Manas,
>>
>> I think you forgot to add the kafka jar [1] as a dependency. You can use the
>> -j argument of the command line [2] or the Python Table API to specify the
>> jar. For details about the APIs for adding Java dependencies, you can refer to
>> the relevant documentation [3].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>>
>> Best,
>> Xingbo
>>
>> Manas Kale wrote on Fri, Aug 28, 2020 at 9:06 PM:
>>
>>> Hi,
>>> I am trying to deploy a pyFlink application on a local cluster. I am
>>> able to run my application without any problems if I execute it as a normal
>>> python program using the command :
>>> python myApplication.py
>>> My pyFlink version is __version__ = "1.11.0".
>>> I had installed this pyFlink through conda/pip (don't remember which).
>>>
>>> Per instructions given in [1] I have ensured that running the command
>>> "python" gets me to a python 3.7 shell with pyFlink installed.
>>> I have also ensured my local Flink cluster version is 1.11.0 (same as
>>> above).
>>> However, if I execute the application using the command:
>>> bin/flink run -py myApplication.py
>>>
>>> I get the error:
>>>
>>> Traceback (most recent call last):
>>>  File "basic_streaming_job.py", line 65, in 
>>>main()
>>>  File "basic_streaming_job.py", line 43, in main
>>>""")
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
>>> table_environment.py", line 543, in execute_sql
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>> /java_gateway.py", line 1286, in __call__
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
>>> xceptions.py", line 147, in deco
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>> /protocol.py", line 328, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o5.executeSql.
>>> : org.apache.flink.table.api.ValidationException: Unable to create a
>>> source for reading table
>>> 'default_catalog.default_database.raw_message'.
>>>
>>> Table options are:
>>>
>>> 'connector'='kafka'
>>> 'format'='json'
>>> 'properties.bootstrap.servers'='localhost:9092'
>>> 'topic'='basic_features_normalized'
>>>at
>>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
>>> 5)
>>>at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
>>> ogSourceTable.scala:135)
>>> .
>>>
>>> The offending table schema in question :
>>>
>>> CREATE TABLE {INPUT_TABLE} (
>>> monitorId STRING,
>>> deviceId STRING,
>>> state INT,
>>> feature_1 DOUBLE,
>>> feature_2 DOUBLE,
>>> feature_3 DOUBLE,
>>> feature_4 DOUBLE,
>>> feature_5 DOUBLE,
>>> feature_6 DOUBLE,
>>> time_str TIMESTAMP(3),
>>> WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>>> ) WITH (
>>> 'connector' = 'kafka',
>>> 'topic' = '{INPUT_TOPIC}',
>>> 'properties.bootstrap.servers' = '{KAFKA}',
>>> 'format' = 'json'
>>> )
>>>
>>> Clearly, even though my standalone pyFlink version and cluster Flink
>>> versions are the same, something is different with the cluster runtime.
>>> What could that be?
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>>>
>>


Re: flink 1.11 time functions

2020-08-29 Thread Dream-底限
I see, OK. When I used NOW() yesterday it threw an error telling me this was a bug and asking me to file an issue, so I assumed functions marked this way were broken.

Benchao Li wrote on Fri, Aug 28, 2020 at 8:01 PM:

> "Non-deterministic" means the function's return value is dynamic: each call may return a different result.
> The counterpart is a deterministic function; for example, CONCAT is deterministic: as long as the input is the same, its return value is always the same.
> Whether a function is deterministic affects the planning process, e.g. whether expression reduction can be applied and whether expression results can be reused.
>
> Dream-底限 wrote on Fri, Aug 28, 2020 at 2:50 PM:
>
> > hi
> >
> > UNIX_TIMESTAMP()
> >
> > NOW()
> >
> > I want to use Flink's timestamp functions, but the official docs add "this function is not deterministic" to the descriptions of these two functions. Does that mean these two time functions cannot be used?
> >
>
>
> --
>
> Best,
> Benchao Li
>



Re: How to set FlinkSQL parallelism

2020-08-29 Thread zilong xiao
Per-operator parallelism for SQL can be implemented yourself; feel free to reach out offline, I happen to be working on exactly this and it basically works now.
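
A job-wide default for SQL operators (as opposed to the per-operator control mentioned
above) can also be set through configuration in the blink planner; a minimal sketch,
with 4 only as an example value:

// Flink 1.11, blink planner; tableEnv is the job's TableEnvironment
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.resource.default-parallelism", "4");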

JasonLee <17610775...@163.com> wrote on Sun, Aug 23, 2020 at 2:07 PM:

> hi
> For the checkpoint/savepoint question, you can take a look at this:
> https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>