Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Ivan Yang
Hi Rahul,

Try to increase taskmanager.network.memory.max to 1GB, basically double what 
you have now. However, you only have 4GB RAM for the entire TM, seems out of 
proportion to have 1GB network buffer with 4GB total RAM. Reducing number of 
shuffling will require less network buffer. But if your job need the shuffling, 
then you may consider to add more memory to TM.

Thanks,
Ivan

> On Jul 31, 2020, at 2:02 PM, Rahul Patwari  wrote:
> 
> Hi,
> 
> We are observing "Insufficient number of Network Buffers" issue Sporadically 
> when Flink is upgraded from 1.4.2 to 1.8.2.
> The state of the tasks with this issue translated from DEPLOYING to FAILED. 
> Whenever this issue occurs, the job manager restarts. Sometimes, the issue 
> goes away after the restart.
> As we are not getting the issue consistently, we are in a dilemma of whether 
> to change the memory configurations or not.
> 
> Min recommended No. of Network Buffers: (8 * 8) * 8 * 4 = 2048
> The exception says that 13112 no. of network buffers are present, which is 6x 
> the recommendation.
> 
> Is reducing the no. of shuffles the only way to reduce the no. of network 
> buffers required?
> 
> Thanks,
> Rahul 
> 
> configs:
> env: Kubernetes 
> Flink: 1.8.2
> using default configs for memory.fraction, memory.min, memory.max.
> using 8 TM, 8 slots/TM
> Each TM is running with 1 core, 4 GB Memory.
> 
> Exception:
> java.io.IOException: Insufficient number of network buffers: required 2, but 
> only 0 available. The total number of network buffers is currently set to 
> 13112 of 32768 bytes each. You can increase this number by setting the 
> configuration keys 'taskmanager.network.memory.fraction', 
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:138)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:311)
> at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:271)
> at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
> at java.lang.Thread.run(Thread.java:748)



Re:Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 Thread chenxuying
hi
ok,谢谢,懂了哈哈














在 2020-07-31 21:27:02,"Leonard Xu"  写道:
>Hello
>
>> 在 2020年7月31日,21:13,chenxuying  写道:
>> 
>> 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做
>
>简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY KEY 的,
>是通过用户的query来决定写入的模式是upsert 还是 append ,  你可以看下1.10的文档关于用query 推导 写入模式的文档[1], 
>如果已经在用1.11了,1.10的文档可以不用看的。
> 
>在1.10里经常出现query 推导不出 key 导致无法做upsert写入的case, 在1.11里通过支持定义 PRIMARY 
>KEY,不会再有类似问题.1.11的文档参考[2]。
>
>祝好
>Leonard
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
> 
>
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
> 
>


allowNonRestoredState: metadata file in checkpoint dir missing

2020-07-31 Thread Deshpande, Omkar
Hello,

When deleting an operator we run our application with 
--allowNonRestoredState=true, as described in the 
documentation.
 When running with this flag, we have observed that the _metadata file is not 
generated in the checkpoint directory. So, if the application fails, we don’t 
have the ability to start from the checkpoint. And since the application has 
failed, we can’t take a savepoint as well.


Is _metadata file not being created in this case expected behavior?

How do we achieve resilience while using --allowNonRestoredState?


We are using Beam with the Flink runner(java).

  *   Beam 2.19

  *   Flink 1.9

Omkar


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Hequn Cheng
Hi Jincheng,

Thanks a lot for raising the discussion. +1 for the FLIP.

I think this will bring big benefits for the PyFlink users. Currently, the
Python TableAPI document is hidden deeply under the TableAPI tab which
makes it quite unreadable. Also, the PyFlink documentation is mixed with
Java/Scala documentation. It is hard for users to have an overview of all
the PyFlink documents. As more and more functionalities are added into
PyFlink, I think it's time for us to refactor the document.

Best,
Hequn


On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
wrote:

> Hi, Jincheng!
>
> Thanks for creating this detailed FLIP, it will make a big difference in
> the experience of Python developers using Flink. I'm interested in
> contributing to this work, so I'll reach out to you offline!
>
> Also, thanks for sharing some information on the adoption of PyFlink, it's
> great to see that there are already production users.
>
> Marta
>
> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>
> > Hi Jincheng,
> >
> > Thanks a lot for bringing up this discussion and the proposal.
> >
> > Big +1 for improving the structure of PyFlink doc.
> >
> > It will be very friendly to give PyFlink users a unified entrance to
> learn
> > PyFlink documents.
> >
> > Best,
> > Xingbo
> >
> > Dian Fu  于2020年7月31日周五 上午11:00写道:
> >
> >> Hi Jincheng,
> >>
> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >> improve the Python API doc.
> >>
> >> I have received many feedbacks from PyFlink beginners about
> >> the PyFlink doc, e.g. the materials are too few, the Python doc is mixed
> >> with the Java doc and it's not easy to find the docs he wants to know.
> >>
> >> I think it would greatly improve the user experience if we can have one
> >> place which includes most knowledges PyFlink users should know.
> >>
> >> Regards,
> >> Dian
> >>
> >> 在 2020年7月31日,上午10:14,jincheng sun  写道:
> >>
> >> Hi folks,
> >>
> >> Since the release of Flink 1.11, users of PyFlink have continued to
> grow.
> >> As far as I know there are many companies have used PyFlink for data
> >> analysis, operation and maintenance monitoring business has been put
> into
> >> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).  According to
> >> the feedback we received, current documentation is not very friendly to
> >> PyFlink users. There are two shortcomings:
> >>
> >> - Python related content is mixed in the Java/Scala documentation, which
> >> makes it difficult for users who only focus on PyFlink to read.
> >> - There is already a "Python Table API" section in the Table API
> document
> >> to store PyFlink documents, but the number of articles is small and the
> >> content is fragmented. It is difficult for beginners to learn from it.
> >>
> >> In addition, FLIP-130 introduced the Python DataStream API. Many
> >> documents will be added for those new APIs. In order to increase the
> >> readability and maintainability of the PyFlink document, Wei Zhong and
> me
> >> have discussed offline and would like to rework it via this FLIP.
> >>
> >> We will rework the document around the following three objectives:
> >>
> >> - Add a separate section for Python API under the "Application
> >> Development" section.
> >> - Restructure current Python documentation to a brand new structure to
> >> ensure complete content and friendly to beginners.
> >> - Improve the documents shared by Python/Java/Scala to make it more
> >> friendly to Python users and without affecting Java/Scala users.
> >>
> >> More detail can be found in the FLIP-133:
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
> >>
> >> Best,
> >> Jincheng
> >>
> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
> >>
> >>
> >>
>


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Hequn Cheng
Hi Jincheng,

Thanks a lot for raising the discussion. +1 for the FLIP.

I think this will bring big benefits for the PyFlink users. Currently, the
Python TableAPI document is hidden deeply under the TableAPI tab which
makes it quite unreadable. Also, the PyFlink documentation is mixed with
Java/Scala documentation. It is hard for users to have an overview of all
the PyFlink documents. As more and more functionalities are added into
PyFlink, I think it's time for us to refactor the document.

