The Flink Stateful Functions Python API looks cool, but is there a
documented spec for how it communicates with Flink? I'd like to implement
an SDK in Haskell if I can.
Hi Robert!
I have Flink running locally on minikube. I'm running SQL client using
exec on the jobmanager.
kubectl exec pod/flink-jobmanager-0 -i -t -- /opt/flink/bin/sql-client.sh
embedded -e /opt/flink/sql-client-defaults.yaml
Here's the sql-client-defaults.yaml. I didn't specify a session.
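For reference, a hypothetical minimal sql-client-defaults.yaml (placeholder
values, not the poster's actual file) could look like:

# sql-client-defaults.yaml (hypothetical example)
execution:
  planner: blink        # use the Blink planner
  type: streaming       # streaming execution mode
  parallelism: 1
  result-mode: table    # how the CLI renders results
deployment:
  gateway-address: ""   # empty address = embedded mode
  gateway-port: 0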
Hi Flink Community,
I was trying to submit a flink job on a standalone cluster
using RestClusterClient. After waiting for job submission, I got JobID
correctly and tried to delete the source jar file. But then I got
the exception:
java.nio.file.FileSystemException: /path/to/jar: The process cannot
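A hedged sketch of the submission path (Flink 1.11-era API; host, port, and
paths are placeholders): blocking on the submission future and closing
whatever still holds the jar open (the client, and any classloader opened on
the jar) before deleting usually avoids this error, which looks like the
localized Windows "the process cannot access the file" message.

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

Configuration conf = new Configuration();
conf.setString(RestOptions.ADDRESS, "localhost"); // placeholder REST endpoint
conf.setInteger(RestOptions.PORT, 8081);

PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/jar"))
        .build();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, conf, 1, false);

try (RestClusterClient<StandaloneClusterId> client =
         new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
    JobID jobId = client.submitJob(jobGraph).get(); // wait until submission completed
    System.out.println("Submitted " + jobId);
}
// Only delete once nothing (client, user-code classloader) has the jar open.
Files.delete(Paths.get("/path/to/jar"));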
Hi Averell,
as far as I know, these tmp files should be removed when the Flink job
recovers. So you should only have these files around for the latest
incomplete checkpoint, while recovery has not yet completed.
On Tue, Sep 1, 2020 at 2:56 AM Averell wrote:
> Hello Robert, Arvid,
>
> As I am
Hi Dan,
the notation of "flink-jobmanager/10.98.253.58:8081" is not a problem. It
is how java.net.InetAddress stringifies a resolved address (with both
hostname and IP).
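A quick illustration in plain Java:

import java.net.InetAddress;

InetAddress addr = InetAddress.getByName("flink-jobmanager");
// toString() renders "hostname/literal-IP", e.g. "flink-jobmanager/10.98.253.58"
System.out.println(addr);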
How did you configure the SQL client to work with a Kubernetes Session?
Afaik this is not a documented, tested, and officially supported setup.
Hi Vijay,
Can you post the error you are referring to?
Did you properly set up an s3 plugin (
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/) ?
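For reference, setting up the plugin amounts to giving the filesystem jar its
own folder under plugins/ before starting the cluster (the version number
below is illustrative):

cd /opt/flink
mkdir -p plugins/s3-fs-hadoop
cp opt/flink-s3-fs-hadoop-1.11.0.jar plugins/s3-fs-hadoop/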
On Fri, Sep 11, 2020 at 8:42 AM Vijay Balakrishnan
wrote:
> Hi,
>
> I want to *get data from S3 and process and send to
Hi Prashant,
My initial suspicion is that this is a problem in the UI or with the
network connection from the browser to the Flink REST endpoints.
Since you can access the metrics with "curl", Flink itself seems to be
doing everything right.
The first URL you posted is for the watermarks (it ends with "/watermarks").
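For comparison, the two kinds of URLs involved (the IDs below are
placeholders):

# per-subtask watermarks for one operator
curl http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/watermarks
# the numeric task metrics the UI tiles are built from
curl "http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=0.numRecordsIn"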
We are running a Flink 1.11.0 job cluster on Kubernetes. We're not seeing
any metrics in the Flink Web UI (for the default metrics like Bytes
Received, Records Received, etc.); instead, we see a spinner. See image
below.
However, we have a prometheus metrics exporter configured and see job/task
Hello Ayush,
I am interested in knowing about your “really simple” implementation.
So, assuming the streaming parquet output goes to an S3 bucket, "Initial"
(partitioned by event time):
Do you write another Flink batch application (step 2) which partitions the data
from Initial into larger “event
Many thanks.
This is great to hear.
Yes! This looks great.
Many Thanks!
Best,
Georg
On Thu, Sep 10, 2020 at 11:53 PM, Dian Fu wrote:
> Hi Georg,
>
> It still doesn't support state access in the Python API in the latest
> version, 1.11.
>
> Could you take a look at whether KeyedProcessFunction
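To illustrate the suggestion, a minimal sketch of keyed state with
KeyedProcessFunction in the Java API (the job logic is made up):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts events per key using ValueState.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out)
            throws Exception {
        Long current = count.value();              // null on first access per key
        long updated = (current == null) ? 1L : current + 1L;
        count.update(updated);
        out.collect(updated);
    }
}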
Hi,
Looking at the problem broadly, file size is directly tied to how often
you commit. No matter which system you use, this variable will always be
there. If you commit frequently, you will be close to real time, but you
will have numerous small files. If you commit after long intervals, you
will get larger files, but at the cost of latency.
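For example, with StreamingFileSink the in-progress part files are only
finalized when a checkpoint completes, so the checkpoint interval is
effectively the commit interval (a sketch):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// longer interval -> fewer, larger files, but results become visible later
env.enableCheckpointing(5 * 60 * 1000); // commit roughly every 5 minutes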
Hi Theo,
I think you're right that there is currently no good built-in solution
for your use case. What you would ideally like to have is some operation
that can buffer records and "hold back" the watermark according to the
timestamps of the records that are in the buffer. This has the
Hi Piper,
I personally like looking at the system load (if Flink is the only major
process on the system). It nicely captures the "stress" Flink puts on the
system (this would be the "System.CPU.Load5min" class of metrics; there
are a lot of articles about understanding Linux load averages).
I
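If I remember correctly, those System.* metrics are only reported when
system-resource probing is enabled in flink-conf.yaml:

# flink-conf.yaml
metrics.system-resource: true
# optional probing interval in milliseconds (default 5000)
metrics.system-resource-probing-interval: 5000

Note this may additionally require the optional OSHI dependency on the
classpath.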
Hi Marek,
what you are describing is a known problem in Flink. There are some
thoughts on how to address this in
https://issues.apache.org/jira/browse/FLINK-11499 and
https://issues.apache.org/jira/browse/FLINK-17505
Maybe some ideas there help you already for your current problem (use long
Hi Ken,
Some random ideas that pop up in my head:
- make sure you use data types that are efficient to serialize, and cheap
to compare (ideally use primitive types in TupleN or POJOs)
- Maybe try the TableAPI batch support (if you have time to experiment).
- optimize memory usage on the
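To illustrate the first point, a type Flink serializes without falling back
to Kryo (hypothetical names, just to show the shape):

// A valid Flink POJO: public class, public no-arg constructor, public fields.
// The primitive fields keep serialization and comparison cheap.
public class IpCount {
    public int ip;
    public long count;
    public IpCount() {}
    public IpCount(int ip, long count) { this.ip = ip; this.count = count; }
}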
Hi Pierre,
It seems that the community is working on providing a fix with the next
1.11 bugfix release (and for 1.12). You can follow the status of the ticket
here: https://issues.apache.org/jira/browse/FLINK-18934
Best,
Robert
On Thu, Sep 10, 2020 at 11:00 AM Pierre Bedoucha
wrote:
> Hi and
Hi Sunitha,
(Note: You've emailed both the dev@ and user@ mailing list. Please only use
the user@ mailing list for questions on how to use Flink. I'm moving the
dev@ list to bcc)
Flink does not have facilities for scheduling batch jobs, and there are no
plans to add such a feature (this is not
@Galen FYI: the upcoming StateFun release will use Scala 2.12
On Thu, Sep 10, 2020 at 5:14 PM Seth Wiesman wrote:
> @Galen
>
> Yes, we would absolutely migrate StateFun. StateFun can be compiled with
> Scala 2.12 today; I'm not sure why it's not cross-released.
>
> @aljoscha :)
>
> @mathieu Its
Hi Team,
We have Flink batch jobs which need to be scheduled as listed below:
Case 1: daily at 2:00 UTC
Case 2: periodically, once every 2 hours
Case 3: schedule based on an event
Could you help me with how to approach all 3 use cases? Can we
use Oozie workflows or any
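For cases 1 and 2, an external scheduler such as cron (or Oozie/Airflow) can
simply invoke the Flink CLI; a hypothetical crontab sketch (paths and jar
names are made up, and it assumes the server clock runs in UTC):

0 2 * * *   /opt/flink/bin/flink run -d /jobs/daily-batch.jar      # Case 1: daily at 02:00 UTC
0 */2 * * * /opt/flink/bin/flink run -d /jobs/two-hourly-batch.jar # Case 2: every 2 hours

Case 3 would typically need whatever system emits the event to call the
Flink CLI or REST API itself.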
1. The Flink version is 1.11.1:
streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamBlinkSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
streamTableEnv = StreamTableEnvironment.create(streamEnv, streamBlinkSettings)
Without an INSERT statement, i.e. without a sink, no computation is triggered.
You are probably on a recent version; the setup changed after Flink 1.10:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
Flink SQL: executing the SQL statement
SELECT kafka_table.src_ip AS kafka_table_src_ip, COUNT(kafka_table.dest_ip) AS
COUNT_kafka_table_dest_ip_ FROM kafka_table GROUP BY kafka_table.src_ip
fails when run directly. My environment initialization is:
initialize dataStreamEnv
initialize tableEnv
1. execute the SQL
2. convert the SQL result to a DataStream
dataStreamEnv.execute("SqlPlatformRealTime")
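For step 2: a GROUP BY query produces an updating result, so the conversion
needs a retract stream rather than an append stream. A minimal Java sketch
against the 1.11 API (variable names follow the description above):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

Table result = tableEnv.sqlQuery(
    "SELECT kafka_table.src_ip AS kafka_table_src_ip, "
  + "COUNT(kafka_table.dest_ip) AS COUNT_kafka_table_dest_ip_ "
  + "FROM kafka_table GROUP BY kafka_table.src_ip");

// Boolean flag: true = insert/update, false = retraction of an earlier result
DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(result, Row.class);

dataStreamEnv.execute("SqlPlatformRealTime");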
1. After a checkpoint, restoring from it fails with the error:
org.apache.kafka.connect.errors.ConnectException:
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
Failed to deserialize data of EventHeaderV4{timestamp=1599815908000,
eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19,
@Congxian Qiu Sorry, only just saw this.
With Flink 1.7 we never hit this problem. After upgrading to Flink 1.10 it
occurs every time, though the timing varies.
On 8/9/2020 15:00, Congxian Qiu wrote:
Hi xuhaiLong
Does this job always hit the NPE on this version? And did the problem ever
occur on versions before 1.10?
Best,
Congxian
On Fri, Aug 7, 2020 at 3:14 PM, xuhaiLong wrote:
Thanks for the reply! It was indeed caused by this bug.
On 8/7/2020 13:43, chenkaibit wrote:
1. timestamp / varchar ... [text garbled in transit]
2. [text garbled in transit]
2020-09-09 15:25:55.416
local_dtm | curr_dtm | local_dtm_no_zone | curr_dtm_no_zone
Hi all, when running Flink streaming jobs, a few servers often end up with
high CPU (it is a shared cluster running many other components), so the
operators placed on those machines have far higher latency than the
operators on other machines.
Does Flink have a dynamic partitioning strategy or a TaskManager migration
strategy? Something like Spark's speculative execution, where the master
starts a duplicate of a slow task, and if the later copy finishes first,
the job continues downstream with its result.
Thanks, everyone.
Hi Josson,
Have you checked the logs as Nico suggested? At 18:55 there is a dip in
non-heap memory, just about when the problems started happening. Maybe you
could post the TM logs?
Have you tried updating JVM to a newer version?
Also it looks like the heap size is the same between 1.4 and 1.8,
Does Flink SQL support permission checks on metadata? For example, when
using the Hive catalog, does it support Hive's permission checks? If this
is not supported today, will it be considered in the future?
Does the code explicitly set the state backend address somewhere?
Is the state cleared each time, or do you keep adding on top of what is
already there? Could you paste the code?
Hi,
I want to *get data from S3, process it, and send it to Kinesis.*
1. Get gzip files from an s3 folder(s3://bucket/prefix)
2. Sort each file
3. Do some map/processing on each record in the file
4. send to Kinesis
Idea is:
env.readTextFile(s3Folder)
.sort(SortFunction)
.map(MapFunction)
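Note that the DataStream API has no sort() operator, so sorting each file
would need a different approach; a hedged sketch of just the read, map, and
Kinesis parts (region, stream name, and map logic are placeholders; assumes
the s3 filesystem plugin and flink-connector-kinesis are set up):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// .gz files are decompressed transparently (but are not splittable)
DataStream<String> lines = env.readTextFile("s3://bucket/prefix");
DataStream<String> processed = lines.map(String::toUpperCase); // placeholder processing

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
FlinkKinesisProducer<String> sink =
    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
sink.setDefaultStream("output-stream"); // placeholder stream name
sink.setDefaultPartition("0");

processed.addSink(sink);
env.execute("s3-to-kinesis");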