Re: Could not forward element to next operator

2019-09-28 Thread 18612537914
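I have read that article, but it is not my issue. The job does not set any watermarks and normally runs fine; recently it has been throwing this exception after running for a little over a day.

Sent from my iPhone

> On Sep 29, 2019, at 11:49 AM, Wesley Peng wrote: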
> 
> Hello,
> 
> Might this article match your issue?
> https://blog.csdn.net/qq_41910230/article/details/90411237
> 
> regards.
> 
>> On Sun, Sep 29, 2019 at 10:33 AM allan <18612537...@163.com> wrote:
>> 
>> Hi,
>> 
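>> Recently my job keeps reporting errors. My window is a one-minute window. What is the cause, and can anyone help? The Flink version is 1.6. The error is as follows: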
>> 
>> 
>> 
>> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
>>   at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)




Question about the flink command line

2019-09-28 Thread 戴嘉诚

Hi all,

The jar that packages my Flink code is stored on HDFS. When I run it from the Flink command line, can Flink only resolve local jars? Can it not resolve a jar that is on HDFS?

After I downloaded the jar to the server's local disk, it ran successfully.


My command is:
./bin/flink run -yid application_1567652112073_0001 -p 6 -yj hdfs://ysec-storage/flink/runJar/business-security-1.0-SNAPSHOT.jar --appId act_test



The output is:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/flink/flink-1.9.0/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-09-29 11:48:15,686 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hdfs.
2019-09-29 11:48:15,686 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hdfs.
Could not build the program from JAR file.
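
The workaround described in this message (copy the jar from HDFS to local disk, then submit the local copy) could be scripted roughly as follows. This is only a sketch under assumptions: the local directory /tmp is hypothetical, the helper build_commands is made up for illustration, and passing the local jar as the positional program argument of `flink run` (instead of via -yj) is a guess at the command that worked, not something the message confirms. The script only builds and prints the two commands; it does not execute them.

```python
import shlex


def build_commands(hdfs_jar, local_dir, yarn_app_id, parallelism, program_args):
    """Build (fetch_cmd, run_cmd) as argument lists. Nothing is executed here."""
    jar_name = hdfs_jar.rsplit("/", 1)[-1]
    local_jar = f"{local_dir.rstrip('/')}/{jar_name}"
    # Step 1: copy the jar from HDFS to the local file system.
    fetch_cmd = ["hdfs", "dfs", "-get", hdfs_jar, local_jar]
    # Step 2: submit the local copy to the existing YARN session.
    run_cmd = [
        "./bin/flink", "run",
        "-yid", yarn_app_id,
        "-p", str(parallelism),
        local_jar,          # assumption: local jar passed as the program jar
        *program_args,
    ]
    return fetch_cmd, run_cmd


fetch_cmd, run_cmd = build_commands(
    hdfs_jar="hdfs://ysec-storage/flink/runJar/business-security-1.0-SNAPSHOT.jar",
    local_dir="/tmp",  # hypothetical download location
    yarn_app_id="application_1567652112073_0001",
    parallelism=6,
    program_args=["--appId", "act_test"],
)
print(shlex.join(fetch_cmd))
print(shlex.join(run_cmd))
```

To actually run the two steps, each list could be handed to subprocess.run(cmd, check=True) in order, so that the submission only happens if the download succeeded.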


Re: Example DDL and DML for a MySQL dimension-table join with flink-sql

2019-09-28 Thread Jark Wu
Hi,

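The DDL for a MySQL dimension table is the same as the DDL for a source or
sink table. For example, with the DDL declaration below, rates can be used in
Flink SQL as a source, as a sink, or as a dimension table.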

CREATE TABLE rates (
  currency VARCHAR,
  rate BIGINT
) WITH (
  'connector.type' = 'jdbc', -- use the jdbc connector
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
  'connector.table' = 'rates', -- table name
  'connector.username' = 'root', -- user name
  'connector.password' = '123456' -- password
)

To use it as a dimension table, you need the temporal join syntax. For example:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

For more on dimension-table joins, see the official documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
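
As a rough illustration of what the temporal join above computes, here is a small plain-Python simulation. It is not Flink code and makes several assumptions: the dict stands in for the MySQL rates table, the sample rates and orders are made up, and lookup_join is a hypothetical helper. The point it tries to show is that each order is joined with whatever rate is in the dimension table at the moment the order is processed, and that results already emitted are not revised when the table changes later.

```python
def lookup_join(orders, rates_table):
    """Join each order with the rate that is current when the order is processed."""
    for order in orders:
        rate = rates_table.get(order["currency"])
        if rate is None:
            continue  # inner join: orders without a matching currency are dropped
        yield (order["amount"], order["currency"], rate, order["amount"] * rate)


# Hypothetical contents of the rates dimension table (currency -> rate).
rates = {"USD": 102, "EUR": 114}

results = []
# First order is processed while USD is still 102.
results.extend(lookup_join([{"amount": 2, "currency": "USD"}], rates))

# The dimension table is updated between the two batches of orders.
rates["USD"] = 105

# Later orders see the new rate; JPY has no row in the table and is dropped.
results.extend(lookup_join(
    [{"amount": 2, "currency": "USD"}, {"amount": 1, "currency": "JPY"}],
    rates,
))

for row in results:
    print(row)
# (2, 'USD', 102, 204)
# (2, 'USD', 105, 210)
```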




On Fri, 27 Sep 2019 at 14:12, yelun <986463...@qq.com> wrote:

> Hi all,
>
> Is there a demo showing the DDL and DML for a MySQL dimension-table join
> with flink-sql that I could use as a reference? Thanks a lot.


Re: Running flink on AWS ECS

2019-09-28 Thread sri hari kali charan Tummala
AWS already offers an auto-scaling Flink cluster; it is called Kinesis Data
Analytics. Just add your Flink jar to Kinesis Data Analytics, and AWS will
automatically provision a Flink cluster and handle the administration for you.

On Saturday, September 28, 2019, David Anderson  wrote:

> I believe there can be advantages and disadvantages in both
> directions. For example, fewer containers with multiple slots reduces
> the effort the Flink Master has to do whenever global coordination is
> required, i.e., during checkpointing. And the network stack in the
> task managers is optimized to take advantage of locality, whenever
> possible.
>
> On the other hand, if you have a lot of pressure on the heap (e.g.,
> because you are using a heap-based state backend), then having more,
> smaller task managers can reduce latency by reducing the impact of
> garbage collection pauses.
>
> I'm sure I've overlooked some factors, but the bottom line appears to
> be that there's no one-size-fits-all answer.
>
> David
>
> On Wed, Sep 25, 2019 at 5:43 PM Navneeth Krishnan wrote:
> >
> > Thanks Terry. The reason I asked is that I saw somewhere that running
> > one slot per container is beneficial, but I couldn’t find where I saw that.
> > Also, I think running with multiple slots will reduce IPC, since some of
> > the data will be processed within the same JVM.
> >
> > Thanks
> >
> > On Wed, Sep 25, 2019 at 1:16 AM Terry Wang  wrote:
> >>
> >> Hi, Navneeth,
> >>
> >> I think both are OK.
> >> IMO, running one container with the same number of slots as virtual
> >> cores may be better, because the slots can share the Flink framework
> >> and thus reduce memory cost.
> >>
> >> Best,
> >> Terry Wang
> >>
> >>
> >>
> >> > On Sep 25, 2019, at 3:26 PM, Navneeth Krishnan wrote:
> >> >
> >> > Hi All,
> >> >
> >> > I’m currently running Flink on Amazon ECS, and I have assigned task
> >> > slots based on the vCPUs per instance. Is it more beneficial to run a
> >> > separate container with one slot each, or one container with as many
> >> > slots as virtual cores?
> >> >
> >> > Thanks
> >>
>


-- 
Thanks & Regards
Sri Tummala


Apache Flink write to multiple sinks one after the other in sequence

2019-09-28 Thread shrridevi sreedharan
Hello Team,
Could you please help with the below query.

I have a CsvTableSource that reads CSV files and writes the data to S3. After
that, I want to load the file from S3 into a JDBC sink. How can I make the
JDBC sink execute only after the S3 sink has completed successfully? The
reason is that S3 is going to be used for some other processing, so the data
cannot be moved directly to JDBC without the S3 hop.

Thanks in advance.

Shrri


Re: Running flink on AWS ECS

2019-09-28 Thread David Anderson
I believe there can be advantages and disadvantages in both
directions. For example, fewer containers with multiple slots reduces
the effort the Flink Master has to do whenever global coordination is
required, i.e., during checkpointing. And the network stack in the
task managers is optimized to take advantage of locality, whenever
possible.

On the other hand, if you have a lot of pressure on the heap (e.g.,
because you are using a heap-based state backend), then having more,
smaller task managers can reduce latency by reducing the impact of
garbage collection pauses.

I'm sure I've overlooked some factors, but the bottom line appears to
be that there's no one-size-fits-all answer.

David
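
A back-of-the-envelope sketch of the two layouts discussed in this thread, in plain Python. The instance count and vCPU count are hypothetical, and layouts is a made-up helper; the numbers only illustrate the trade-off described above (fewer, larger task managers mean fewer processes for the Flink Master to coordinate, while more, smaller task managers mean smaller JVM heaps each) and say nothing about which layout performs better.

```python
def layouts(instances, vcpus_per_instance):
    """Compare the two container layouts for the same total number of slots."""
    total_slots = instances * vcpus_per_instance
    return {
        # One container (task manager) per vCPU, each with a single slot.
        "one_slot_per_container": {
            "task_managers": total_slots,
            "slots_per_task_manager": 1,
        },
        # One container (task manager) per instance, one slot per vCPU.
        "one_container_per_instance": {
            "task_managers": instances,
            "slots_per_task_manager": vcpus_per_instance,
        },
    }


# Hypothetical cluster: 4 ECS instances with 8 vCPUs each.
for name, layout in layouts(4, 8).items():
    print(name, layout)
```

Both layouts provide the same 32 slots here; they differ only in how many task manager JVMs those slots are spread across (32 versus 4).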

On Wed, Sep 25, 2019 at 5:43 PM Navneeth Krishnan wrote:
>
> Thanks Terry. The reason I asked is that I saw somewhere that running
> one slot per container is beneficial, but I couldn’t find where I saw that.
> Also, I think running with multiple slots will reduce IPC, since some of
> the data will be processed within the same JVM.
>
> Thanks
>
> On Wed, Sep 25, 2019 at 1:16 AM Terry Wang  wrote:
>>
>> Hi, Navneeth,
>>
>> I think both are OK.
>> IMO, running one container with the same number of slots as virtual cores
>> may be better, because the slots can share the Flink framework and thus
>> reduce memory cost.
>>
>> Best,
>> Terry Wang
>>
>>
>>
>> > On Sep 25, 2019, at 3:26 PM, Navneeth Krishnan wrote:
>> >
>> > Hi All,
>> >
>> > I’m currently running Flink on Amazon ECS, and I have assigned task slots
>> > based on the vCPUs per instance. Is it more beneficial to run a separate
>> > container with one slot each, or one container with as many slots as
>> > virtual cores?
>> >
>> > Thanks
>>