Best,
Hequn


On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
wrote:

> Hi, Jincheng!
>
> Thanks for creating this detailed FLIP, it will make a big difference in
> the experience of Python developers using Flink. I'm interested in
> contributing to this work, so I'll reach out to you offline!
>
> Also, thanks for sharing some information on the adoption of PyFlink, it's
> great to see that there are already production users.
>
> Marta
>
> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>
> > Hi Jincheng,
> >
> > Thanks a lot for bringing up this discussion and the proposal.
> >
> > Big +1 for improving the structure of PyFlink doc.
> >
> > It will be very friendly to give PyFlink users a unified entrance to
> learn
> > PyFlink documents.
> >
> > Best,
> > Xingbo
> >
> > Dian Fu  于2020年7月31日周五 上午11:00写道:
> >
> >> Hi Jincheng,
> >>
> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >> improve the Python API doc.
> >>
> >> I have received many feedbacks from PyFlink beginners about
> >> the PyFlink doc, e.g. the materials are too few, the Python doc is mixed
> >> with the Java doc and it's not easy to find the docs he wants to know.
> >>
> >> I think it would greatly improve the user experience if we can have one
> >> place which includes most knowledges PyFlink users should know.
> >>
> >> Regards,
> >> Dian
> >>
> >> 在 2020年7月31日,上午10:14,jincheng sun  写道:
> >>
> >> Hi folks,
> >>
> >> Since the release of Flink 1.11, users of PyFlink have continued to
> grow.
> >> As far as I know there are many companies have used PyFlink for data
> >> analysis, operation and maintenance monitoring business has been put
> into
> >> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).  According to
> >> the feedback we received, current documentation is not very friendly to
> >> PyFlink users. There are two shortcomings:
> >>
> >> - Python related content is mixed in the Java/Scala documentation, which
> >> makes it difficult for users who only focus on PyFlink to read.
> >> - There is already a "Python Table API" section in the Table API
> document
> >> to store PyFlink documents, but the number of articles is small and the
> >> content is fragmented. It is difficult for beginners to learn from it.
> >>
> >> In addition, FLIP-130 introduced the Python DataStream API. Many
> >> documents will be added for those new APIs. In order to increase the
> >> readability and maintainability of the PyFlink document, Wei Zhong and
> me
> >> have discussed offline and would like to rework it via this FLIP.
> >>
> >> We will rework the document around the following three objectives:
> >>
> >> - Add a separate section for Python API under the "Application
> >> Development" section.
> >> - Restructure current Python documentation to a brand new structure to
> >> ensure complete content and friendly to beginners.
> >> - Improve the documents shared by Python/Java/Scala to make it more
> >> friendly to Python users and without affecting Java/Scala users.
> >>
> >> More detail can be found in the FLIP-133:
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
> >>
> >> Best,
> >> Jincheng
> >>
> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
> >>
> >>
> >>
>


sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Rahul Patwari
Hi,

We are observing "Insufficient number of Network Buffers" issue
Sporadically when Flink is upgraded from 1.4.2 to 1.8.2.
The state of the tasks with this issue translated from DEPLOYING to FAILED.
Whenever this issue occurs, the job manager restarts. Sometimes, the issue
goes away after the restart.
As we are not getting the issue consistently, we are in a dilemma of
whether to change the memory configurations or not.

Min recommended No. of Network Buffers: (8 * 8) * 8 * 4 = 2048
The exception says that 13112 no. of network buffers are present, which is
6x the recommendation.

Is reducing the no. of shuffles the only way to reduce the no. of network
buffers required?

Thanks,
Rahul

configs:
env: Kubernetes
Flink: 1.8.2
using default configs for memory.fraction, memory.min, memory.max.
using 8 TM, 8 slots/TM
Each TM is running with 1 core, 4 GB Memory.

Exception:
java.io.IOException: Insufficient number of network buffers: required 2,
but only 0 available. The total number of network buffers is currently set
to 13112 of 32768 bytes each. You can increase this number by setting the
configuration keys 'taskmanager.network.memory.fraction',
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
at
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:138)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:311)
at
org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:271)
at
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:748)


Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Vijayendra Yadav
Thank You Niels. Would you have something for the scala object class.  Say
for example if I want to implement a unit test ( not integration test) for
below code or similar  :

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala

Regards,
Vijay

On Fri, Jul 31, 2020 at 12:22 PM Niels Basjes  wrote:

> Does this test in one of my own projects do what you are looking for?
>
>
> https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107
>
>
> On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav, 
> wrote:
>
>> Hi Team,
>>
>> Looking for some help and reference code / material to implement unit
>> tests of possible scenarios in Flink *streaming *Code that should assert
>> specific cases.
>>
>> Regards,
>> Vijay
>>
>


Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Niels Basjes
Does this test in one of my own projects do what you are looking for?

https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107


On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav,  wrote:

> Hi Team,
>
> Looking for some help and reference code / material to implement unit
> tests of possible scenarios in Flink *streaming *Code that should assert
> specific cases.
>
> Regards,
> Vijay
>


Kafka source, committing and retries

2020-07-31 Thread Jack Phelan
Scenario
===

A partition that Flink is reading:
[ 1 - 2 - 3 - 4 - 5 - 6 - 7 - |  8 _ 9 _ 10 _ 11 | 12 ~ 13 ]
[.   Committed.   | In flight  | unread  ]

Kafka basically breaks off pieces of the end of the queue and shoves them
downstream for processing?

So suppose while semantically:
- 8 &10 succeed (api call success)
- 9 & 11 fail (api failure).

Failure Handling options
==

Basically we have two options to handle failures?

A. Try/catch to deadletter queue
```
try {
api.write(8, 9, 10, 11);
} catch E {
// 9, 11 failed to write to the api so we deadletter them

deadletterQueue.write(E.failed_set())
}
```

B. Or it can fail - which will retry the batch?
```
api.write(8, 9, 10, 11);
// 9, 11 failed to write to the api
```

In situation (B.), we're rewriting 8 and 10 to the api, which is bad, so
situation (A.) seems better.


Challenge I can't understand
==

However in (A.) we then do something with the queue:

A2. Try/catch to another deadletter queue?
```
try {
api.write(9, 11);
} catch E {
//11 failed to write to the api
deadletterQueue2.write(E.failed_set())
}
```

Do you see what I mean? Is it turtles all the way down?

Should I create a separate index of semantic outcome? Where should it live?

Should I just keep things in the queue until


smime.p7s
Description: S/MIME Cryptographic Signature


[Flink Unit Tests] Unit test for Flink streaming codes

2020-07-31 Thread Vijayendra Yadav
Hi Team,

Looking for some help and reference code / material to implement unit tests
of possible scenarios in Flink *streaming *Code that should assert specific
cases.

Regards,
Vijay


Behavior for flink job running on K8S failed after restart strategy exhausted

2020-07-31 Thread Eleanore Jin
Hi Experts,

I have a flink cluster (per job mode) running on kubernetes. The job is
configured with restart strategy

restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s


So after 3 times retry, the job will be marked as FAILED, hence the pods
are not running. However, kubernetes will then restart the job again as the
available replicas do not match the desired one.

I wonder what are the suggestions for such a scenario? How should I
configure the flink job running on k8s?

Thanks a lot!
Eleanore


Behavior for flink job running on K8S failed after restart strategy exhausted

2020-07-31 Thread Eleanore Jin
Hi Experts,

I have a flink cluster (per job mode) running on kubernetes. The job is
configured with restart strategy

restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s


So after 3 times retry, the job will be marked as FAILED, hence the pods
are not running. However, kubernetes will then restart the job again as the
available replicas do not match the desired one.

I wonder what are the suggestions for such a scenario? How should I
configure the flink job running on k8s?

Thanks a lot!
Eleanore


Is there a way to get file "metadata" as part of stream?

2020-07-31 Thread John Smith
Hi, so reading a CSV file using env.readFile() with RowCsvInputFormat.

Is there a way to get the filename as part of the row stream?

The file contains a unique identifier to tag the rows with.


Re: How to stream CSV from S3?

2020-07-31 Thread John Smith
Hi Yes it works :)

For the Java guys...

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

String path = "file:///foo/bar";

TypeInformation[] fieldTypes = new TypeInformation[]{
  BasicTypeInfo.STRING_TYPE_INFO,
  BasicTypeInfo.STRING_TYPE_INFO
};

RowCsvInputFormat csvFormat =
  new RowCsvInputFormat(
new Path(path), fieldTypes);
csvFormat.setSkipFirstLineAsHeader(true);

DataStreamSource lines = env.readFile(csvFormat, path,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);

lines.map(value -> value).print();


On Thu, 30 Jul 2020 at 05:42, Arvid Heise  wrote:

> Hi John,
>
> I found an example on SO [1] in Scala.
>
> [1] https://stackoverflow.com/a/52093079/10299342
>
> On Tue, Jul 28, 2020 at 4:29 PM John Smith  wrote:
>
>> Hi, is there an example on how RowCsvInputFormat is initialized?
>>
>> On Tue, 28 Jul 2020 at 04:00, Jingsong Li  wrote:
>>
>>> - `env.readCsvFile` is in DataSet, just read the full amount of data
>>> once in batch mode.
>>> - `streamEnv.readFile(RowCsvInputFormat, filePath,
>>> FileProcessingMode.PROCESS_CONTINUOUSLY, monitorInterval)` can monitor
>>> directory, and continue reading in streaming mode.
>>>
>>> On Tue, Jul 28, 2020 at 3:54 PM John Smith 
>>> wrote:
>>>
 Also this where I find the docs confusing in the "connectors" section.
 File system isn't under Data streaming but env.readCsvFile seems like it
 can do the trick?

 On Tue., Jul. 28, 2020, 3:46 a.m. John Smith, 
 wrote:

> Bassically I want to "monitor" a bucket on S3 and every file that gets
> created in that bucket read it and stream it.
>
> If I understand correctly, I can just use env.readCsvFile() and config
> to continuously read a folder path?
>
>
> On Tue., Jul. 28, 2020, 1:38 a.m. Jingsong Li, 
> wrote:
>
>> Hi John,
>>
>> Do you mean you want to read S3 CSV files using
>> partition/bucket pruning?
>>
>> If just using the DataSet API, you can use CsvInputFormat to read csv
>> files.
>>
>> If you want to use Table/Sql API, In 1.10, Csv format in table not
>> support partitioned table. So the only way is specific the 
>> partition/bucket
>> path, and read single directory.
>>
>> In 1.11, the Table/Sql filesystem connector with csv format supports
>> partitioned table, complete support partition semantics.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jul 27, 2020 at 10:54 PM John Smith 
>> wrote:
>>
>>> Hi, using Flink 1.10
>>>
>>> 1- How do we go about reading CSV files that are copied to s3
>>> buckets?
>>> 2- Is there a source that can tail S3 and start reading a CSV when
>>> it is copied to S3?
>>> 3- Is that part of the table APIs?
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


JDBCOutputFormat dependency loading error

2020-07-31 Thread Flavio Pompermaier
Hi to all,
I'm trying to run my DataSet job on Flink 1.11.0 and I'm connecting toward
Mariadb in my code.
I've put the mariadb-java-client-2.6.0.jar in the lib directory and in the
pom.xml I set that dependency as provided. The code runs successfully from
the Ide but when I try to run the code on the cluster I get the following
error:

Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Class.java:315) ~[?:?]
at
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
~myApp.jar:?]
at
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
~myApp.jar:?]
at
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
~myApp.jar:?]
at
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:82)
~myApp.jar:?]
at
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at java.lang.Thread.run(Thread.java:834) ~[?:?]

What should I do?

Thanks in advance,
Flavio


Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 Thread Leonard Xu
Hello

> 在 2020年7月31日,21:13,chenxuying  写道:
> 
> 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做

简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY KEY 的,
是通过用户的query来决定写入的模式是upsert 还是 append ,  你可以看下1.10的文档关于用query 推导 写入模式的文档[1], 
如果已经在用1.11了,1.10的文档可以不用看的。
 
在1.10里经常出现query 推导不出 key 导致无法做upsert写入的case, 在1.11里通过支持定义 PRIMARY 
KEY,不会再有类似问题.1.11的文档参考[2]。

祝好
Leonard

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
 


Re:Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 Thread chenxuying
谢谢回答
使用新属性可以 成功修改记录 ,
但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做

















在 2020-07-31 16:46:41,"Leonard Xu"  写道:
>Hi, chenxuying
>
>看你还是用的还是 "  'connector.type' = 'jdbc', ….  " 
>,这是老的option,使用老的option参数还是需要根据query推导主键,
>需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式.
> 
>Best
>Leonard
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
> 
>
>
>> 在 2020年7月31日,16:12,chenxuying  写道:
>> 
>> hi
>> 我使用的flink 1.11.0版本
>> 代码如下
>> StreamExecutionEnvironment streamEnv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
>> tableEnvironment.executeSql(" " +
>> " CREATE TABLE mySource ( " +
>> "  a bigint, " +
>> "  b bigint " +
>> " ) WITH ( " +
>> "  'connector.type' = 'kafka', " +
>> "  'connector.version' = 'universal', " +
>> "  'connector.topic' = 'mytesttopic', " +
>> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
>> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
>> "  'connector.properties.group.id' = 'flink-test-cxy', " +
>> "  'connector.startup-mode' = 'latest-offset', " +
>> "  'format.type' = 'json' " +
>> " ) ");
>> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
>> " id bigint, " +
>> "  game_id varchar, " +
>> "  PRIMARY KEY (id) NOT ENFORCED  " +
>> " )  " +
>> " with ( " +
>> "  'connector.type' = 'jdbc',   " +
>> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' 
>> , " +
>> "  'connector.username' = 'root' , " +
>> "  'connector.password' = 'root',  " +
>> "  'connector.table' = 'mysqlsink' , " +
>> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
>> "  'connector.write.flush.interval' = '2s',  " +
>> "  'connector.write.flush.max-rows' = '300'  " +
>> " )");
>> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
>> (select a,cast(b as varchar) b from mySource)");
>> 
>> 
>> 问题一 : 上面的insert语句会出现如下错误
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
>> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(> A, VARCHAR(2147483647) B)>)'. Supported form(s): 
>> '$SCALAR_QUERY()'
>> 
>> 
>> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
>> a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
>> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate 
>> entry '1' for key 'PRIMARY'
>> 
>> 
>> 
>


Re: Flink sql 转义字符问题

2020-07-31 Thread Leonard Xu
Hi, zilong

SPLIT_INDEX(${xxx}, ‘;’, 0)

 ‘;’ 分号不是特殊字符,编译时应该不会报错的,我在Flink 1.11.1 用DDL 测试了下, 能够work的,不知道你的环境是怎样的。
  U&'\003B'  是 ; 的 unicode编码,所以用这个unicode编码是可以的,但一般这种用法是在需要用不可见字符分割时我们这样使用,
  比如 \n 对应的s是 U&'\\000A’ ,\r 对应的是 U&'\\000D’, 对于分号这种可见字符来讲,不需要用unicode编码就可以的。

祝好
Leonard 

> 在 2020年7月31日,20:46,zilong xiao  写道:
> 
> U&'\003B'  这么写就可以了 感觉好奇怪啊。。
> 
> 李奇 <359502...@qq.com> 于2020年7月31日周五 下午8:25写道:
> 
>> 加反斜杠就可以。\;  只不过分号应该不是特殊字符吧。
>> 
>>> 在 2020年7月31日,下午8:13,zilong xiao  写道:
>>> 
>>> SPLIT_INDEX(${xxx}, ';',
>>> 
>> 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~
>> 



回复: Flink sql 转义字符问题

2020-07-31 Thread Hannan Kan
我看官方文档https://help.aliyun.com/knowledge_detail/62544.html中接口是VARCHAR 
SPLIT_INDEX(VARCHAR str, VARCHAR sep, INT index)
sep 是字符串类型。是不是要用双引号或者看下分号是不是英文的?


--原始邮件--
发件人:
"user-zh"   
 


Re: Flink sql 转义字符问题

2020-07-31 Thread zilong xiao
U&'\003B'  这么写就可以了 感觉好奇怪啊。。

李奇 <359502...@qq.com> 于2020年7月31日周五 下午8:25写道:

> 加反斜杠就可以。\;  只不过分号应该不是特殊字符吧。
>
> > 在 2020年7月31日,下午8:13,zilong xiao  写道:
> >
> > SPLIT_INDEX(${xxx}, ';',
> >
> 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~
>


Re: Support for Event time clock specific to each stream in parallel streams

2020-07-31 Thread David Anderson
It sounds like you would like to have something like event-time-based
windowing, but with independent watermarking for every key. An approach
that can work, but it is somewhat cumbersome, is to not use watermarks or
windows, but instead put all of the logic in a KeyedProcessFunction (or
RichFlatMap). In this way you are free to implement your own policy for
deciding when a given "window" for a specific key (device) is ready for
processing, based solely on observing the events for that specific key.

Semantically I think this is similar to running a separate instance of the
job for each source, but with multi-tenancy, and with an impoverished API
(no watermarks, no event time timers, no event time windows).

Note that it is already the case that each parallel instance of an operator
has its own, independent notion of the current watermark. I believe your
problems arise from the fact that this current watermark is applied to all
events processed by that instance, regardless of their keys. I believe you
would like each key to maintain its own current watermark (event time
clock), so if one key (device) is idle, its watermark will wait for further
events to arrive. As it is now, events for other keys processed by the same
operator instance (or subtask) will advance the shared watermark, causing
an idle device's events to become late.

Regards,
David

On Fri, Jul 31, 2020 at 1:42 PM Sush Bankapura <
sushrutha.bankap...@man-es.com> wrote:

> Hi,
>
> We have a single Flink job that works on data from multiple data sources.
> These data sources are not aligned in time and also have intermittent
> connectivity lasting for days, due to which data will arrive late
>
> We attempted to use the event time and watermarks with parallel streams
> using keyby for the data source
>
> In case of parallel streams, for certain operators, the event time clock
> across all the subtasks  of the operator is the minimum value of the
> watermark among all its input streams.
>
> Reference:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks
> in-parallel-streams
>
> While this seems to be a fundamental concept of Flink, are there any plans
> of having event  time clock per operator per subtask for such operators?
>
> This is causing us, not to use watermarks and to fallback on processing
> time semantics or in the worst case running the same Flink job for each and
> every different data source from which we are collecting data through Kafka
>
> Thanks,
> Sush
>


Re: Flink sql 转义字符问题

2020-07-31 Thread zilong xiao
实测反斜杠好像也不行

李奇 <359502...@qq.com> 于2020年7月31日周五 下午8:25写道:

> 加反斜杠就可以。\;  只不过分号应该不是特殊字符吧。
>
> > 在 2020年7月31日,下午8:13,zilong xiao  写道:
> >
> > SPLIT_INDEX(${xxx}, ';',
> >
> 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~
>


Re: Flink sql 转义字符问题

2020-07-31 Thread 李奇
加反斜杠就可以。\;  只不过分号应该不是特殊字符吧。

> 在 2020年7月31日,下午8:13,zilong xiao  写道:
> 
> SPLIT_INDEX(${xxx}, ';',
> 0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~


Flink sql 转义字符问题

2020-07-31 Thread zilong xiao
SPLIT_INDEX(${xxx}, ';',
0),想从字符串中按分号切割,可是分号应该是特殊字符,语法检查总是不能通过,网上查说是可以转义,但是也没太搞懂怎么才能转义,有遇到过类似问题的大佬求指点~~


Re: Re: How to retain the column'name when convert a Table to DataStream

2020-07-31 Thread Jark Wu
Hi,

For now, you can explicitly set the RowTypeInfo to retain the field names.
This works in master branch:

*val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)*
// t1 stream schema: Row(a: Integer, b: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- b: INT
 */
println(s"new t1 table schema: ${tEnv.scan("t1").getSchema}")


Best,
Jark

On Fri, 31 Jul 2020 at 18:03, izual  wrote:

> I create a JIRA issue here,
> https://issues.apache.org/jira/browse/FLINK-18782
> And thanks for your advice to avoid 「top-level projection/rename」^_^
>
>
>
>
> At 2020-07-30 16:58:45, "Dawid Wysakowicz"  wrote:
>
> Hi,
>
> I am afraid you are facing an issue that was not checked for/was not
> considered. I think your use case is absolutely valid and should be
> supported.
>
> The problem you are facing as far as I can tell from an initial
> investigation is that the top-level projection/rename is not being applied.
> Internally the foo(a) is passed around as an unnamed expression and should
> be aliased at the top level. This happens when simply querying therefore
> you get expected results in the first case when only printing the schema of
> a Table.
>
> When translating to the datastream this final rename does not take place,
> which imo is a bug. You can see this behaviour if you add additional
> projection. Then the renaming of the expression from lateral table happens
> a level deeper and is not stripped.
>
> val t1 = tableEnv.sqlQuery(
>   """
> |SELECT 1, * FROM (
> |SELECT source_table.a, b FROM source_table
> |, LATERAL TABLE(foo(a)) as T(b))
> |""".stripMargin
>
>
> t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer)
> new t1 table schema: root
>  |-- EXPR$0: INT
>  |-- a: INT
>  |-- b: INT
>
>
> Do you mind creating a JIRA issue to fix/support this case?
>
> Unfortunately I can not think of a really good way how you could retain
> the column names. :(
>
> Best,
>
> Dawid
> On 28/07/2020 10:26, izual wrote:
>
> Hi, Community:
>   I met some field name errors when try to convert in Table and DataStream.
>   flink version: 1.9.1
>
> First, init a datastream and convert to table 'source', register a
> tablefunction named 'foo'
>
> val sourceStream = env.socketTextStream("127.0.0.1", 8010)
>   .map(line => line.toInt)tableEnv.registerDataStream("source_table", 
> sourceStream, 'a)
> class Foo() extends TableFunction[(Int)] {
>   def eval(col: Int): Unit = collect((col * 10))
> }
> tableEnv.registerFunction("foo", new Foo)
>
> Then, use sqlQuery to generate a new table t1 with columns 'a' 'b'
>
> val t1 = tableEnv.sqlQuery(
>   """|SELECT source_table.a, b FROM source_table|, LATERAL 
> TABLE(foo(a)) as T(b)|""".stripMargin
> )/* t1 table schema: root |-- a: INT |-- b: INT */println(s"t1 table schema: 
> ${t1.getSchema}")
>
> When I try to convert 't1' to a datastream then register to a new
> table(for some reason) named 't1', the columns changes to 'a' 'f0', not 'a'
> 'b'
> I can find 'f0' only with the Java-API in Refs-1.
>
> val t1Stream = t1.toAppendStream[Row]// t1 stream schema: Row(a: Integer, f0: 
> Integer)println(s"t1 stream schema: 
> ${t1Stream.getType()}")tableEnv.registerDataStream("t1", t1Stream)/*new t1 
> table schema: root|-- a: INT|-- f0: INT */println(s"new t1 table schema: 
> ${tableEnv.scan("t1").getSchema}")
>
> Consider maybe the default TypeExtractor(?) works not very well here, Then
> I try to set the field name explicitly, but failed too.
>
> tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
>
> If I add a proctime at first, this works well, but I do not want to set a
> proctime which is unused.
>
> tableEnv.registerDataStream("source_table", sourceStream, 'a, 
> 'proctime.proctime)
>
>
> And my question is :
> 1. why the behavior of the code above seems a little strange?
> 2. How to retain the 'b' when convert with table and stream frequently?
>
> Refs:
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table
>
> Thanks for ur reply.
>
>
>
>
>
>
>
>


Re: Flink 1.11 submit job timed out

2020-07-31 Thread Matt Wang
遇到了同样的问题,也是启动了 taskmanager-query-state-service.yaml 这个服务后,作业才能正常提交的,另外我是在本地装的 
k8s 集群进行测试的,如果是 GC 的问题,启不启动 TM service 应该不会有影响的


--

Best,
Matt Wang


On 07/27/2020 15:01,Yang Wang wrote:
建议先配置heartbeat.timeout的值大一些,然后把gc log打出来
看看是不是经常发生fullGC,每次持续时间是多长,从你目前提供的log看,进程内JM->RM都会心跳超时
怀疑还是和GC有关的

env.java.opts.jobmanager: -Xloggc:/jobmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M


Best,
Yang

SmileSmile  于2020年7月27日周一 下午1:50写道:

Hi,Yang Wang

因为日志太长了,删了一些重复的内容。
一开始怀疑过jm gc的问题,将jm的内存调整为10g也是一样的情况。

Best



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

On 07/27/2020 11:36, Yang Wang wrote:
看你这个任务,失败的根本原因并不是“No hostname could be resolved
”,这个WARNING的原因可以单独讨论(如果在1.10里面不存在的话)。
你可以本地起一个Standalone的集群,也会有这样的WARNING,并不影响正常使用


失败的原因是slot 5分钟申请超时了,你给的日志里面2020-07-23 13:55:45,519到2020-07-23
13:58:18,037是空白的,没有进行省略吧?
这段时间按理应该是task开始deploy了。在日志里看到了JM->RM的心跳超时,同一个Pod里面的同一个进程通信也超时了
所以怀疑JM一直在FullGC,这个需要你确认一下


Best,
Yang

SmileSmile  于2020年7月23日周四 下午2:43写道:

Hi Yang Wang

先分享下我这边的环境版本


kubernetes:1.17.4.   CNI: weave


1 2 3 是我的一些疑惑

4 是JM日志


1. 去掉taskmanager-query-state-service.yaml后确实不行  nslookup

kubectl exec -it busybox2 -- /bin/sh
/ # nslookup 10.47.96.2
Server:  10.96.0.10
Address: 10.96.0.10:53

** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN



2. Flink1.11和Flink1.10

detail subtasks taskmanagers xxx x 这行

1.11变成了172-20-0-50。1.10是flink-taskmanager-7b5d6958b6-sfzlk:36459。这块的改动是?(目前这个集群跑着1.10和1.11,1.10可以正常运行,如果coredns有问题,1.10版本的flink应该也有一样的情况吧?)

3. coredns是否特殊配置?

在容器中解析域名是正常的,只是反向解析没有service才会有问题。coredns是否有什么需要配置?


4. time out时候的JM日志如下:



2020-07-23 13:53:00,228 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
ResourceManager akka.tcp://flink@flink-jobmanager
:6123/user/rpc/resourcemanager_0
was granted leadership with fencing token

2020-07-23 13:53:00,232 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
Starting
RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
at akka://flink/user/rpc/dispatcher_1 .
2020-07-23 13:53:00,233 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl []
-
Starting the SlotManager.
2020-07-23 13:53:03,472 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 1f9ae0cd95a28943a73be26323588696
(akka.tcp://flink@10.34.128.9:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:03,777 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID cac09e751264e61615329c20713a84b4
(akka.tcp://flink@10.32.160.6:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:03,787 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 93c72d01d09f9ae427c5fc980ed4c1e4
(akka.tcp://flink@10.39.0.8:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,044 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 8adf2f8e81b77a16d5418a9e252c61e2
(akka.tcp://flink@10.38.64.7:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,099 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 23e9d2358f6eb76b9ae718d879d4f330
(akka.tcp://flink@10.42.160.6:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,146 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 092f8dee299e32df13db3111662b61f8
(akka.tcp://flink@10.33.192.14:6122/user/rpc/taskmanager_0) at
ResourceManager


2020-07-23 13:55:44,220 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
Received
JobGraph submission 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,222 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
Submitting job 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,251 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
Starting
RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_2 .
2020-07-23 13:55:44,260 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Initializing job JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,278 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Using restart back off time strategy
NoRestartBackoffTimeStrategy for JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Running initialization on master for job JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Successfully ran initialization on master in 0 ms.
2020-07-23 13:55:44,428 INFO

Support for Event time clock specific to each stream in parallel streams

2020-07-31 Thread Sush Bankapura
Hi,

We have a single Flink job that works on data from multiple data sources. These 
data sources are not aligned in time and also have intermittent connectivity 
lasting for days, due to which data will arrive late

We attempted to use the event time and watermarks with parallel streams using 
keyby for the data source

In case of parallel streams, for certain operators, the event time clock across 
all the subtasks  of the operator is the minimum value of the watermark among 
all its input streams. 

Reference: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks
 in-parallel-streams

While this seems to be a fundamental concept of Flink, are there any plans of 
having event  time clock per operator per subtask for such operators?

This is causing us, not to use watermarks and to fallback on processing time 
semantics or in the worst case running the same Flink job for each and every 
different data source from which we are collecting data through Kafka

Thanks,
Sush


Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-07-31 Thread Vikash Dat
Thanks for the reply. I am currently using 1.10 but also saw it happens in
1.10.1 when experimenting. I have not tried 1.11 since EMR only has up to
1.10 at the moment. Are there any known work arounds?

On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren  wrote:

> Hi Vikash,
>
> It's a bug about classloader used in `abortTransaction()` method in
> `FlinkKafkaProducer`, Flink version 1.10.0. I think it has been fixed in
> 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
> 1.10.0?
>
>
> Vikash Dat  于2020年7月30日周四 下午9:26写道:
>
>> Has anyone had success with using exactly_once in a kafka producer in
>> flink?
>> As of right now I don't think the code shown in the docs
>> (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
>> )
>> actually works.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> Best Regards,
>
> *Qingsheng Ren*
>
> Electrical and Computer Engineering
> Carnegie Mellon University
>
> Email: renqs...@gmail.com
>


Re: Customization of execution environment

2020-07-31 Thread Aljoscha Krettek
I agree! My long-term goal is that a Configuration is the basis of truth 
and that the programmatic setter methods and everything else just modify 
the underlying configuration.


We have made big steps in at least allowing to configure most (if not 
all) StreamExecutionEnvironment and TableEnvironment settings via a 
Configuration but we're not completely there yet.


To me it's not yet clear whether modifications on the Configuration of 
the TableEnvironment should go back to the Configuration of the 
StreamExecutionEnvironment. It might be that some users find it 
surprising that changes propagate.


Best,
Aljoscha



On 30.07.20 15:41, Flavio Pompermaier wrote:

That's fine and it's basically what I do as well..I was arguing that it's
bad (IMHO) that you could access the config from the BatchTableEnvironment
(via bte.getConfig().getConfiguration()).
You legitimately think that you are customizing the env but that's
illusory. You should not be able to set properties if they are read only.

Cheers,
Flavio


On Thu, Jul 30, 2020 at 12:15 PM Arvid Heise  wrote:


I'm not entirely sure, if I completely understand the interaction of BTE
and ExecEnv, but I'd create it this way

Configuration conf = new Configuration();
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);

ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
BatchTableEnvironment bte = BatchTableEnvironment.create(env);


On Wed, Jul 29, 2020 at 8:14 AM Robert Metzger 
wrote:


Hi Flavio,

I think the recommended approach is as follows: (then you don't need to
create to environments)

final Configuration conf = new Configuration();
conf.setLong(...)
env = new LocalEnvironment(conf);

I agree that in theory it would be nicer if the configuration returned
was editable, but the handling of configs in Flink is pretty involved
already.


On Tue, Jul 28, 2020 at 10:13 AM Flavio Pompermaier 
wrote:


Hi to all,
migrating to Flink 1.11 I've tried to customize the exec env in this way:

ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bte = BatchTableEnvironment.create(env);
final Configuration conf = bte.getConfig().getConfiguration();
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);
conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");
conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");
conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));

This seems to not have any influence on the setting in my local env and
I need to create env as a new LocalEnvironment if I want to customize it
during tests:

final Configuration conf = env.getConfiguration();
conf.setLong(...)
env = new LocalEnvironment(conf);

Is this the desired behaviour or is it a bug?
Wouldn't it be possible to allow customization of env config it's
actually editable?

Best,
Flavio





--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng







Re:Re: How to retain the column'name when convert a Table to DataStream

2020-07-31 Thread izual
I create a JIRA issue here, https://issues.apache.org/jira/browse/FLINK-18782
And thanks for your advice to avoid 「top-level projection/rename」^_^










At 2020-07-30 16:58:45, "Dawid Wysakowicz"  wrote:

Hi,

I am afraid you are facing an issue that was not checked for/was not 
considered. I think your use case is absolutely valid and should be supported.

The problem you are facing as far as I can tell from an initial investigation 
is that the top-level projection/rename is not being applied. Internally the 
foo(a) is passed around as an unnamed expression and should be aliased at the 
top level. This happens when simply querying therefore you get expected results 
in the first case when only printing the schema of a Table.

When translating to the datastream this final rename does not take place, which 
imo is a bug. You can see this behaviour if you add additional projection. Then 
the renaming of the expression from lateral table happens a level deeper and is 
not stripped.


val t1 = tableEnv.sqlQuery(
  """
|SELECT 1, * FROM (
|SELECT source_table.a, b FROM source_table
|, LATERAL TABLE(foo(a)) as T(b))
|""".stripMargin




t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer)
new t1 table schema: root
 |-- EXPR$0: INT
 |-- a: INT
 |-- b: INT




Do you mind creating a JIRA issue to fix/support this case?


Unfortunately I can not think of a really good way how you could retain the 
column names. :(

Best,

Dawid


On 28/07/2020 10:26, izual wrote:

Hi, Community:
  I met some field name errors when try to convert in Table and DataStream.
  flink version: 1.9.1


First, init a datastream and convert to table 'source', register a 
tablefunction named 'foo'
val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

class Foo() extends TableFunction[(Int)] {
  def eval(col: Int): Unit = collect((col * 10))

}
tableEnv.registerFunction("foo", new Foo)
Then, use sqlQuery to generate a new table t1 with columns 'a' 'b'
val t1 = tableEnv.sqlQuery(
  """
|SELECT source_table.a, b FROM source_table
|, LATERAL TABLE(foo(a)) as T(b)
|""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
When I try to convert 't1' to a datastream then register to a new table(for 
some reason) named 't1', the columns changes to 'a' 'f0', not 'a' 'b'
I can find 'f0' only with the Java-API in Refs-1.
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- f0: INT
 */
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
Consider maybe the default TypeExtractor(?) works not very well here, Then I 
try to set the field name explicitly, but failed too.
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
If I add a proctime at first, this works well, but I do not want to set a 
proctime which is unused.
tableEnv.registerDataStream("source_table", sourceStream, 'a, 
'proctime.proctime)


And my question is :
1. why the behavior of the code above seems a little strange?
2. How to retain the 'b' when convert with table and stream frequently?


Refs:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table


Thanks for ur reply.




 

Re: RocksDBKeyedStateBackend如何写磁盘

2020-07-31 Thread jun su
hi,

看到 RocksDBWriteBatchWrapper类有 flushIfNeeded()方法 , 是这个么?

 private void flushIfNeeded() throws RocksDBException {
boolean needFlush = batch.count() == capacity || (batchSize > 0 &&
getDataSize() >= batchSize);
if (needFlush) {
flush();
}
}

batchSize 来自 state.backend.rocksdb.write-batch-size 参数的配置

jiafu <530496...@qq.com> 于2020年7月31日周五 下午4:41写道:

>
> writerbuffer写满会flush到磁盘,checkpoint启动的时候会有一次snapshot过程,会让rocksdb做checkpoint,然后将数据刷到磁盘形成sst文件。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> sujun891...@gmail.com;
> 发送时间:2020年7月31日(星期五) 下午4:37
> 收件人:"user-zh"
> 主题:RocksDBKeyedStateBackend如何写磁盘
>
>
>
> hi all,
>
> 请问RocksDBKeyedStateBackend是何时将state序列化到磁盘的, 窗口结束时间?还是配置的checkpoint周期,谢谢
>
> --
> Best,
> Jun Su



-- 
Best,
Jun Su


Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 Thread Leonard Xu
Hi, chenxuying

看你还是用的还是 "  'connector.type' = 'jdbc', ….  " 
,这是老的option,使用老的option参数还是需要根据query推导主键,
需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式.
 
Best
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
 


> 在 2020年7月31日,16:12,chenxuying  写道:
> 
> hi
> 我使用的flink 1.11.0版本
> 代码如下
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> "  a bigint, " +
> "  b bigint " +
> " ) WITH ( " +
> "  'connector.type' = 'kafka', " +
> "  'connector.version' = 'universal', " +
> "  'connector.topic' = 'mytesttopic', " +
> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> "  'connector.properties.group.id' = 'flink-test-cxy', " +
> "  'connector.startup-mode' = 'latest-offset', " +
> "  'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "  'connector.type' = 'jdbc',   " +
> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , 
> " +
> "  'connector.username' = 'root' , " +
> "  'connector.password' = 'root',  " +
> "  'connector.table' = 'mysqlsink' , " +
> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> "  'connector.write.flush.interval' = '2s',  " +
> "  'connector.write.flush.max-rows' = '300'  " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
> (select a,cast(b as varchar) b from mySource)");
> 
> 
> 问题一 : 上面的insert语句会出现如下错误
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY( A, VARCHAR(2147483647) B)>)'. Supported form(s): 
> '$SCALAR_QUERY()'
> 
> 
> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
> a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
> '1' for key 'PRIMARY'
> 
> 
> 



??????RocksDBKeyedStateBackend??????????

2020-07-31 Thread jiafu
writerbuffer??flushcheckpoint??snapshot??rocksdb??checkpointsst??




----
??: 
   "user-zh"



RocksDBKeyedStateBackend如何写磁盘

2020-07-31 Thread jun su
hi all,

请问RocksDBKeyedStateBackend是何时将state序列化到磁盘的, 窗口结束时间?还是配置的checkpoint周期,谢谢

-- 
Best,
Jun Su


Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 Thread 李奇
改成update模式,然后也可以修改唯一主键为自然键

> 在 2020年7月31日,下午4:13,chenxuying  写道:
> 
> hi
> 我使用的flink 1.11.0版本
> 代码如下
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> "  a bigint, " +
> "  b bigint " +
> " ) WITH ( " +
> "  'connector.type' = 'kafka', " +
> "  'connector.version' = 'universal', " +
> "  'connector.topic' = 'mytesttopic', " +
> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> "  'connector.properties.group.id' = 'flink-test-cxy', " +
> "  'connector.startup-mode' = 'latest-offset', " +
> "  'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "  'connector.type' = 'jdbc',   " +
> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , 
> " +
> "  'connector.username' = 'root' , " +
> "  'connector.password' = 'root',  " +
> "  'connector.table' = 'mysqlsink' , " +
> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> "  'connector.write.flush.interval' = '2s',  " +
> "  'connector.write.flush.max-rows' = '300'  " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
> (select a,cast(b as varchar) b from mySource)");
> 
> 
> 问题一 : 上面的insert语句会出现如下错误
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY( A, VARCHAR(2147483647) B)>)'. Supported form(s): 
> '$SCALAR_QUERY()'
> 
> 
> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
> a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
> '1' for key 'PRIMARY'
> 
> 
> 



flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

2020-07-31 Thread chenxuying
hi
我使用的flink 1.11.0版本
代码如下
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
tableEnvironment.executeSql(" " +
" CREATE TABLE mySource ( " +
"  a bigint, " +
"  b bigint " +
" ) WITH ( " +
"  'connector.type' = 'kafka', " +
"  'connector.version' = 'universal', " +
"  'connector.topic' = 'mytesttopic', " +
"  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
"  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
"  'connector.properties.group.id' = 'flink-test-cxy', " +
"  'connector.startup-mode' = 'latest-offset', " +
"  'format.type' = 'json' " +
" ) ");
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"  'connector.type' = 'jdbc',   " +
"  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " 
+
"  'connector.username' = 'root' , " +
"  'connector.password' = 'root',  " +
"  'connector.table' = 'mysqlsink' , " +
"  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
"  'connector.write.flush.interval' = '2s',  " +
"  'connector.write.flush.max-rows' = '300'  " +
" )");
tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
(select a,cast(b as varchar) b from mySource)");


问题一 : 上面的insert语句会出现如下错误
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 
'$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY()'. Supported form(s): '$SCALAR_QUERY()'


问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
'1' for key 'PRIMARY'





Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Thread Marta Paes Moreira
Hi, Jincheng!

Thanks for creating this detailed FLIP, it will make a big difference in
the experience of Python developers using Flink. I'm interested in
contributing to this work, so I'll reach out to you offline!

Also, thanks for sharing some information on the adoption of PyFlink, it's
great to see that there are already production users.

Marta

On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:

> Hi Jincheng,
>
> Thanks a lot for bringing up this discussion and the proposal.
>
> Big +1 for improving the structure of PyFlink doc.
>
> It will be very friendly to give PyFlink users a unified entrance to learn
> PyFlink documents.
>
> Best,
> Xingbo
>
> Dian Fu  于2020年7月31日周五 上午11:00写道:
>
>> Hi Jincheng,
>>
>> Thanks a lot for bringing up this discussion and the proposal. +1 to
>> improve the Python API doc.
>>
>> I have received many feedbacks from PyFlink beginners about
>> the PyFlink doc, e.g. the materials are too few, the Python doc is mixed
>> with the Java doc and it's not easy to find the docs he wants to know.
>>
>> I think it would greatly improve the user experience if we can have one
>> place which includes most knowledges PyFlink users should know.
>>
>> Regards,
>> Dian
>>
>> 在 2020年7月31日,上午10:14,jincheng sun  写道:
>>
>> Hi folks,
>>
>> Since the release of Flink 1.11, users of PyFlink have continued to grow.
>> As far as I know there are many companies have used PyFlink for data
>> analysis, operation and maintenance monitoring business has been put into
>> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).  According to
>> the feedback we received, current documentation is not very friendly to
>> PyFlink users. There are two shortcomings:
>>
>> - Python related content is mixed in the Java/Scala documentation, which
>> makes it difficult for users who only focus on PyFlink to read.
>> - There is already a "Python Table API" section in the Table API document
>> to store PyFlink documents, but the number of articles is small and the
>> content is fragmented. It is difficult for beginners to learn from it.
>>
>> In addition, FLIP-130 introduced the Python DataStream API. Many
>> documents will be added for those new APIs. In order to increase the
>> readability and maintainability of the PyFlink document, Wei Zhong and me
>> have discussed offline and would like to rework it via this FLIP.
>>
>> We will rework the document around the following three objectives:
>>
>> - Add a separate section for Python API under the "Application
>> Development" section.
>> - Restructure current Python documentation to a brand new structure to
>> ensure complete content and friendly to beginners.
>> - Improve the documents shared by Python/Java/Scala to make it more
>> friendly to Python users and without affecting Java/Scala users.
>>
>> More detail can be found in the FLIP-133:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
>>
>> Best,
>> Jincheng
>>
>> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
>> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
>>
>>
>>


Re: Colocating Compute

2020-07-31 Thread Dawid Wysakowicz
Hi Satyam,

It should be fine to have unbounded InputFormat. The important thing is
not to produce more splits than there are parallel instances of your
source. In createInputSplits(int minNumSplits) generate only
minNumSplits. It is so that all splits can be assigned immediately.
Unfortunately you won't have access to the state in InputFormat. Now
that I am thinking this will be problematic with checkpoints as you
cannot store the offset, up to when you've read the split.

In the SourceFunction stack as far as I know there is no built-in
support for that. As an alternative you could maybe built-in the split
assignment into the SourceFunction. Unfortunately as it would not happen
in a single location you would have to ensure that the logic can assign
all the splits independently in each of the parallel instances of the
source.

The Split, SourceReader, and SplitEnumerator are new components
introduced in FLIP-27[1]. I am not very familiar with those yet.
Unfortunately those are not yet supported in the Table ecosystem. I also
don't know if it is possible to assign the splits based on the host
machine with them. I am cc'ing Stephan and Becket who worked on those to
check if it is already possible with the interfaces.

Best,

Dawid


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On 31/07/2020 02:58, Satyam Shekhar wrote:
> Hi Dawid,
>
> I am currently on Flink v1.10. Do streaming pipelines support
> unbounded InputFormat in v1.10? My current setup uses SourceFunction
> for streaming pipeline and InputFormat for batch queries.
>
> I see the documentation for Flink v1.11 describe concepts for Split,
> SourceReader, and SplitEnumerator to enable streaming queries on
> unbounded splits. Is that the direction you were pointing to?
>
> Regards,
> Satyam
>
> On Thu, Jul 30, 2020 at 6:03 AM Dawid Wysakowicz
> mailto:dwysakow...@apache.org>> wrote:
>
> Hi Satyam,
>
> I think you can use the InputSplitAssigner also for streaming
> pipelines
> through an InputFormat. You can use
> StreamExecutionEnvironment#createInput or for SQL you can write your
> source according to the documentation here:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#dynamic-table-source
>
> If you do not want to use an InputFormat I think there is no easy
> way to
> do it now.
>
> Best,
>
> Dawid
>
> On 29/07/2020 13:53, Satyam Shekhar wrote:
> > Hello,
> >
> > I am using Flink v1.10 in a distributed environment to run SQL
> queries
> > on batch and streaming data.
> >
> > In my setup, data is sharded and distributed across the cluster.
> Each
> > shard receives streaming updates from some external source. I
> wish to
> > minimize data movement during query evaluation for performance
> > reasons. For that, I need some construct to advise Flink planner to
> > bind splits (shard) to the host where it is located. 
> >
> > I have come across InputSplitAssigner which gives me levers to
> > influence compute colocation for batch queries. Is there a way to do
> > the same for streaming queries as well? 
> >
> > Regards,
> > Satyam
>


signature.asc
Description: OpenPGP digital signature


EMR Saving CheckPoint into to S3 and 403 Access Denied

2020-07-31 Thread mars
Hi,

  I am running Flin k Jobs on EMR (5.30.1) and trying to save the checkpoint
info to S3.

  I have the following in flink-conf.xml file and when i try to submit the
jobs to flink cluster the JobManager is is failing as it is unable to save
the checkpoint info to S3.

  s3.access-key: <>
  s3.secret-key: <>
  s3.sse.enabled: true
  s3.server-side-encryption-algorithm: AES256

  And the Error i am getting is

2020-07-31 03:03:27,965 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   
- Configuring application-defined state backend with job/cluster config
2020-07-31 03:03:29,997 ERROR
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Failed to
submit job ae00efc463917546b02c3a3f3586046c.
java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:152)
... 7 more
Caused by:
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied (Service: Amazon S3; Status Code: 403; Error Code:
AccessDenied; Request ID: 515261D45A03B5C5; S3 Extended Request ID:
N1IpKJnJVIRA84QKXqKarjGL71+DYlwXbdXY99N02ykf6q1XEjbSzBoXFU8XfBJXIDrywlG3sTo=),
S3 Extended Request ID:
N1IpKJnJVIRA84QKXqKarjGL71+DYlwXbdXY99N02ykf6q1XEjbSzBoXFU8XfBJXIDrywlG3sTo=
at
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1742)

Thanks
Sateesh
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer

2020-07-31 Thread Qingsheng Ren
Hi Vikash,

It's a bug about classloader used in `abortTransaction()` method in
`FlinkKafkaProducer`, Flink version 1.10.0. I think it has been fixed in
1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version
1.10.0?


Vikash Dat  于2020年7月30日周四 下午9:26写道:

> Has anyone had success with using exactly_once in a kafka producer in
> flink?
> As of right now I don't think the code shown in the docs
> (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer
> )
> actually works.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards,

*Qingsheng Ren*

Electrical and Computer Engineering
Carnegie Mellon University

Email: renqs...@gmail.com